Compare commits

..

3 Commits

Author SHA1 Message Date
nolash
b7fbf7614e Bump version 2021-04-25 14:20:16 +02:00
nolash
f550748efd Merge remote-tracking branch 'origin/master' into lash/get-registry-api 2021-04-25 14:17:37 +02:00
nolash
5aeadb6770 Add registry lookup tasks, replace direct rpc from admin api tx 2021-04-24 10:35:21 +02:00
276 changed files with 2860 additions and 13136 deletions

View File

@@ -6,7 +6,6 @@ include:
- local: 'apps/cic-notify/.gitlab-ci.yml'
- local: 'apps/cic-meta/.gitlab-ci.yml'
- local: 'apps/cic-cache/.gitlab-ci.yml'
- local: 'apps/data-seeding/.gitlab-ci.yml'
stages:
- build

View File

@@ -6,4 +6,3 @@ HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=

View File

@@ -6,4 +6,3 @@ HOST=localhost
PORT=5432
ENGINE=sqlite
DRIVER=pysqlite
DEBUG=

View File

@@ -1,28 +1,22 @@
# standard imports
import logging
import datetime
# external imports
# third-party imports
import moolb
# local imports
from cic_cache.db.list import (
list_transactions_mined,
list_transactions_account_mined,
list_transactions_mined_with_data,
)
from cic_cache.db import list_transactions_mined
from cic_cache.db import list_transactions_account_mined
logg = logging.getLogger()
class Cache:
class BloomCache:
def __init__(self, session):
self.session = session
class BloomCache(Cache):
@staticmethod
def __get_filter_size(n):
n = 8192 * 8
@@ -93,44 +87,3 @@ class BloomCache(Cache):
f_blocktx.add(block + tx)
logg.debug('added block {} tx {} lo {} hi {}'.format(r[0], r[1], lowest_block, highest_block))
return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),)
class DataCache(Cache):
def load_transactions_with_data(self, offset, end):
rows = list_transactions_mined_with_data(self.session, offset, end)
tx_cache = []
highest_block = -1;
lowest_block = -1;
date_is_str = None # stick this in startup
for r in rows:
if highest_block == -1:
highest_block = r['block_number']
lowest_block = r['block_number']
tx_type = 'unknown'
if r['value'] != None:
tx_type = '{}.{}'.format(r['domain'], r['value'])
if date_is_str == None:
date_is_str = type(r['date_block']).__name__ == 'str'
o = {
'block_number': r['block_number'],
'tx_hash': r['tx_hash'],
'date_block': r['date_block'],
'sender': r['sender'],
'recipient': r['recipient'],
'from_value': int(r['from_value']),
'to_value': int(r['to_value']),
'source_token': r['source_token'],
'destination_token': r['destination_token'],
'success': r['success'],
'tx_type': tx_type,
}
if date_is_str:
o['date_block'] = datetime.datetime.fromisoformat(r['date_block'])
tx_cache.append(o)
return (lowest_block, highest_block, tx_cache)

View File

@@ -2,14 +2,9 @@
import logging
# local imports
from .list import (
list_transactions_mined,
list_transactions_account_mined,
add_transaction,
tag_transaction,
add_tag,
)
from .list import list_transactions_mined
from .list import list_transactions_account_mined
from .list import add_transaction
logg = logging.getLogger()

View File

@@ -2,9 +2,8 @@
import logging
import datetime
# external imports
# third-party imports
from cic_cache.db.models.base import SessionBase
from sqlalchemy import text
logg = logging.getLogger()
@@ -28,26 +27,6 @@ def list_transactions_mined(
return r
def list_transactions_mined_with_data(
session,
offset,
end,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
:param offset: Offset in data set to return transactions from
:type offset: int
:param limit: Max number of transactions to retrieve
:type limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number ASC, tx_index ASC".format(offset, end)
r = session.execute(s)
return r
def list_transactions_account_mined(
session,
address,
@@ -71,8 +50,7 @@ def list_transactions_account_mined(
def add_transaction(
session,
tx_hash,
session, tx_hash,
block_number,
tx_index,
sender,
@@ -84,33 +62,6 @@ def add_transaction(
success,
timestamp,
):
"""Adds a single transaction to the cache persistent storage. Sensible interpretation of all fields is the responsibility of the caller.
:param session: Persistent storage session object
:type session: SQLAlchemy session
:param tx_hash: Transaction hash
:type tx_hash: str, 0x-hex
:param block_number: Block number
:type block_number: int
:param tx_index: Transaction index in block
:type tx_index: int
:param sender: Ethereum address of effective sender
:type sender: str, 0x-hex
:param receiver: Ethereum address of effective recipient
:type receiver: str, 0x-hex
:param source_token: Ethereum address of token used by sender
:type source_token: str, 0x-hex
:param destination_token: Ethereum address of token received by recipient
:type destination_token: str, 0x-hex
:param from_value: Source token value spent in transaction
:type from_value: int
:param to_value: Destination token value received in transaction
:type to_value: int
:param success: True if code execution on network was successful
:type success: bool
:param date_block: Block timestamp
:type date_block: datetime
"""
date_block = datetime.datetime.fromtimestamp(timestamp)
s = "INSERT INTO tx (tx_hash, block_number, tx_index, sender, recipient, source_token, destination_token, from_value, to_value, success, date_block) VALUES ('{}', {}, {}, '{}', '{}', '{}', '{}', {}, {}, {}, '{}')".format(
tx_hash,
@@ -126,74 +77,3 @@ def add_transaction(
date_block,
)
session.execute(s)
def tag_transaction(
session,
tx_hash,
name,
domain=None,
):
"""Tag a single transaction with a single tag.
Tag must already exist in storage.
:param session: Persistent storage session object
:type session: SQLAlchemy session
:param tx_hash: Transaction hash
:type tx_hash: str, 0x-hex
:param name: Tag value
:type name: str
:param domain: Tag domain
:type domain: str
:raises ValueError: Unknown tag or transaction hash
"""
s = text("SELECT id from tx where tx_hash = :a")
r = session.execute(s, {'a': tx_hash}).fetchall()
tx_id = r[0].values()[0]
if tx_id == None:
raise ValueError('unknown tx hash {}'.format(tx_hash))
#s = text("SELECT id from tag where value = :a and domain = :b")
if domain == None:
s = text("SELECT id from tag where value = :a")
else:
s = text("SELECT id from tag where value = :a and domain = :b")
r = session.execute(s, {'a': name, 'b': domain}).fetchall()
tag_id = r[0].values()[0]
logg.debug('type {} {}'.format(type(tag_id), type(tx_id)))
if tag_id == None:
raise ValueError('unknown tag name {} domain {}'.format(name, domain))
s = text("INSERT INTO tag_tx_link (tag_id, tx_id) VALUES (:a, :b)")
r = session.execute(s, {'a': int(tag_id), 'b': int(tx_id)})
def add_tag(
session,
name,
domain=None,
):
"""Add a single tag to storage.
:param session: Persistent storage session object
:type session: SQLAlchemy session
:param name: Tag value
:type name: str
:param domain: Tag domain
:type domain: str
:raises sqlalchemy.exc.IntegrityError: Tag already exists
"""
s = None
if domain == None:
s = text("INSERT INTO tag (value) VALUES (:b)")
else:
s = text("INSERT INTO tag (domain, value) VALUES (:a, :b)")
session.execute(s, {'a': domain, 'b': name})

View File

@@ -1,38 +0,0 @@
"""Transaction tags
Revision ID: aaf2bdce7d6e
Revises: 6604de4203e2
Create Date: 2021-05-01 09:20:20.775082
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'aaf2bdce7d6e'
down_revision = '6604de4203e2'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'tag',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('domain', sa.String(), nullable=True),
sa.Column('value', sa.String(), nullable=False),
)
op.create_index('idx_tag_domain_value', 'tag', ['domain', 'value'], unique=True)
op.create_table(
'tag_tx_link',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag_id', sa.Integer, sa.ForeignKey('tag.id'), nullable=False),
sa.Column('tx_id', sa.Integer, sa.ForeignKey('tx.id'), nullable=False),
)
def downgrade():
op.drop_table('tag_tx_link')
op.drop_index('idx_tag_domain_value')
op.drop_table('tag')

View File

@@ -1,2 +1 @@
from .erc20 import *
from .faucet import *

View File

@@ -1,27 +1,2 @@
class TagSyncFilter:
"""Holds tag name and domain for an implementing filter.
:param name: Tag value
:type name: str
:param domain: Tag domain
:type domain: str
"""
def __init__(self, name, domain=None):
self.tag_name = name
self.tag_domain = domain
def tag(self):
"""Return tag value/domain.
:rtype: Tuple
:returns: tag value/domain.
"""
return (self.tag_name, self.tag_domain)
def __str__(self):
if self.tag_domain == None:
return self.tag_name
return '{}.{}'.format(self.tag_domain, self.tag_name)
class SyncFilter:
pass

View File

@@ -2,6 +2,7 @@
import logging
# external imports
from chainlib.eth.erc20 import ERC20
from chainlib.eth.address import (
to_checksum_address,
)
@@ -12,19 +13,17 @@ from cic_eth_registry.error import (
NotAContractError,
ContractMismatchError,
)
from eth_erc20 import ERC20
# local imports
from .base import TagSyncFilter
from .base import SyncFilter
from cic_cache import db as cic_cache_db
logg = logging.getLogger().getChild(__name__)
class ERC20TransferFilter(TagSyncFilter):
class ERC20TransferFilter(SyncFilter):
def __init__(self, chain_spec):
super(ERC20TransferFilter, self).__init__('transfer', domain='erc20')
self.chain_spec = chain_spec
@@ -47,9 +46,6 @@ class ERC20TransferFilter(TagSyncFilter):
except RequestMismatchException:
logg.debug('erc20 match but not a transfer, skipping')
return False
except ValueError:
logg.debug('erc20 match but bogus data, skipping')
return False
token_sender = tx.outputs[0]
token_recipient = transfer_data[0]
@@ -71,13 +67,7 @@ class ERC20TransferFilter(TagSyncFilter):
tx.status == Status.SUCCESS,
block.timestamp,
)
db_session.flush()
cic_cache_db.tag_transaction(
db_session,
tx.hash,
self.tag_name,
domain=self.tag_domain,
)
#db_session.flush()
db_session.commit()
return True

View File

@@ -1,73 +0,0 @@
# standard imports
import logging
# external imports
from erc20_faucet import Faucet
from chainlib.eth.address import to_checksum_address
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.status import Status
from hexathon import strip_0x
# local imports
import cic_cache.db as cic_cache_db
from .base import TagSyncFilter
#logg = logging.getLogger().getChild(__name__)
logg = logging.getLogger()
class FaucetFilter(TagSyncFilter):
def __init__(self, chain_spec, sender_address=ZERO_ADDRESS):
super(FaucetFilter, self).__init__('give_to', domain='faucet')
self.chain_spec = chain_spec
self.sender_address = sender_address
def filter(self, conn, block, tx, db_session=None):
try:
data = strip_0x(tx.payload)
except ValueError:
return False
logg.debug('data {}'.format(data))
if Faucet.method_for(data[:8]) == None:
return False
token_sender = tx.inputs[0]
token_recipient = data[64+8-40:]
logg.debug('token recipient {}'.format(token_recipient))
f = Faucet(self.chain_spec)
o = f.token(token_sender, sender_address=self.sender_address)
r = conn.do(o)
token = f.parse_token(r)
f = Faucet(self.chain_spec)
o = f.token_amount(token_sender, sender_address=self.sender_address)
r = conn.do(o)
token_value = f.parse_token_amount(r)
cic_cache_db.add_transaction(
db_session,
tx.hash,
block.number,
tx.index,
to_checksum_address(token_sender),
to_checksum_address(token_recipient),
token,
token,
token_value,
token_value,
tx.status == Status.SUCCESS,
block.timestamp,
)
db_session.flush()
cic_cache_db.tag_transaction(
db_session,
tx.hash,
self.tag_name,
domain=self.tag_domain,
)
db_session.commit()
return True

View File

@@ -1,110 +0,0 @@
# standard imports
import logging
import json
import re
import base64
# local imports
from cic_cache.cache import (
BloomCache,
DataCache,
)
logg = logging.getLogger(__name__)
re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?'
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)/?(\d+)?/?(\d+)/?'
re_transactions_all_data = r'/txa/(\d+)/(\d+)/?'
DEFAULT_LIMIT = 100
def process_transactions_account_bloom(session, env):
r = re.match(re_transactions_account_bloom, env.get('PATH_INFO'))
if not r:
return None
address = r[1]
if r[2] == None:
address = '0x' + address
offset = DEFAULT_LIMIT
if r.lastindex > 2:
offset = r[3]
limit = 0
if r.lastindex > 3:
limit = r[4]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
def process_transactions_all_bloom(session, env):
r = re.match(re_transactions_all_bloom, env.get('PATH_INFO'))
if not r:
return None
offset = DEFAULT_LIMIT
if r.lastindex > 0:
offset = r[1]
limit = 0
if r.lastindex > 1:
limit = r[2]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
def process_transactions_all_data(session, env):
r = re.match(re_transactions_all_data, env.get('PATH_INFO'))
if not r:
return None
if env.get('HTTP_X_CIC_CACHE_MODE') != 'all':
return None
offset = r[1]
end = r[2]
if int(r[2]) < int(r[1]):
raise ValueError('cart before the horse, dude')
c = DataCache(session)
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, end)
for r in tx_cache:
r['date_block'] = r['date_block'].timestamp()
o = {
'low': lowest_block,
'high': highest_block,
'data': tx_cache,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)

View File

@@ -1,20 +1,18 @@
# standard imports
import os
import re
import logging
import argparse
import json
import base64
# external imports
# third-party imports
import confini
# local imports
from cic_cache import BloomCache
from cic_cache.db import dsn_from_config
from cic_cache.db.models.base import SessionBase
from cic_cache.runnable.daemons.query import (
process_transactions_account_bloom,
process_transactions_all_bloom,
process_transactions_all_data,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -46,6 +44,72 @@ logg.debug('config:\n{}'.format(config))
dsn = dsn_from_config(config)
SessionBase.connect(dsn, config.true('DATABASE_DEBUG'))
re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?'
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)/?(\d+)?/?(\d+)/?'
DEFAULT_LIMIT = 100
def process_transactions_account_bloom(session, env):
r = re.match(re_transactions_account_bloom, env.get('PATH_INFO'))
if not r:
return None
address = r[1]
if r[2] == None:
address = '0x' + address
offset = DEFAULT_LIMIT
if r.lastindex > 2:
offset = r[3]
limit = 0
if r.lastindex > 3:
limit = r[4]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
def process_transactions_all_bloom(session, env):
r = re.match(re_transactions_all_bloom, env.get('PATH_INFO'))
if not r:
return None
offset = DEFAULT_LIMIT
if r.lastindex > 0:
offset = r[1]
limit = 0
if r.lastindex > 1:
limit = r[2]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
# uwsgi application
def application(env, start_response):
@@ -55,16 +119,10 @@ def application(env, start_response):
session = SessionBase.create_session()
for handler in [
process_transactions_all_data,
process_transactions_all_bloom,
process_transactions_account_bloom,
]:
r = None
try:
r = handler(session, env)
except ValueError as e:
start_response('400 {}'.format(str(e)))
return []
r = handler(session, env)
if r != None:
(mime_type, content) = r
break

View File

@@ -7,10 +7,9 @@ import argparse
import sys
import re
# external imports
# third-party imports
import confini
import celery
import sqlalchemy
import rlp
import cic_base.config
import cic_base.log
@@ -35,32 +34,19 @@ from chainsyncer.driver import (
from chainsyncer.db.models.base import SessionBase
# local imports
from cic_cache.db import (
dsn_from_config,
add_tag,
)
from cic_cache.db import dsn_from_config
from cic_cache.runnable.daemons.filters import (
ERC20TransferFilter,
FaucetFilter,
)
script_dir = os.path.realpath(os.path.dirname(__file__))
def add_block_args(argparser):
argparser.add_argument('--history-start', type=int, default=0, dest='history_start', help='Start block height for initial history sync')
argparser.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')
return argparser
logg = cic_base.log.create()
argparser = cic_base.argparse.create(script_dir, cic_base.argparse.full_template)
argparser = cic_base.argparse.add(argparser, add_block_args, 'block')
#argparser = cic_base.argparse.add(argparser, add_traffic_args, 'traffic')
args = cic_base.argparse.parse(argparser, logg)
config = cic_base.config.create(args.c, args, args.env_prefix)
config.add(args.history_start, 'SYNCER_HISTORY_START', True)
config.add(args.no_history, '_NO_HISTORY', True)
cic_base.config.log(config)
dsn = dsn_from_config(config)
@@ -69,21 +55,10 @@ SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
#RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default')
cic_base.rpc.setup(chain_spec, config.get('ETH_PROVIDER'))
def register_filter_tags(filters, session):
for f in filters:
tag = f.tag()
try:
add_tag(session, tag[0], domain=tag[1])
session.commit()
logg.info('added tag name "{}" domain "{}"'.format(tag[0], tag[1]))
except sqlalchemy.exc.IntegrityError:
session.rollback()
logg.debug('already have tag name "{}" domain "{}"'.format(tag[0], tag[1]))
def main():
# Connect to blockchain with chainlib
rpc = RPCConnection.connect(chain_spec, 'default')
@@ -92,7 +67,7 @@ def main():
r = rpc.do(o)
block_offset = int(strip_0x(r), 16) + 1
logg.debug('current block height {}'.format(block_offset))
logg.debug('starting at block {}'.format(block_offset))
syncers = []
@@ -101,13 +76,8 @@ def main():
syncer_backends = SQLBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0:
initial_block_start = config.get('SYNCER_HISTORY_START')
initial_block_offset = block_offset
if config.get('_NO_HISTORY'):
initial_block_start = block_offset
initial_block_offset += 1
syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start))
logg.info('found no backends to resume, adding initial sync from history start {} end {}'.format(initial_block_start, initial_block_offset))
logg.info('found no backends to resume')
syncer_backends.append(SQLBackend.initial(chain_spec, block_offset))
else:
for syncer_backend in syncer_backends:
logg.info('resuming sync session {}'.format(syncer_backend))
@@ -127,22 +97,11 @@ def main():
logg.info('using trusted address {}'.format(address))
erc20_transfer_filter = ERC20TransferFilter(chain_spec)
faucet_filter = FaucetFilter(chain_spec)
filters = [
erc20_transfer_filter,
faucet_filter,
]
session = SessionBase.create_session()
register_filter_tags(filters, session)
session.close()
i = 0
for syncer in syncers:
logg.debug('running syncer index {}'.format(i))
for f in filters:
syncer.add_filter(f)
syncer.add_filter(erc20_transfer_filter)
r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
sys.stderr.write("sync {} done at block {}\n".format(syncer, r))

View File

@@ -6,4 +6,4 @@ HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=0
DEBUG=

View File

@@ -1,2 +1,2 @@
[eth]
provider = http://localhost:63545
provider = ws://localhost:63546

View File

@@ -1,3 +1,2 @@
[syncer]
loop_interval = 1
history_start = 0

View File

@@ -1,3 +1,2 @@
[syncer]
loop_interval = 5
history_start = 0

View File

@@ -1,4 +1,2 @@
[cic]
registry_address =
chain_spec =
trust_address =

View File

@@ -6,4 +6,4 @@ HOST=localhost
PORT=5432
ENGINE=sqlite
DRIVER=pysqlite
DEBUG=1
DEBUG=

View File

@@ -1,2 +0,0 @@
[syncer]
loop_interval = 1

View File

@@ -17,7 +17,7 @@ RUN apt-get update && \
# Copy shared requirements from top of mono-repo
RUN echo "copying root req file ${root_requirement_file}"
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2b9
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a76
COPY cic-cache/requirements.txt ./
COPY cic-cache/setup.cfg \
@@ -43,6 +43,10 @@ COPY cic-cache/config/ /usr/local/etc/cic-cache/
RUN git clone https://github.com/vishnubob/wait-for-it.git /usr/local/bin/wait-for-it/
COPY cic-cache/cic_cache/db/migrations/ /usr/local/share/cic-cache/alembic/
RUN git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \
mkdir -p /usr/local/share/cic/solidity && \
cp -R cic-contracts/abis /usr/local/share/cic/solidity/abi
COPY cic-cache/docker/start_tracker.sh ./start_tracker.sh
COPY cic-cache/docker/db.sh ./db.sh
RUN chmod 755 ./*.sh

View File

@@ -1,12 +1,12 @@
cic-base~=0.1.2b10
cic-base~=0.1.2a77
alembic==1.4.2
confini~=0.3.6rc3
uwsgi==2.0.19.1
moolb~=0.1.0
cic-eth-registry~=0.5.5a4
cic-eth-registry~=0.5.4a16
SQLAlchemy==1.3.20
semver==2.13.0
psycopg2==2.8.6
celery==4.4.7
redis==3.5.3
chainsyncer[sql]~=0.0.2a4
chainsyncer[sql]~=0.0.2a2

View File

@@ -4,8 +4,3 @@ pytest-mock==3.3.1
pysqlite3==0.4.3
sqlparse==0.4.1
pytest-celery==0.0.0a1
eth_tester==0.5.0b3
py-evm==0.3.0a20
web3==5.12.2
cic-eth-registry~=0.5.5a3
cic-base[full]==0.1.2b8

View File

@@ -3,7 +3,7 @@ import os
import sys
import datetime
# external imports
# third-party imports
import pytest
# local imports
@@ -84,20 +84,3 @@ def txs(
session.commit()
return [
tx_hash_first,
tx_hash_second,
]
@pytest.fixture(scope='function')
def tag_txs(
init_database,
txs,
):
db.add_tag(init_database, 'taag', domain='test')
init_database.commit()
db.tag_transaction(init_database, txs[1], 'taag', domain='test')

View File

@@ -1,3 +0,0 @@
from chainlib.eth.pytest import *
from cic_eth_registry.pytest.fixtures_tokens import *

View File

@@ -1,69 +0,0 @@
# standard imports
import os
import datetime
import logging
import json
# external imports
import pytest
from sqlalchemy import text
from chainlib.eth.tx import Tx
from chainlib.eth.block import Block
from chainlib.chain import ChainSpec
from hexathon import (
strip_0x,
add_0x,
)
# local imports
from cic_cache.db import add_tag
from cic_cache.runnable.daemons.filters.erc20 import ERC20TransferFilter
logg = logging.getLogger()
def test_erc20_filter(
eth_rpc,
foo_token,
init_database,
list_defaults,
list_actors,
tags,
):
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
fltr = ERC20TransferFilter(chain_spec)
add_tag(init_database, fltr.tag_name, domain=fltr.tag_domain)
data = 'a9059cbb'
data += strip_0x(list_actors['alice'])
data += '1000'.ljust(64, '0')
block = Block({
'hash': os.urandom(32).hex(),
'number': 42,
'timestamp': datetime.datetime.utcnow().timestamp(),
'transactions': [],
})
tx = Tx({
'to': foo_token,
'from': list_actors['bob'],
'data': data,
'value': 0,
'hash': os.urandom(32).hex(),
'nonce': 13,
'gasPrice': 10000000,
'gas': 123456,
})
block.txs.append(tx)
tx.block = block
r = fltr.filter(eth_rpc, block, tx, db_session=init_database)
assert r
s = text("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = :a AND a.value = :b")
r = init_database.execute(s, {'a': fltr.tag_domain, 'b': fltr.tag_name}).fetchone()
assert r[0] == tx.hash

View File

@@ -1,71 +0,0 @@
# standard imports
import logging
# external imports
from chainlib.chain import ChainSpec
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.block import (
block_by_hash,
Block,
)
from chainlib.eth.tx import (
receipt,
unpack,
transaction,
Tx,
)
from hexathon import strip_0x
from erc20_faucet.faucet import SingleShotFaucet
from sqlalchemy import text
# local imports
from cic_cache.db import add_tag
from cic_cache.runnable.daemons.filters.faucet import FaucetFilter
logg = logging.getLogger()
def test_filter_faucet(
eth_rpc,
eth_signer,
foo_token,
faucet_noregistry,
init_database,
list_defaults,
contract_roles,
agent_roles,
tags,
):
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
fltr = FaucetFilter(chain_spec, contract_roles['CONTRACT_DEPLOYER'])
add_tag(init_database, fltr.tag_name, domain=fltr.tag_domain)
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
c = SingleShotFaucet(chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.give_to(faucet_noregistry, agent_roles['ALICE'], agent_roles['ALICE'])
r = eth_rpc.do(o)
tx_src = unpack(bytes.fromhex(strip_0x(o['params'][0])), chain_spec)
o = receipt(r)
r = eth_rpc.do(o)
rcpt = Tx.src_normalize(r)
assert r['status'] == 1
o = block_by_hash(r['block_hash'])
r = eth_rpc.do(o)
block_object = Block(r)
tx = Tx(tx_src, block_object)
tx.apply_receipt(rcpt)
r = fltr.filter(eth_rpc, block_object, tx, init_database)
assert r
s = text("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = :a AND a.value = :b")
r = init_database.execute(s, {'a': fltr.tag_domain, 'b': fltr.tag_name}).fetchone()
assert r[0] == tx.hash

View File

@@ -2,7 +2,7 @@
import os
import logging
# external imports
# third-party imports
import pytest
import confini
@@ -13,7 +13,7 @@ logg = logging.getLogger(__file__)
@pytest.fixture(scope='session')
def load_config():
config_dir = os.path.join(root_dir, 'config/test')
config_dir = os.path.join(root_dir, '.config/test')
conf = confini.Config(config_dir, 'CICTEST')
conf.process()
logg.debug('config {}'.format(conf))

View File

@@ -3,16 +3,13 @@ import os
import logging
import re
# external imports
# third-party imports
import pytest
import sqlparse
import alembic
from alembic.config import Config as AlembicConfig
# local imports
from cic_cache.db.models.base import SessionBase
from cic_cache.db import dsn_from_config
from cic_cache.db import add_tag
logg = logging.getLogger(__file__)
@@ -29,10 +26,11 @@ def database_engine(
except FileNotFoundError:
pass
dsn = dsn_from_config(load_config)
SessionBase.connect(dsn, debug=load_config.true('DATABASE_DEBUG'))
SessionBase.connect(dsn)
return dsn
# TODO: use alembic instead to migrate db, here we have to keep separate schema than migration script in script/migrate.py
@pytest.fixture(scope='function')
def init_database(
load_config,
@@ -40,23 +38,52 @@ def init_database(
):
rootdir = os.path.dirname(os.path.dirname(__file__))
dbdir = os.path.join(rootdir, 'cic_cache', 'db')
migrationsdir = os.path.join(dbdir, 'migrations', load_config.get('DATABASE_ENGINE'))
if not os.path.isdir(migrationsdir):
migrationsdir = os.path.join(dbdir, 'migrations', 'default')
logg.info('using migrations directory {}'.format(migrationsdir))
schemadir = os.path.join(rootdir, 'db', load_config.get('DATABASE_DRIVER'))
if load_config.get('DATABASE_ENGINE') == 'sqlite':
rconn = SessionBase.engine.raw_connection()
f = open(os.path.join(schemadir, 'db.sql'))
s = f.read()
f.close()
rconn.executescript(s)
else:
rconn = SessionBase.engine.raw_connection()
rcursor = rconn.cursor()
#rcursor.execute('DROP FUNCTION IF EXISTS public.transaction_list')
#rcursor.execute('DROP FUNCTION IF EXISTS public.balances')
f = open(os.path.join(schemadir, 'db.sql'))
s = f.read()
f.close()
r = re.compile(r'^[A-Z]', re.MULTILINE)
for l in sqlparse.parse(s):
strl = str(l)
# we need to check for empty query lines, as sqlparse doesn't do that on its own (and psycopg complains when it gets them)
if not re.search(r, strl):
logg.warning('skipping parsed query line {}'.format(strl))
continue
rcursor.execute(strl)
rconn.commit()
rcursor.execute('SET search_path TO public')
# this doesn't work when run separately, no idea why
# functions have been manually added to original schema from cic-eth
# f = open(os.path.join(schemadir, 'proc_transaction_list.sql'))
# s = f.read()
# f.close()
# rcursor.execute(s)
#
# f = open(os.path.join(schemadir, 'proc_balances.sql'))
# s = f.read()
# f.close()
# rcursor.execute(s)
rcursor.close()
session = SessionBase.create_session()
ac = AlembicConfig(os.path.join(migrationsdir, 'alembic.ini'))
ac.set_main_option('sqlalchemy.url', database_engine)
ac.set_main_option('script_location', migrationsdir)
alembic.command.downgrade(ac, 'base')
alembic.command.upgrade(ac, 'head')
session.commit()
yield session
session.commit()
session.close()
@@ -89,14 +116,3 @@ def list_defaults(
return {
'block': 420000,
}
@pytest.fixture(scope='function')
def tags(
init_database,
):
add_tag(init_database, 'foo')
add_tag(init_database, 'baz', domain='bar')
add_tag(init_database, 'xyzzy', domain='bar')
init_database.commit()

View File

@@ -1,31 +0,0 @@
# standard imports
import json
# external imports
import pytest
# local imports
from cic_cache.runnable.daemons.query import process_transactions_all_data
def test_api_all_data(
init_database,
txs,
):
env = {
'PATH_INFO': '/txa/410000/420000',
'HTTP_X_CIC_CACHE_MODE': 'all',
}
j = process_transactions_all_data(init_database, env)
o = json.loads(j[1])
assert len(o['data']) == 2
env = {
'PATH_INFO': '/txa/420000/410000',
'HTTP_X_CIC_CACHE_MODE': 'all',
}
with pytest.raises(ValueError):
j = process_transactions_all_data(init_database, env)

View File

@@ -4,12 +4,11 @@ import datetime
import logging
import json
# external imports
# third-party imports
import pytest
# local imports
from cic_cache import BloomCache
from cic_cache.cache import DataCache
logg = logging.getLogger()
@@ -34,23 +33,3 @@ def test_cache(
assert b[0] == list_defaults['block'] - 1
def test_cache_data(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
tag_txs,
):
session = init_database
c = DataCache(session)
b = c.load_transactions_with_data(410000, 420000)
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == txs[1]
assert b[2][1]['tx_type'] == 'unknown'
assert b[2][0]['tx_type'] == 'test.taag'

View File

@@ -1,37 +0,0 @@
import os
import datetime
import logging
import json
# external imports
import pytest
# local imports
from cic_cache.db import tag_transaction
logg = logging.getLogger()
def test_cache(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
tags,
):
tag_transaction(init_database, txs[0], 'foo')
tag_transaction(init_database, txs[0], 'baz', domain='bar')
tag_transaction(init_database, txs[1], 'xyzzy', domain='bar')
r = init_database.execute("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.value = 'foo'").fetchall()
assert r[0][0] == txs[0]
r = init_database.execute("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = 'bar' AND a.value = 'baz'").fetchall()
assert r[0][0] == txs[0]
r = init_database.execute("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = 'bar' AND a.value = 'xyzzy'").fetchall()
assert r[0][0] == txs[1]

View File

@@ -5,5 +5,3 @@ omit =
cic_eth/db/migrations/*
cic_eth/sync/head.py
cic_eth/sync/mempool.py
cic_eth/queue/state.py
*redis*.py

View File

@@ -5,29 +5,18 @@
.cic_eth_changes_target:
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
#changes:
#- $CONTEXT/$APP_NAME/**/*
when: always
- changes:
- $CONTEXT/$APP_NAME/*
build-mr-cic-eth:
extends:
- .cic_eth_variables
- .cic_eth_changes_target
- .py_build_target_test
test-mr-cic-eth:
extends:
- .py_build_merge_request
- .cic_eth_variables
- .cic_eth_changes_target
stage: test
image: $CI_REGISTRY_IMAGE/$APP_NAME-test:latest
script:
- cd apps/$APP_NAME/
- pytest -x --cov=cic_eth --cov-fail-under=90 --cov-report term-missing tests
needs: ["build-mr-cic-eth"]
build-push-cic-eth:
extends:
- .py_build_push
- .cic_eth_variables

View File

@@ -4,18 +4,11 @@ import logging
# external imports
import celery
from chainlib.chain import ChainSpec
from chainlib.connection import RPCConnection
from chainlib.eth.tx import (
unpack,
TxFactory,
)
from chainlib.eth.gas import OverrideGasOracle
from chainqueue.sql.query import get_tx
from chainqueue.sql.state import set_cancel
from chainlib.eth.tx import unpack
from chainqueue.query import get_tx
from chainqueue.state import set_cancel
from chainqueue.db.models.otx import Otx
from chainqueue.db.models.tx import TxCache
from hexathon import strip_0x
from potaahto.symbols import snake_and_camel
# local imports
from cic_eth.db.models.base import SessionBase
@@ -28,14 +21,13 @@ from cic_eth.admin.ctrl import (
)
from cic_eth.queue.tx import queue_create
from cic_eth.eth.gas import create_check_gas_task
from cic_eth.task import BaseTask
celery_app = celery.current_app
logg = logging.getLogger()
@celery_app.task(bind=True, base=BaseTask)
def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
@celery_app.task(bind=True)
def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1):
"""Shift all transactions with nonces higher than the offset by the provided position delta.
Transactions who are replaced by transactions that move nonces will be marked as OVERRIDDEN.
@@ -46,29 +38,25 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
:type tx_hash_orig_hex: str, 0x-hex
:param delta: Amount
"""
chain_spec = ChainSpec.from_dict(chainspec_dict)
rpc = RPCConnection.connect(chain_spec, 'default')
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
queue = None
try:
queue = self.request.delivery_info.get('routing_key')
except AttributeError:
pass
session = BaseTask.session_func()
tx_brief = get_tx(chain_spec, tx_hash_orig_hex, session=session)
tx_raw = bytes.fromhex(strip_0x(tx_brief['signed_tx']))
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_brief = get_tx(tx_hash_orig_hex)
tx_raw = bytes.fromhex(strip_0x(tx_brief['signed_tx'][2:]))
tx = unpack(tx_raw, chain_spec)
nonce = tx_brief['nonce']
address = tx['from']
logg.debug('shifting nonce {} position(s) for address {}, offset {}, hash {}'.format(delta, address, nonce, tx['hash']))
logg.debug('shifting nonce {} position(s) for address {}, offset {}'.format(delta, address, nonce))
lock_queue(None, chain_spec.asdict(), address=address)
lock_send(None, chain_spec.asdict(), address=address)
set_cancel(chain_spec, strip_0x(tx['hash']), manual=True, session=session)
lock_queue(None, chain_str, address)
lock_send(None, chain_str, address)
session = SessionBase.create_session()
q = session.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.sender==address)
@@ -81,57 +69,49 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
for otx in otxs:
tx_raw = bytes.fromhex(strip_0x(otx.signed_tx))
tx_new = unpack(tx_raw, chain_spec)
tx_new = snake_and_camel(tx_new)
tx_previous_hash_hex = tx_new['hash']
tx_previous_nonce = tx_new['nonce']
tx_new['gas_price'] += 1
tx_new['gasPrice'] = tx_new['gas_price']
tx_new['nonce'] -= delta
logg.debug('tx_new {}'.format(tx_new))
del(tx_new['hash'])
del(tx_new['hash_unsigned'])
del(tx_new['hashUnsigned'])
tx_new['nonce'] -= delta
gas_oracle = OverrideGasOracle(limit=tx_new['gas'], price=tx_new['gas_price'] + 1) # TODO: it should be possible to merely set this price here and if missing in the existing struct then fill it in (chainlib.eth.tx)
c = TxFactory(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx_new)
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx_new, chain_str)
logg.debug('tx {} -> {} nonce {} -> {}'.format(tx_previous_hash_hex, tx_hash_hex, tx_previous_nonce, tx_new['nonce']))
otx = Otx(
tx_new['nonce'],
tx_hash_hex,
tx_signed_raw_hex,
)
nonce=tx_new['nonce'],
address=tx_new['from'],
tx_hash=tx_hash_hex,
signed_tx=tx_signed_raw_hex,
)
session.add(otx)
session.commit()
# TODO: cancel all first, then replace. Otherwise we risk two non-locked states for two different nonces.
set_cancel(chain_spec, strip_0x(tx_previous_hash_hex), manual=True, session=session)
set_cancel(tx_previous_hash_hex, True)
TxCache.clone(tx_previous_hash_hex, tx_hash_hex, session=session)
TxCache.clone(tx_previous_hash_hex, tx_hash_hex)
tx_hashes.append(tx_hash_hex)
txs.append(tx_signed_raw_hex)
session.commit()
session.close()
s = create_check_gas_task(
s = create_check_gas_and_send_task(
txs,
chain_spec,
chain_str,
tx_new['from'],
gas=tx_new['gas'],
tx_hashes_hex=tx_hashes,
queue=queue,
tx_new['gas'],
tx_hashes,
queue,
)
s_unlock_send = celery.signature(
'cic_eth.admin.ctrl.unlock_send',
[
chain_spec.asdict(),
chain_str,
tx_new['from'],
],
queue=queue,
@@ -139,7 +119,7 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
s_unlock_direct = celery.signature(
'cic_eth.admin.ctrl.unlock_queue',
[
chain_spec.asdict(),
chain_str,
tx_new['from'],
],
queue=queue,

View File

@@ -16,6 +16,4 @@ def default_token(self):
return {
'symbol': self.default_token_symbol,
'address': self.default_token_address,
'name': self.default_token_name,
'decimals': self.default_token_decimals,
}

View File

@@ -8,7 +8,6 @@ from chainlib.eth.constant import (
ZERO_ADDRESS,
)
from cic_eth_registry import CICRegistry
from cic_eth_registry.erc20 import ERC20Token
from cic_eth_registry.error import UnknownContractError
from chainlib.eth.address import to_checksum_address
from chainlib.eth.contract import code
@@ -31,14 +30,13 @@ from chainqueue.db.enum import (
status_str,
)
from chainqueue.error import TxStateChangeError
from chainqueue.sql.query import get_tx
from eth_erc20 import ERC20
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.nonce import Nonce
from cic_eth.error import InitializationError
from cic_eth.queue.query import get_tx
app = celery.current_app
@@ -190,7 +188,6 @@ class AdminApi:
s_manual = celery.signature(
'cic_eth.queue.state.set_manual',
[
chain_spec.asdict(),
tx_hash_hex,
],
queue=self.queue,
@@ -209,9 +206,8 @@ class AdminApi:
s.link(s_gas)
return s_manual.apply_async()
def check_nonce(self, chain_spec, address):
def check_nonce(self, address):
s = celery.signature(
'cic_eth.queue.query.get_account_tx',
[
@@ -232,12 +228,13 @@ class AdminApi:
s_get_tx = celery.signature(
'cic_eth.queue.query.get_tx',
[
chain_spec.asdict(),
chain_spec.asdict(),
k,
],
queue=self.queue,
)
tx = s_get_tx.apply_async().get()
#tx = get_tx(k)
logg.debug('checking nonce {} (previous {})'.format(tx['nonce'], last_nonce))
nonce_otx = tx['nonce']
if not is_alive(tx['status']) and tx['status'] & local_fail > 0:
@@ -245,9 +242,7 @@ class AdminApi:
blocking_tx = k
blocking_nonce = nonce_otx
elif nonce_otx - last_nonce > 1:
logg.debug('tx {}'.format(tx))
tx_obj = unpack(bytes.fromhex(strip_0x(tx['signed_tx'])), chain_spec)
logg.error('nonce gap; {} followed {} for account {}'.format(nonce_otx, last_nonce, tx_obj['from']))
logg.error('nonce gap; {} followed {} for account {}'.format(nonce_otx, last_nonce, tx['from']))
blocking_tx = k
blocking_nonce = nonce_otx
break
@@ -261,13 +256,12 @@ class AdminApi:
'blocking': blocking_nonce,
},
'tx': {
'blocking': add_0x(blocking_tx),
'blocking': blocking_tx,
}
}
}
# TODO: is risky since it does not validate that there is actually a nonce problem?
def fix_nonce(self, chain_spec, address, nonce):
def fix_nonce(self, address, nonce, chain_spec):
s = celery.signature(
'cic_eth.queue.query.get_account_tx',
[
@@ -281,17 +275,15 @@ class AdminApi:
txs = s.apply_async().get()
tx_hash_hex = None
session = SessionBase.create_session()
for k in txs.keys():
tx_dict = get_tx(chain_spec, k, session=session)
tx_dict = get_tx(k)
if tx_dict['nonce'] == nonce:
tx_hash_hex = k
session.close()
s_nonce = celery.signature(
'cic_eth.admin.nonce.shift_nonce',
[
chain_spec.asdict(),
self.rpc.chain_spec.asdict(),
tx_hash_hex,
],
queue=self.queue
@@ -396,13 +388,12 @@ class AdminApi:
t = s.apply_async()
tx = t.get()
source_token = None
if tx['source_token'] != ZERO_ADDRESS:
source_token_declaration = None
if registry != None:
try:
source_token_declaration = registry.by_address(tx['source_token'], sender_address=self.call_address)
source_token = registry.by_address(tx['source_token'])
except UnknownContractError:
logg.warning('unknown source token contract {} (direct)'.format(tx['source_token']))
else:
@@ -415,21 +406,16 @@ class AdminApi:
queue=self.queue
)
t = s.apply_async()
source_token_declaration = t.get()
if source_token_declaration != None:
logg.warning('found declarator record for source token {} but not checking validity'.format(tx['source_token']))
source_token = ERC20Token(chain_spec, self.rpc, tx['source_token'])
logg.debug('source token set tup {}'.format(source_token))
source_token = t.get()
if source_token == None:
logg.warning('unknown source token contract {} (task pool)'.format(tx['source_token']))
destination_token = None
if tx['destination_token'] != ZERO_ADDRESS:
destination_token_declaration = None
if registry != None:
try:
destination_token_declaration = registry.by_address(tx['destination_token'], sender_address=self.call_address)
destination_token = registry.by_address(tx['destination_token'])
except UnknownContractError:
logg.warning('unknown destination token contract {}'.format(tx['destination_token']))
else:
@@ -442,10 +428,10 @@ class AdminApi:
queue=self.queue
)
t = s.apply_async()
destination_token_declaration = t.get()
if destination_token_declaration != None:
logg.warning('found declarator record for destination token {} but not checking validity'.format(tx['destination_token']))
destination_token = ERC20Token(chain_spec, self.rpc, tx['destination_token'])
destination_token = t.get()
if destination_token == None:
logg.warning('unknown destination token contract {} (task pool)'.format(tx['destination_token']))
tx['sender_description'] = 'Custodial account'
tx['recipient_description'] = 'Custodial account'
@@ -557,19 +543,13 @@ class AdminApi:
if role != None:
tx['recipient_description'] = role
erc20_c = ERC20(chain_spec)
if source_token != None:
tx['source_token_symbol'] = source_token.symbol
o = erc20_c.balance_of(tx['source_token'], tx['sender'], sender_address=self.call_address)
r = self.rpc.do(o)
tx['sender_token_balance'] = erc20_c.parse_balance_of(r)
tx['source_token_symbol'] = source_token.symbol()
tx['sender_token_balance'] = source_token.function('balanceOf')(tx['sender']).call()
if destination_token != None:
tx['destination_token_symbol'] = destination_token.symbol
o = erc20_c.balance_of(tx['destination_token'], tx['recipient'], sender_address=self.call_address)
r = self.rpc.do(o)
tx['recipient_token_balance'] = erc20_c.parse_balance_of(r)
#tx['recipient_token_balance'] = destination_token.function('balanceOf')(tx['recipient']).call()
tx['destination_token_symbol'] = destination_token.symbol()
tx['recipient_token_balance'] = source_token.function('balanceOf')(tx['recipient']).call()
# TODO: this can mean either not subitted or culled, need to check other txs with same nonce to determine which
tx['network_status'] = 'Not in node'

View File

@@ -74,134 +74,134 @@ class Api:
return s_token.apply_async()
# def convert_transfer(self, from_address, to_address, target_return, minimum_return, from_token_symbol, to_token_symbol):
# """Executes a chain of celery tasks that performs conversion between two ERC20 tokens, and transfers to a specified receipient after convert has completed.
#
# :param from_address: Ethereum address of sender
# :type from_address: str, 0x-hex
# :param to_address: Ethereum address of receipient
# :type to_address: str, 0x-hex
# :param target_return: Estimated return from conversion
# :type target_return: int
# :param minimum_return: The least value of destination token return to allow
# :type minimum_return: int
# :param from_token_symbol: ERC20 token symbol of token being converted
# :type from_token_symbol: str
# :param to_token_symbol: ERC20 token symbol of token to receive
# :type to_token_symbol: str
# :returns: uuid of root task
# :rtype: celery.Task
# """
# raise NotImplementedError('out of service until new DEX migration is done')
# s_check = celery.signature(
# 'cic_eth.admin.ctrl.check_lock',
# [
# [from_token_symbol, to_token_symbol],
# self.chain_spec.asdict(),
# LockEnum.QUEUE,
# from_address,
# ],
# queue=self.queue,
# )
# s_nonce = celery.signature(
# 'cic_eth.eth.nonce.reserve_nonce',
# [
# self.chain_spec.asdict(),
# ],
# queue=self.queue,
# )
# s_tokens = celery.signature(
# 'cic_eth.eth.erc20.resolve_tokens_by_symbol',
# [
# self.chain_str,
# ],
# queue=self.queue,
# )
# s_convert = celery.signature(
# 'cic_eth.eth.bancor.convert_with_default_reserve',
# [
# from_address,
# target_return,
# minimum_return,
# to_address,
# self.chain_spec.asdict(),
# ],
# queue=self.queue,
# )
# s_nonce.link(s_tokens)
# s_check.link(s_nonce)
# if self.callback_param != None:
# s_convert.link(self.callback_success)
# s_tokens.link(s_convert).on_error(self.callback_error)
# else:
# s_tokens.link(s_convert)
#
# t = s_check.apply_async(queue=self.queue)
# return t
#
#
# def convert(self, from_address, target_return, minimum_return, from_token_symbol, to_token_symbol):
# """Executes a chain of celery tasks that performs conversion between two ERC20 tokens.
#
# :param from_address: Ethereum address of sender
# :type from_address: str, 0x-hex
# :param target_return: Estimated return from conversion
# :type target_return: int
# :param minimum_return: The least value of destination token return to allow
# :type minimum_return: int
# :param from_token_symbol: ERC20 token symbol of token being converted
# :type from_token_symbol: str
# :param to_token_symbol: ERC20 token symbol of token to receive
# :type to_token_symbol: str
# :returns: uuid of root task
# :rtype: celery.Task
# """
# raise NotImplementedError('out of service until new DEX migration is done')
# s_check = celery.signature(
# 'cic_eth.admin.ctrl.check_lock',
# [
# [from_token_symbol, to_token_symbol],
# self.chain_spec.asdict(),
# LockEnum.QUEUE,
# from_address,
# ],
# queue=self.queue,
# )
# s_nonce = celery.signature(
# 'cic_eth.eth.nonce.reserve_nonce',
# [
# self.chain_spec.asdict(),
# ],
# queue=self.queue,
# )
# s_tokens = celery.signature(
# 'cic_eth.eth.erc20.resolve_tokens_by_symbol',
# [
# self.chain_spec.asdict(),
# ],
# queue=self.queue,
# )
# s_convert = celery.signature(
# 'cic_eth.eth.bancor.convert_with_default_reserve',
# [
# from_address,
# target_return,
# minimum_return,
# from_address,
# self.chain_spec.asdict(),
# ],
# queue=self.queue,
# )
# s_nonce.link(s_tokens)
# s_check.link(s_nonce)
# if self.callback_param != None:
# s_convert.link(self.callback_success)
# s_tokens.link(s_convert).on_error(self.callback_error)
# else:
# s_tokens.link(s_convert)
#
# t = s_check.apply_async(queue=self.queue)
# return t
def convert_transfer(self, from_address, to_address, target_return, minimum_return, from_token_symbol, to_token_symbol):
"""Executes a chain of celery tasks that performs conversion between two ERC20 tokens, and transfers to a specified receipient after convert has completed.
:param from_address: Ethereum address of sender
:type from_address: str, 0x-hex
:param to_address: Ethereum address of receipient
:type to_address: str, 0x-hex
:param target_return: Estimated return from conversion
:type target_return: int
:param minimum_return: The least value of destination token return to allow
:type minimum_return: int
:param from_token_symbol: ERC20 token symbol of token being converted
:type from_token_symbol: str
:param to_token_symbol: ERC20 token symbol of token to receive
:type to_token_symbol: str
:returns: uuid of root task
:rtype: celery.Task
"""
raise NotImplementedError('out of service until new DEX migration is done')
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
[from_token_symbol, to_token_symbol],
self.chain_spec.asdict(),
LockEnum.QUEUE,
from_address,
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
[
self.chain_spec.asdict(),
],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.erc20.resolve_tokens_by_symbol',
[
self.chain_str,
],
queue=self.queue,
)
s_convert = celery.signature(
'cic_eth.eth.bancor.convert_with_default_reserve',
[
from_address,
target_return,
minimum_return,
to_address,
self.chain_spec.asdict(),
],
queue=self.queue,
)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None:
s_convert.link(self.callback_success)
s_tokens.link(s_convert).on_error(self.callback_error)
else:
s_tokens.link(s_convert)
t = s_check.apply_async(queue=self.queue)
return t
def convert(self, from_address, target_return, minimum_return, from_token_symbol, to_token_symbol):
"""Executes a chain of celery tasks that performs conversion between two ERC20 tokens.
:param from_address: Ethereum address of sender
:type from_address: str, 0x-hex
:param target_return: Estimated return from conversion
:type target_return: int
:param minimum_return: The least value of destination token return to allow
:type minimum_return: int
:param from_token_symbol: ERC20 token symbol of token being converted
:type from_token_symbol: str
:param to_token_symbol: ERC20 token symbol of token to receive
:type to_token_symbol: str
:returns: uuid of root task
:rtype: celery.Task
"""
raise NotImplementedError('out of service until new DEX migration is done')
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
[from_token_symbol, to_token_symbol],
self.chain_spec.asdict(),
LockEnum.QUEUE,
from_address,
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
[
self.chain_spec.asdict(),
],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.erc20.resolve_tokens_by_symbol',
[
self.chain_spec.asdict(),
],
queue=self.queue,
)
s_convert = celery.signature(
'cic_eth.eth.bancor.convert_with_default_reserve',
[
from_address,
target_return,
minimum_return,
from_address,
self.chain_spec.asdict(),
],
queue=self.queue,
)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None:
s_convert.link(self.callback_success)
s_tokens.link(s_convert).on_error(self.callback_error)
else:
s_tokens.link(s_convert)
t = s_check.apply_async(queue=self.queue)
return t
def transfer(self, from_address, to_address, value, token_symbol):

View File

@@ -0,0 +1,8 @@
import math
def num_serialize(n):
if n == 0:
return b'\x00'
binlog = math.log2(n)
bytelength = int(binlog / 8 + 1)
return n.to_bytes(bytelength, 'big')

View File

@@ -3,7 +3,7 @@ import logging
# external imports
import celery
from erc20_faucet import Faucet
from erc20_single_shot_faucet import SingleShotFaucet as Faucet
from hexathon import (
strip_0x,
)
@@ -20,9 +20,8 @@ from chainlib.eth.tx import (
)
from chainlib.chain import ChainSpec
from chainlib.error import JSONRPCException
from eth_accounts_index.registry import AccountRegistry
from eth_accounts_index import AccountsIndex
from sarafu_faucet import MinterFaucet
from eth_accounts_index import AccountRegistry
from sarafu_faucet import MinterFaucet as Faucet
from chainqueue.db.models.tx import TxCache
# local import
@@ -134,7 +133,7 @@ def register(self, account_address, chain_spec_dict, writer_address=None):
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
nonce_oracle = CustodialTaskNonceOracle(writer_address, self.request.root_id, session=session) #, default_nonce)
gas_oracle = self.create_gas_oracle(rpc, AccountRegistry.gas)
account_registry = AccountsIndex(chain_spec, signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
account_registry = AccountRegistry(chain_spec, signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = account_registry.add(account_registry_address, writer_address, account_address, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect()
@@ -186,7 +185,7 @@ def gift(self, account_address, chain_spec_dict):
# Generate and sign transaction
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
nonce_oracle = CustodialTaskNonceOracle(account_address, self.request.root_id, session=session) #, default_nonce)
gas_oracle = self.create_gas_oracle(rpc, MinterFaucet.gas)
gas_oracle = self.create_gas_oracle(rpc, Faucet.gas)
faucet = Faucet(chain_spec, signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = faucet.give_to(faucet_address, account_address, account_address, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect()
@@ -339,7 +338,7 @@ def cache_account_data(
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_data = AccountsIndex.parse_add_request(tx['data'])
tx_data = AccountRegistry.parse_add_request(tx['data'])
session = SessionBase.create_session()
tx_cache = TxCache(

View File

@@ -6,6 +6,7 @@ import celery
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec
from chainlib.connection import RPCConnection
from chainlib.eth.erc20 import ERC20
from chainlib.eth.tx import (
TxFormat,
unpack,
@@ -15,7 +16,6 @@ from cic_eth_registry.erc20 import ERC20Token
from hexathon import strip_0x
from chainqueue.db.models.tx import TxCache
from chainqueue.error import NotLocalTxError
from eth_erc20 import ERC20
# local imports
from cic_eth.db.models.base import SessionBase

View File

@@ -57,12 +57,10 @@ celery_app = celery.current_app
logg = logging.getLogger()
MAXIMUM_FEE_UNITS = 8000000
class MaxGasOracle:
def gas(code=None):
return MAXIMUM_FEE_UNITS
return 8000000
def create_check_gas_task(tx_signed_raws_hex, chain_spec, holder_address, gas=None, tx_hashes_hex=None, queue=None):
@@ -152,7 +150,7 @@ def cache_gas_data(
@celery_app.task(bind=True, throws=(OutOfGasError), base=CriticalSQLAlchemyAndWeb3Task)
def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_required=MAXIMUM_FEE_UNITS):
def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_required=None):
"""Check the gas level of the sender address of a transaction.
If the account balance is not sufficient for the required gas, gas refill is requested and OutOfGasError raiser.
@@ -172,30 +170,24 @@ def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_requir
:return: Signed raw transaction data list
:rtype: param txs, unchanged
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
logg.debug('txs {} tx_hashes {}'.format(txs, tx_hashes))
addresspass = None
if len(txs) == 0:
addresspass = []
for i in range(len(tx_hashes)):
o = get_tx(chain_spec_dict, tx_hashes[i])
o = get_tx(tx_hashes[i])
txs.append(o['signed_tx'])
logg.debug('sender {}'.format(o))
tx = unpack(bytes.fromhex(strip_0x(o['signed_tx'])), chain_spec)
if address == None:
address = tx['from']
elif address != tx['from']:
raise ValueError('txs passed to check gas must all have same sender; had {} got {}'.format(address, tx['from']))
addresspass.append(address)
address = o['address']
#if not web3.Web3.isChecksumAddress(address):
if not is_checksum_address(address):
raise ValueError('invalid address {}'.format(address))
chain_spec = ChainSpec.from_dict(chain_spec_dict)
queue = self.request.delivery_info.get('routing_key')
conn = RPCConnection.connect(chain_spec)
# TODO: it should not be necessary to pass address explicitly, if not passed should be derived from the tx
gas_balance = 0
try:
o = balance(address)
@@ -206,9 +198,6 @@ def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_requir
conn.disconnect()
raise EthError('gas_balance call for {}: {}'.format(address, e))
if gas_required == None:
gas_required = MAXIMUM_FEE_UNITS
logg.debug('address {} has gas {} needs {}'.format(address, gas_balance, gas_required))
session = SessionBase.create_session()
gas_provider = AccountRole.get_address('GAS_GIFTER', session=session)
@@ -279,8 +268,7 @@ def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_requir
queue=queue,
)
ready_tasks.append(s)
t = celery.group(ready_tasks)()
logg.debug('group {}'.format(t))
celery.group(ready_tasks)()
return txs

View File

@@ -21,7 +21,6 @@ from chainqueue.db.models.tx import Otx
from chainqueue.db.models.tx import TxCache
from chainqueue.db.enum import StatusBits
from chainqueue.error import NotLocalTxError
from potaahto.symbols import snake_and_camel
# local imports
from cic_eth.db import SessionBase
@@ -59,9 +58,6 @@ def hashes_to_txs(self, tx_hashes):
if len(tx_hashes) == 0:
raise ValueError('no transaction to send')
for i in range(len(tx_hashes)):
tx_hashes[i] = strip_0x(tx_hashes[i])
queue = self.request.delivery_info['routing_key']
session = SessionBase.create_session()
@@ -152,7 +148,7 @@ def send(self, txs, chain_spec_dict):
@celery_app.task(bind=True, throws=(NotFoundEthException,), base=CriticalWeb3Task)
def sync_tx(self, tx_hash_hex, chain_spec_dict):
"""Force update of network status of a single transaction
"""Force update of network status of a simgle transaction
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
@@ -177,14 +173,12 @@ def sync_tx(self, tx_hash_hex, chain_spec_dict):
# TODO: apply receipt in tx object to validate and normalize input
if rcpt != None:
rcpt = snake_and_camel(rcpt)
success = rcpt['status'] == 1
logg.debug('sync tx {} mined block {} tx index {} success {}'.format(tx_hash_hex, rcpt['blockNumber'], rcpt['transactionIndex'], success))
logg.debug('sync tx {} mined block {} success {}'.format(tx_hash_hex, rcpt['blockNumber'], success))
s = celery.signature(
'cic_eth.queue.state.set_final',
[
chain_spec_dict,
tx_hash_hex,
rcpt['blockNumber'],
rcpt['transactionIndex'],
@@ -192,14 +186,12 @@ def sync_tx(self, tx_hash_hex, chain_spec_dict):
],
queue=queue,
)
# TODO: it's not entirely clear how we can reliable determine that its in mempool without explicitly checking
else:
logg.debug('sync tx {} mempool'.format(tx_hash_hex))
s = celery.signature(
'cic_eth.queue.state.set_sent',
[
chain_spec_dict,
tx_hash_hex,
],
queue=queue,

View File

@@ -7,7 +7,7 @@ from chainlib.chain import ChainSpec
from chainlib.connection import RPCConnection
from chainlib.eth.constant import ZERO_ADDRESS
from cic_eth_registry import CICRegistry
from eth_address_declarator import Declarator
from eth_address_declarator import AddressDeclarator
# local imports
from cic_eth.task import BaseTask
@@ -23,12 +23,12 @@ def translate_address(address, trusted_addresses, chain_spec, sender_address=ZER
registry = CICRegistry(chain_spec, rpc)
declarator_address = registry.by_name('AddressDeclarator', sender_address=sender_address)
c = Declarator(chain_spec)
c = AddressDeclarator(chain_spec)
for trusted_address in trusted_addresses:
o = c.declaration(declarator_address, trusted_address, address, sender_address=sender_address)
r = rpc.do(o)
declaration_hex = Declarator.parse_declaration(r)
declaration_hex = AddressDeclarator.parse_declaration(r)
declaration_hex = declaration_hex[0].rstrip('0')
declaration_bytes = bytes.fromhex(declaration_hex)
declaration = None

View File

@@ -14,13 +14,13 @@ from chainlib.eth.tx import (
)
from chainlib.eth.block import block_by_number
from chainlib.eth.contract import abi_decode_single
from chainlib.eth.erc20 import ERC20
from hexathon import strip_0x
from cic_eth_registry import CICRegistry
from cic_eth_registry.erc20 import ERC20Token
from chainqueue.db.models.otx import Otx
from chainqueue.db.enum import StatusEnum
from chainqueue.sql.query import get_tx_cache
from eth_erc20 import ERC20
from chainqueue.query import get_tx_cache
# local imports
from cic_eth.queue.time import tx_times
@@ -114,7 +114,7 @@ def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict):
# TODO: pass through registry to validate declarator entry of token
#token = registry.by_address(tx['to'], sender_address=self.call_address)
token = ERC20Token(chain_spec, rpc, tx['to'])
token = ERC20Token(rpc, tx['to'])
token_symbol = token.symbol
token_decimals = token.decimals
times = tx_times(tx['hash'], chain_spec)

View File

@@ -5,7 +5,7 @@ import datetime
import celery
from chainlib.chain import ChainSpec
from chainlib.eth.tx import unpack
import chainqueue.sql.query
import chainqueue.query
from chainqueue.db.enum import (
StatusEnum,
is_alive,
@@ -28,7 +28,7 @@ celery_app = celery.current_app
def get_tx_cache(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.query.get_tx_cache(chain_spec, tx_hash, session=session)
r = chainqueue.query.get_tx_cache(chain_spec, tx_hash, session=session)
session.close()
return r
@@ -37,7 +37,7 @@ def get_tx_cache(chain_spec_dict, tx_hash):
def get_tx(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.query.get_tx(chain_spec, tx_hash, session=session)
r = chainqueue.query.get_tx(chain_spec, tx_hash)
session.close()
return r
@@ -46,7 +46,7 @@ def get_tx(chain_spec_dict, tx_hash):
def get_account_tx(chain_spec_dict, address, as_sender=True, as_recipient=True, counterpart=None):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.query.get_account_tx(chain_spec, address, as_sender=True, as_recipient=True, counterpart=None, session=session)
r = chainqueue.query.get_account_tx(chain_spec, address, as_sender=True, as_recipient=True, counterpart=None, session=session)
session.close()
return r
@@ -55,17 +55,17 @@ def get_account_tx(chain_spec_dict, address, as_sender=True, as_recipient=True,
def get_upcoming_tx_nolock(chain_spec_dict, status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0, session=None):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.query.get_upcoming_tx(chain_spec, status, not_status=not_status, recipient=recipient, before=before, limit=limit, session=session, decoder=unpack)
r = chainqueue.query.get_upcoming_tx(chain_spec, status, not_status=not_status, recipient=recipient, before=before, limit=limit, session=session, decoder=unpack)
session.close()
return r
def get_status_tx(chain_spec, status, not_status=None, before=None, exact=False, limit=0, session=None):
return chainqueue.sql.query.get_status_tx_cache(chain_spec, status, not_status=not_status, before=before, exact=exact, limit=limit, session=session, decoder=unpack)
return chainqueue.query.get_status_tx_cache(chain_spec, status, not_status=not_status, before=before, exact=exact, limit=limit, session=session, decoder=unpack)
def get_paused_tx(chain_spec, status=None, sender=None, session=None, decoder=None):
return chainqueue.sql.query.get_paused_tx_cache(chain_spec, status=status, sender=sender, session=session, decoder=unpack)
return chainqueue.query.get_paused_tx_cache(chain_spec, status=status, sender=sender, session=session, decoder=unpack)
def get_nonce_tx(chain_spec, nonce, sender):

View File

@@ -1,6 +1,6 @@
# external imports
from chainlib.chain import ChainSpec
import chainqueue.sql.state
import chainqueue.state
# local imports
import celery
@@ -14,7 +14,7 @@ celery_app = celery.current_app
def set_sent(chain_spec_dict, tx_hash, fail=False):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_sent(chain_spec, tx_hash, fail, session=session)
r = chainqueue.state.set_sent(chain_spec, tx_hash, fail, session=session)
session.close()
return r
@@ -23,7 +23,7 @@ def set_sent(chain_spec_dict, tx_hash, fail=False):
def set_final(chain_spec_dict, tx_hash, block=None, tx_index=None, fail=False):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_final(chain_spec, tx_hash, block=block, tx_index=tx_index, fail=fail, session=session)
r = chainqueue.state.set_final(chain_spec, tx_hash, block=block, tx_index=tx_index, fail=fail, session=session)
session.close()
return r
@@ -32,7 +32,7 @@ def set_final(chain_spec_dict, tx_hash, block=None, tx_index=None, fail=False):
def set_cancel(chain_spec_dict, tx_hash, manual=False):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_cancel(chain_spec, tx_hash, manual, session=session)
r = chainqueue.state.set_cancel(chain_spec, tx_hash, manual, session=session)
session.close()
return r
@@ -41,7 +41,7 @@ def set_cancel(chain_spec_dict, tx_hash, manual=False):
def set_rejected(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_rejected(chain_spec, tx_hash, session=session)
r = chainqueue.state.set_rejected(chain_spec, tx_hash, session=session)
session.close()
return r
@@ -50,7 +50,7 @@ def set_rejected(chain_spec_dict, tx_hash):
def set_fubar(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_fubar(chain_spec, tx_hash, session=session)
r = chainqueue.state.set_fubar(chain_spec, tx_hash, session=session)
session.close()
return r
@@ -59,7 +59,7 @@ def set_fubar(chain_spec_dict, tx_hash):
def set_manual(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_manual(chain_spec, tx_hash, session=session)
r = chainqueue.state.set_manual(chain_spec, tx_hash, session=session)
session.close()
return r
@@ -68,7 +68,7 @@ def set_manual(chain_spec_dict, tx_hash):
def set_ready(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_ready(chain_spec, tx_hash, session=session)
r = chainqueue.state.set_ready(chain_spec, tx_hash, session=session)
session.close()
return r
@@ -77,7 +77,7 @@ def set_ready(chain_spec_dict, tx_hash):
def set_reserved(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_reserved(chain_spec, tx_hash, session=session)
r = chainqueue.state.set_reserved(chain_spec, tx_hash, session=session)
session.close()
return r
@@ -86,7 +86,7 @@ def set_reserved(chain_spec_dict, tx_hash):
def set_waitforgas(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_waitforgas(chain_spec, tx_hash, session=session)
r = chainqueue.state.set_waitforgas(chain_spec, tx_hash, session=session)
session.close()
return r
@@ -95,7 +95,7 @@ def set_waitforgas(chain_spec_dict, tx_hash):
def get_state_log(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.get_state_log(chain_spec, tx_hash, session=session)
r = chainqueue.state.get_state_log(chain_spec, tx_hash, session=session)
session.close()
return r
@@ -104,6 +104,6 @@ def get_state_log(chain_spec_dict, tx_hash):
def obsolete(chain_spec_dict, tx_hash, final):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.obsolete_by_cache(chain_spec, tx_hash, final, session=session)
r = chainqueue.state.obsolete_by_cache(chain_spec, tx_hash, final, session=session)
session.close()
return r

View File

@@ -12,7 +12,6 @@ from chainqueue.error import NotLocalTxError
# local imports
from cic_eth.task import CriticalSQLAlchemyAndWeb3Task
from cic_eth.db.models.base import SessionBase
celery_app = celery.current_app

View File

@@ -15,14 +15,14 @@ from sqlalchemy import tuple_
from sqlalchemy import func
from chainlib.chain import ChainSpec
from chainlib.eth.tx import unpack
import chainqueue.sql.state
import chainqueue.state
from chainqueue.db.enum import (
StatusEnum,
StatusBits,
is_alive,
dead,
)
from chainqueue.sql.tx import create
from chainqueue.tx import create
from chainqueue.error import NotLocalTxError
from chainqueue.db.enum import status_str

View File

@@ -5,30 +5,29 @@ import logging
from cic_eth_registry import CICRegistry
from cic_eth_registry.lookup.declarator import AddressDeclaratorLookup
from cic_eth_registry.lookup.tokenindex import TokenIndexLookup
from chainlib.eth.constant import ZERO_ADDRESS
logg = logging.getLogger()
def connect_token_registry(rpc, chain_spec, sender_address=ZERO_ADDRESS):
def connect_token_registry(rpc, chain_spec):
registry = CICRegistry(chain_spec, rpc)
token_registry_address = registry.by_name('TokenRegistry', sender_address=sender_address)
token_registry_address = registry.by_name('TokenRegistry')
logg.debug('using token registry address {}'.format(token_registry_address))
lookup = TokenIndexLookup(chain_spec, token_registry_address)
CICRegistry.add_lookup(lookup)
def connect_declarator(rpc, chain_spec, trusted_addresses, sender_address=ZERO_ADDRESS):
def connect_declarator(rpc, chain_spec, trusted_addresses):
registry = CICRegistry(chain_spec, rpc)
declarator_address = registry.by_name('AddressDeclarator', sender_address=sender_address)
declarator_address = registry.by_name('AddressDeclarator')
logg.debug('using declarator address {}'.format(declarator_address))
lookup = AddressDeclaratorLookup(chain_spec, declarator_address, trusted_addresses)
CICRegistry.add_lookup(lookup)
def connect(rpc, chain_spec, registry_address, sender_address=ZERO_ADDRESS):
def connect(rpc, chain_spec, registry_address):
CICRegistry.address = registry_address
registry = CICRegistry(chain_spec, rpc)
registry_address = registry.by_name('ContractRegistry', sender_address=sender_address)
registry_address = registry.by_name('ContractRegistry')
return registry

View File

@@ -21,7 +21,7 @@ from chainqueue.db.enum import (
StatusBits,
)
from chainqueue.error import NotLocalTxError
from chainqueue.sql.state import set_reserved
from chainqueue.state import set_reserved
# local imports
import cic_eth

View File

@@ -3,20 +3,19 @@ import logging
# external imports
import celery
from cic_eth_registry.error import (
UnknownContractError,
NotAContractError,
)
from cic_eth_registry.error import UnknownContractError
from chainlib.status import Status as TxStatus
from chainlib.eth.address import to_checksum_address
from chainlib.eth.error import RequestMismatchException
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.erc20 import ERC20
from hexathon import (
strip_0x,
add_0x,
)
from eth_erc20 import ERC20
from erc20_faucet import Faucet
# TODO: use sarafu_Faucet for both when inheritance has been implemented
from erc20_single_shot_faucet import SingleShotFaucet
from sarafu_faucet import MinterFaucet as Faucet
# local imports
from .base import SyncFilter
@@ -72,15 +71,14 @@ class CallbackFilter(SyncFilter):
#transfer_data['token_address'] = tx.inputs[0]
faucet_contract = tx.inputs[0]
c = Faucet(self.chain_spec)
c = SingleShotFaucet(self.chain_spec)
o = c.token(faucet_contract, sender_address=self.caller_address)
r = conn.do(o)
transfer_data['token_address'] = add_0x(c.parse_token(r))
o = c.token_amount(faucet_contract, sender_address=self.caller_address)
o = c.amount(faucet_contract, sender_address=self.caller_address)
r = conn.do(o)
transfer_data['value'] = c.parse_token_amount(r)
transfer_data['value'] = c.parse_amount(r)
return ('tokengift', transfer_data)
@@ -129,7 +127,8 @@ class CallbackFilter(SyncFilter):
(transfer_type, transfer_data) = parser(tx, conn)
if transfer_type == None:
continue
break
else:
pass
except RequestMismatchException:
continue
@@ -172,9 +171,7 @@ class CallbackFilter(SyncFilter):
t = self.call_back(transfer_type, result)
logg.info('callback success task id {} tx {} queue {}'.format(t, tx.hash, t.queue))
except UnknownContractError:
logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(self.queue, self.method, transfer_data['to'], tx.hash))
except NotAContractError:
logg.debug('callback filter {}:{} skipping "transfer" on non-contract address {} tx {}'.format(self.queue, self.method, transfer_data['to'], tx.hash))
logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(tx.queue, tx.method, transfer_data['to'], tx.hash))
def __str__(self):

View File

@@ -10,15 +10,14 @@ from chainlib.eth.tx import unpack
from chainqueue.db.enum import StatusBits
from chainqueue.db.models.tx import TxCache
from chainqueue.db.models.otx import Otx
from chainqueue.sql.query import get_paused_tx_cache as get_paused_tx
from chainqueue.query import get_paused_tx_cache as get_paused_tx
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.eth.gas import create_check_gas_task
from .base import SyncFilter
#logg = logging.getLogger().getChild(__name__)
logg = logging.getLogger()
logg = logging.getLogger().getChild(__name__)
class GasFilter(SyncFilter):
@@ -28,11 +27,11 @@ class GasFilter(SyncFilter):
self.chain_spec = chain_spec
def filter(self, conn, block, tx, db_session):
def filter(self, conn, block, tx, session):
if tx.value > 0:
tx_hash_hex = add_0x(tx.hash)
logg.debug('gas refill tx {}'.format(tx_hash_hex))
session = SessionBase.bind_session(db_session)
session = SessionBase.bind_session(session)
q = session.query(TxCache.recipient)
q = q.join(Otx)
q = q.filter(Otx.tx_hash==strip_0x(tx_hash_hex))
@@ -57,7 +56,7 @@ class GasFilter(SyncFilter):
tx_hashes_hex=list(txs.keys()),
queue=self.queue,
)
return s.apply_async()
s.apply_async()
def __str__(self):

View File

@@ -14,7 +14,7 @@ from .base import SyncFilter
logg = logging.getLogger().getChild(__name__)
account_registry_add_log_hash = '0x9cc987676e7d63379f176ea50df0ae8d2d9d1141d1231d4ce15b5965f73c9430'
account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153fe9ddb8e84b665061acd'
class RegistrationFilter(SyncFilter):
@@ -50,8 +50,7 @@ class RegistrationFilter(SyncFilter):
queue=self.queue,
)
s_nonce.link(s_gift)
t = s_nonce.apply_async()
return t
s_nonce.apply_async()
def __str__(self):

View File

@@ -3,7 +3,7 @@ import logging
# external imports
import celery
from chainqueue.sql.state import obsolete_by_cache
from chainqueue.state import obsolete_by_cache
logg = logging.getLogger()

View File

@@ -32,7 +32,7 @@ class TransferAuthFilter(SyncFilter):
self.transfer_request_contract = registry.by_name('TransferAuthorization', sender_address=call_address)
def filter(self, conn, block, tx, db_session): #rcpt, chain_str, session=None):
def filter(self, conn, block, tx, session): #rcpt, chain_str, session=None):
if tx.payload == None:
logg.debug('no payload')
@@ -45,17 +45,16 @@ class TransferAuthFilter(SyncFilter):
return False
recipient = tx.inputs[0]
#if recipient != self.transfer_request_contract.address():
if recipient != self.transfer_request_contract:
if recipient != self.transfer_request_contract.address():
logg.debug('not our transfer auth contract address {}'.format(recipient))
return False
r = TransferAuthorization.parse_create_request_request(tx.payload)
sender = r[0]
recipient = r[1]
token = r[2]
value = r[3]
sender = abi_decode_single(ABIContractType.ADDRESS, r[0])
recipient = abi_decode_single(ABIContractType.ADDRESS, r[1])
token = abi_decode_single(ABIContractType.ADDRESS, r[2])
value = abi_decode_single(ABIContractType.UINT256, r[3])
token_data = {
'address': token,
@@ -65,7 +64,6 @@ class TransferAuthFilter(SyncFilter):
'cic_eth.eth.nonce.reserve_nonce',
[
[token_data],
self.chain_spec.asdict(),
sender,
],
queue=self.queue,
@@ -82,7 +80,7 @@ class TransferAuthFilter(SyncFilter):
)
s_nonce.link(s_approve)
t = s_nonce.apply_async()
return t
return True
def __str__(self):

View File

@@ -30,7 +30,7 @@ class TxFilter(SyncFilter):
if otx == None:
logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex))
return None
logg.debug('otx filter match on {}'.format(otx.tx_hash))
logg.info('tx filter match on {}'.format(otx.tx_hash))
db_session.flush()
SessionBase.release_session(db_session)
s_final_state = celery.signature(

View File

@@ -22,7 +22,6 @@ from chainlib.eth.connection import (
from chainlib.chain import ChainSpec
from chainqueue.db.models.otx import Otx
from cic_eth_registry.error import UnknownContractError
from cic_eth_registry.erc20 import ERC20Token
import liveness.linux
@@ -37,7 +36,6 @@ from cic_eth.eth import (
from cic_eth.admin import (
debug,
ctrl,
token,
)
from cic_eth.queue import (
query,
@@ -76,6 +74,7 @@ argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('-q', type=str, default='cic-eth', help='queue name for worker tasks')
argparser.add_argument('-r', type=str, help='CIC registry address')
argparser.add_argument('--default-token-symbol', dest='default_token_symbol', type=str, help='Symbol of default token to use')
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
argparser.add_argument('--trace-queue-status', default=None, dest='trace_queue_status', action='store_true', help='set to perist all queue entry status changes to storage')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
@@ -121,25 +120,20 @@ broker = config.get('CELERY_BROKER_URL')
if broker[:4] == 'file':
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
conf_update = {
current_app.conf.update({
'broker_url': broker,
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
}
if config.true('CELERY_DEBUG'):
conf_update['result_extended'] = True
current_app.conf.update(conf_update)
},
)
logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
else:
conf_update = {
'broker_url': broker,
}
if config.true('CELERY_DEBUG'):
conf_update['result_extended'] = True
current_app.conf.update(conf_update)
current_app.conf.update({
'broker_url': broker,
})
result = config.get('CELERY_RESULT_URL')
if result[:4] == 'file':
@@ -208,11 +202,6 @@ def main():
BaseTask.default_token_symbol = config.get('CIC_DEFAULT_TOKEN_SYMBOL')
BaseTask.default_token_address = registry.by_name(BaseTask.default_token_symbol)
default_token = ERC20Token(chain_spec, rpc, BaseTask.default_token_address)
default_token.load(rpc)
BaseTask.default_token_decimals = default_token.decimals
BaseTask.default_token_name = default_token.name
BaseTask.run_dir = config.get('CIC_RUN_DIR')
logg.info('default token set to {} {}'.format(BaseTask.default_token_symbol, BaseTask.default_token_address))

View File

@@ -51,23 +51,15 @@ from cic_eth.registry import (
script_dir = os.path.realpath(os.path.dirname(__file__))
def add_block_args(argparser):
argparser.add_argument('--history-start', type=int, default=0, dest='history_start', help='Start block height for initial history sync')
argparser.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')
return argparser
logg = cic_base.log.create()
argparser = cic_base.argparse.create(script_dir, cic_base.argparse.full_template)
argparser = cic_base.argparse.add(argparser, add_block_args, 'block')
#argparser = cic_base.argparse.add(argparser, add_traffic_args, 'traffic')
args = cic_base.argparse.parse(argparser, logg)
config = cic_base.config.create(args.c, args, args.env_prefix)
config.add(args.y, '_KEYSTORE_FILE', True)
config.add(args.q, '_CELERY_QUEUE', True)
config.add(args.history_start, 'SYNCER_HISTORY_START', True)
config.add(args.no_history, '_NO_HISTORY', True)
cic_base.config.log(config)
@@ -77,9 +69,9 @@ SessionBase.connect(dsn, pool_size=16, debug=config.true('DATABASE_DEBUG'))
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
#RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default')
cic_base.rpc.setup(chain_spec, config.get('ETH_PROVIDER'))
def main():
# connect to celery
celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
@@ -97,7 +89,7 @@ def main():
stat = init_chain_stat(rpc, block_start=block_current)
loop_interval = stat.block_average()
logg.debug('current block height {}'.format(block_offset))
logg.debug('starting at block {}'.format(block_offset))
syncers = []
@@ -106,13 +98,8 @@ def main():
syncer_backends = SQLBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0:
initial_block_start = config.get('SYNCER_HISTORY_START')
initial_block_offset = block_offset
if config.get('_NO_HISTORY'):
initial_block_start = block_offset
initial_block_offset += 1
syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start))
logg.info('found no backends to resume, adding initial sync from history start {} end {}'.format(initial_block_start, initial_block_offset))
logg.info('found no backends to resume')
syncer_backends.append(SQLBackend.initial(chain_spec, block_offset))
else:
for syncer_backend in syncer_backends:
logg.info('resuming sync session {}'.format(syncer_backend))
@@ -168,6 +155,7 @@ def main():
for cf in callback_filters:
syncer.add_filter(cf)
#r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
r = syncer.loop(int(loop_interval), rpc)
sys.stderr.write("sync {} done at block {}\n".format(syncer, r))

View File

@@ -12,10 +12,7 @@ import confini
import celery
# local imports
from cic_eth.api import (
Api,
AdminApi,
)
from cic_eth.api import Api
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -56,20 +53,13 @@ celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=confi
queue = args.q
api = Api(config.get('CIC_CHAIN_SPEC'), queue=queue)
admin_api = AdminApi(None)
def main():
t = admin_api.registry()
registry_address = t.get()
print('Registry: {}'.format(registry_address))
t = api.default_token()
token_info = t.get()
print('Default token symbol: {}'.format(token_info['symbol']))
print('Default token address: {}'.format(token_info['address']))
logg.debug('Default token name: {}'.format(token_info['name']))
logg.debug('Default token decimals: {}'.format(token_info['decimals']))
if __name__ == '__main__':
main()

View File

@@ -20,11 +20,7 @@ def init_chain_stat(rpc, block_start=0):
if block_start == 0:
o = block_latest()
r = rpc.do(o)
try:
block_start = int(r, 16)
except TypeError:
block_start = int(r)
logg.debug('blockstart {}'.format(block_start))
block_start = int(r, 16)
for i in range(BLOCK_SAMPLES):
o = block_by_number(block_start-10+i)

View File

@@ -20,8 +20,7 @@ import liveness.linux
from cic_eth.error import SeppukuError
from cic_eth.db.models.base import SessionBase
#logg = logging.getLogger().getChild(__name__)
logg = logging.getLogger()
logg = logging.getLogger().getChild(__name__)
celery_app = celery.current_app
@@ -34,8 +33,6 @@ class BaseTask(celery.Task):
create_gas_oracle = RPCGasOracle
default_token_address = None
default_token_symbol = None
default_token_name = None
default_token_decimals = None
run_dir = '/run'
def create_session(self):
@@ -119,13 +116,12 @@ def registry():
return CICRegistry.address
@celery_app.task(bind=True, base=BaseTask)
def registry_address_lookup(self, chain_spec_dict, address, connection_tag='default'):
@celery_app.task()
def registry_address_lookup(chain_spec_dict, address, connection_tag='default'):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
conn = RPCConnection.connect(chain_spec, tag=connection_tag)
registry = CICRegistry(chain_spec, conn)
r = registry.by_address(address, sender_address=self.call_address)
return r
return registry.by_address(address)
@celery_app.task(throws=(UnknownContractError,))
@@ -133,7 +129,7 @@ def registry_name_lookup(chain_spec_dict, name, connection_tag='default'):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
conn = RPCConnection.connect(chain_spec, tag=connection_tag)
registry = CICRegistry(chain_spec, conn)
return registry.by_name(name, sender_address=self.call_address)
return registry.by_name(name)
@celery_app.task()

View File

@@ -10,7 +10,7 @@ version = (
0,
11,
0,
'beta.16',
'beta.11',
)
version_object = semver.VersionInfo(

View File

@@ -1,4 +1,3 @@
[celery]
broker_url = redis://
result_url = redis://
debug = 0

View File

@@ -1,4 +1,3 @@
[celery]
broker_url = redis://localhost:63379
result_url = redis://localhost:63379
debug = 0

View File

@@ -1,3 +1,2 @@
[SYNCER]
loop_interval =
history_start = 0

View File

@@ -1,3 +1,2 @@
[SYNCER]
loop_interval =
history_start = 0

View File

@@ -1,22 +0,0 @@
@node cic-eth-accounts
@section Accounts
Accounts are private keys in the signer component keyed by "addresses," a one-way transformation of a public key. Data can be signed by using the account as identifier for corresponding RPC requests.
Any account to be managed by @code{cic-eth} must be created by the corresponding task. This is because @code{cic-eth} creates a @code{nonce} entry for each newly created account, and guarantees that every nonce will only be used once in its threaded environment.
The calling code receives the account address upon creation. It never receives or has access to the private key.
@subsection Signer RPC
The signer is expected to handle a subset of the standard JSON-RPC:
@table @code
@item personal_newAccount(password)
Creates a new account, returning the account address.
@item eth_signTransactions(tx_dict)
Sign the transaction represented as a dictionary.
@item eth_sign(address, message)
Signs an arbtirary message with the standard Ethereum prefix.
@end table

View File

@@ -1,60 +0,0 @@
@node cic-eth system maintenance
@appendix Admin API
The admin API is still in an early stage of refinement. User friendliness can be considerably improved.
All of the API calls are celery task proxies, and return @code{Celery.AsyncResult} unless otherwise noted.
In contrast to the client API module, this API does not currently implement a pluggable callback.
@appendixsection registry
Returns the @code{ContractRegistry} this instance of @code{cic-eth-tasker} is running on.
@appendixsection proxy-do
Execute an arbitary JSON-RPC request using the @code{cic-eth-tasker} blockchain node RPC connection.
@appendixsection default_token
Returns the default token symbol and address.
@appendixsection lock
Set lock bits, globally or per address
@appendixsection unlock
Opposite of lock
@appendixsection get_lock
Get the current state of a lock
@appendixsection tag_account
Associate an identifier with an account address (@xref{cic-eth system accounts})
@appendixsection have_account
Check whether a private key exists in the keystore able to sign on behalf of the given account (it actually performs a signature).
@appendixsection resend
Clone or resend a transaction
@appendixsection check_nonce
Returns diagnostics for nonce sequences per account, e.g. detect nonce gaps that block execution of further transactions.
@appendixsection fix_nonce
Re-orders all nonces by shifting all transaction nonces after the given transaction down by one. This has the additional effect of obsoleting the given transaction. Can be used to close gaps in the nonce sequencing. Use with care!
@appendixsection account
Return brief transaction info lists per account
@appendixsection tx
Return a complex transaction metadata object for a single transaction. The object assembles state from both the blockchain node and the custodial queue system.

View File

@@ -1,18 +0,0 @@
\input texinfo
@setfilename index.html
@settitle CIC custodial services reference deployment
@copying
Released 2021 under GPL3
@end copying
@titlepage
@title CIC custodial services reference deployment
@author Louis Holbrook
@end titlepage
@c
@contents
@include index.texi

View File

@@ -1,4 +0,0 @@
@node cic-eth Appendix Task chains
@appendix Task chains
TBC - explain here how to generate these chain diagrams

View File

@@ -1,108 +0,0 @@
@node cic-eth configuration
@section Configuration
(refer to @code{cic-base} for a general overview of the config pipeline)
Configuration parameters are grouped by configuration filename.
@subsection cic
@table @var
@item registry_address
Ethereum address of the @var{ContractRegistry} contract
@item chain_spec
String representation of the connected blockchain according to the @var{chainlib} @var{ChainSpec} format.
@item tx_retry_delay
Minimum time in seconds to wait before retrying a transaction
@item trust_address
Comma-separated list of one or more ethereum addresses regarded as trusted for describing other resources, Used by @var{cic-eth-registry} in the context of the @var{AddressDeclarator}.
@item defalt_token_symbol
Fallback token to operate on when no other context is given.
@item health_modules
Comma-separated list of methods to execute liveness tests against. (see ...)
@item run_dir
Directory to use for session-scoped variables for @var{cic-eth} daemon parent processes.
@end table
@subsection celery
@table @var
@item broker_url
Message broker URL
@item result_url
Result backend URL
@item debug
Boolean value. If set, the amount of available context for a task in the result backend will be maximized@footnote{This is a @emph{required} setting for the task graph documenter to enabled it to display task names in the graph}.
@end table
@subsection database
See ref cic-base when ready
@subsection eth
@table @var
@item provider
Address of default RPC endpoint for transactions and state queries.
@item gas_gifter_minimum_balance
The minimum gas balance that must be held by the @code{GAS GIFTER} token before the queue processing shuts down@footnote{You should really make sure that this threshold is never hit}
@end table
@subsection redis
Defines connection to the redis server used outside of the context of @var{celery}. This is usually the same server, but should be a different db.
@table @var
@item host
Redis hostname
@item port
Redis port
@item db
Redis db
@end table
@subsection signer
Parameters
@table @var
@item socket_path
The connection string for the signer JSON-RPC service.@footnote{The @var{crypto-dev-signer} supports UNIX socket or a HTTP(S) connections}
@item secret
If set, this password is used to add obfuscation on top of the encryption already applied by the signer for the keystore.
@end table
@subsection ssl
Certificate information for https api callbacks.
@table @var
@item enable_client
Boolean value. If set, client certificate will be used to authenticate the callback request.
@item cert_file
Client certificate file in PEM or DER format
@item key_file
Client key file in PEM or DER format
@item password
Password for unlocking the client key
@item ca_file
Certificate authority bundle, to verify the certificate sent by the callback server.
@end table
@subsection syncer
@table @var
@item loop_interval
Seconds to pause before each execution of the @var{chainsyncer} poll loop.
@end table

View File

@@ -1,46 +0,0 @@
@node cic-eth-dependencies
@section Dependencies
This application is written in Python 3.8. It is tightly coupled with @code{python-celery}, which provides the task worker ecosystem. It also uses @code{SQLAlchemy} which provides useful abstractions for persistent storage though SQL, and @code{alembic} for database schema migrations.
There is currently also a somewhat explicit coupling with @code{Redis}, which is used as message broker for @code{python-celery}. @code{Redis} is also explicitly used by some CLI tools to retrieve results from command execution. This coupling may be relaxed in the future to allow other key-value pubsub solutions instead.
@subsection Generalized project dependencies
The core features are built around four main independent components that have been developed for the purpose of this project, but are separated and maintained as general-purpose libraries.
@table @samp
@item chainlib
A cross-chain library prototype that can provide encodings for transactions on a Solidity-based EVM contract network.
@item chainqueue
Queue manager that guarantees delivery of outgoing blockchain transactions.
@item chainsyncer
Monitors blockchains and guarantees execution of an arbitrary count of pluggable code objects for each block transaction.
@item crypto-dev-signer
An keystore capable of signing for the EVM chain through a standard Ethereum JSON-RPC interface.
@end table
@anchor{cic-eth-dependencies-smart-contracts}
@subsection Smart contract dependencies
The Smart contracts needed by the network must be discoverable through a single entry point called the Contract Registry. The contract registry is expected to reference itself in its records. The authenticity of the contract registry must be guaranteed by external sources of trust.
The contract registry maps contract addresses to well-known identifiers. The contracts are as follows:
@table @code
@item ContractRegistry (points to self)
Resolves plaintext identifiers to contract addresses.
@item AccountRegistry
An append-only store of accounts hosted by the custodial system
@item TokenRegistry
Unique symbol-to-address mappings for token contracts
@item AddressDeclarator
Reverse address to resource lookup
@item TokenAuthorization
Escrow contract for external spending on behalf of custodial users
@item Faucet
Called by newly created accounts to receive initial token balance
@end table
The dependency @code{cic-eth-registry} abstracts and facilitates lookups of resources on the blockchain network. In its current state it resolves tokens by symbol or address, and contracts by common-name identifiers. In the @code{cic-eth} code all lookups for EVM network resources will be performed through this dependency.

View File

@@ -1,49 +0,0 @@
@node cic-eth-incoming
@section Incoming transactions
All transactions in mined blocks will be passed to a selection of plugin filters to the @code{chainsyncer} component. Each of these filters are individual python module files in @code{cic_eth.runnable.daemons.filters}. This section describes their function.
The status bits refer to the bits definining the @code{chainqueue} state.
@subsection tx
Looks up the transaction in the local queue, and if found it sets the @code{FINAL} state bit. If the contract code execution was unsuccessful, the @code{NETWORK ERROR} state bit is also set.
@subsection gas
If the transaction is a gas token transfer, it checks if the recipient is a custodial account awaiting gas refill to execute a transaction (the queue item will have the @code{GAS ISSUES} bit set). If this is the case, the transaction will be activated by setting the @code{QUEUED} bit.
@subsection register
If the transaction is an account registration@footnote{The contract keyed by @var{AccountRegistry} in the @var{ContractRegistry} contract}, a Faucet transaction will be triggered for the registered account@footnote{The faucet contract used in the reference implementation will verify whether the account calling it is registered in the @var{AccountRegistry}. Thus it cannot be called before the account registration has succeeded.}
@subsection callback
Executes, in order, Celery tasks defined in the configuration variable @var{TASKS_TRANSFER_CALLBACKS}. Each of these tasks are registered as individual filters in the @code{chainsyncer} component, with the corresponding execution guarantees.
The callbacks will receive the following arguments
@enumerate
@item @strong{result}
A complex representation of the transaction (see section ?)
@item @strong{transfertype}
A string describing the type of transaction found@footnote{See appendix ? for an overview of possible values}
@item @strong{status}
0 if contract code executed successfully. Any other value is an error@footnote{The values 1-1024 are reserved for system specific errors. In the current implementation only a general error state with value 1 is defined. See appendix ?.}
@end enumerate
@subsection transferauth
If a valid transfer authorization request has been made, a token @emph{allowance}@footnote{@code{approve} for ERC20 tokens} transaction is executed on behalf of the custodial account, with the @var{TransferAuthorization} contract as spender.
@subsection convert
If the transaction is a token conversion, @emph{and} there is a pending transfer registered for the conversion, the corresponding token transfer transaction will be executed. Not currently implemented

View File

@@ -1,14 +0,0 @@
@top cic-eth
@include intro.texi
@include dependencies.texi
@include configuration.texi
@include system.texi
@include interacting.texi
@include outgoing.texi
@include incoming.texi
@include services.texi
@include tools.texi
@include admin.texi
@include chains.texi
@include transfertypes.texi

View File

@@ -1,109 +0,0 @@
@node cic-eth-interacting
@section Interacting with the system
The API to the @var{cic-eth} component is a proxy for executing @emph{chains of Celery tasks}. The tasks that compose individual chains are documented in @ref{cic-eth Appendix Task chains,the Task Chain appendix}, which also describes a CLI tool that can generate graph representationso of them.
There are two API classes, @var{Api} and @var{AdminApi}. The former is described later in this section, the latter described in @ref{cic-eth system maintenance,the Admin API appendix}.
@subsection Interface
API calls are constructed by creating @emph{Celery task signatures} and linking them together, sequentially and/or in parallell. In turn, the tasks themselves may spawn other asynchronous tasks. This means that the code in @file{cic_eth.api.*} does not necessarily specify the full task graph that will be executed for any one command.
The operational guarantee that tasks will be executed, not forgotten, and retried under certain circumstances is deferred to @var{Celery}. On top of this, the @var{chainqueue} component ensures that semantic state changes that the @code{Celery} tasks ask of it are valid.
@anchor{cic-eth-locking}
@subsection Locking
All methods that make a change to the blockchain network must pass @emph{locking layer checks}. Locks may be applied on a global or per-address basis. Lock states are defined by a combination of bit flags. The implemented lock bits are:
@table @var
@item INIT
The system has not yet been initialized. In this state, writes are limited to creating unregistered accounts only.
@item QUEUE
Items may not be added to the queue
@item SEND
Queued items may not be attempted sent to the network
@item CREATE (global-only)
New accounts may not be created
@item STICKY
Until reset, no other part of the locking state can be reset
@end table
@subsection Callback
All API calls provide the option to attach a callback to the end of the task chain. This callback will be executed regardless of whether task chain execution succeeded or not.
Refer to @file{cic-eth.callbacks.noop.noop} for the expected callback signature.
@subsection API Methods that change state
@subsubsection create_account
Creates a new account in the keystore, optionally registering the account with the @var{AccountRegistry} contract.
@subsubsection transfer
Attempts to execute a token transaction between two addresses. It is the caller's responsibility to check whether the token balance is sufficient for the transactions.
@subsubsection refill_gas
Executes a gas token transfer to a custodial address from the @var{GAS GIFTER} system account.
@subsubsection convert
Converts a token to another token for the given custodial account. Currently not implemented.
@anchor{cic-eth-convert-and-transfer}
@subsubsection convert_and_transfer
Same as convert, but will automatically execute a token transfer to another custodial account when conversion has been completed. Currently not implemented.
@subsection Read-only API methods
@subsubsection balance
Retrieves a complex balance statement of a single account, including:
@itemize
@item The network balance at the current block height
@item Value reductions due to by pending outgoing transactions
@item Value increments due to by pending incoming transactions
@end itemize
Only the first of these balance items has guaranteed finality. The reduction by outgoing transaction can be reasonably be assumed to eventually become final. The same applies for the increment by incoming transaction, @emph{unless} the transfer is part of a multiple-transaction operation. For example, a @ref{cic-eth-convert-and-transfer,convert_and_transfer} operation may fail in the convert stage and/or may yield less tokens then expected after conversion.
@subsubsection list
Returns an aggregate iist of all token value changes for a given address. As not all value transfers are a result of literal value transfer contract calls (e.g. @var{transfer} and @var{transferFrom} in @var{ERC20}), this data may come from a number of sources, including:
@itemize
@item Literal value transfers within the custodial system
@item Literal value transfers from or to an external address
@item Faucet invocations (token minting)
@item Demurrage and redistribution built into the token contract
@end itemize
@subsubsection default_token
Return the symbol and address of the token used by default in the network.
@subsubsection ping
Convenience method for the caller to check whether the @var{cic-eth} engine is alive.

View File

@@ -1,74 +0,0 @@
@node cic-eth-outgoing
@section Outgoing transactions
@strong{Important! A pre-requisite for proper functioning of the component is that no other agent is sending transactions to the network for any of the keys in the keystore.}
The term @var{state bit} refers to the bits definining the @code{chainqueue} state.
@subsection Lock
Any task that changes blockchain state @strong{must} apply a @code{QUEUE} lock for the address it operates on. This is to ensure that transactions are sent to the network in order.@footnote{If too many transactions arrive out of order to the blockchain node, it may arbitrarily prune those that cannot directly be included in a block. This puts unnecessary strain (and reliance) on the transaction retry mechanism.}
This lock will be released once the blockchain node confirms handover of the transaction.@footnote{This is the responsibility of the @var{dispatcher} service}
@subsection Nonce
A separate task step is executed for binding a transaction nonce to a Celery task root id, which uniquely identifies the task chain. This provides atomicity of the nonce across the parallell task environment, and also recoverability in case unexpected program interruption.
The nonce of a permanently failed task must be @emph{manually} unlocked. Celery tasks that involve nonces who permanently fail are to be considered @emph{critical anomalies} and should not happen. The queue locking mechanism is designed to prevent the amount of out-of-sequence transactions for an account to escalate.
@subsection Choosing fee prices
@code{cic-eth} uses the @code{chainlib} module to resolve gas price lookups.
Optimizing gas price discovery should be the responsibility of the chainlib layer. It already accommodates using an separate RPC for the @code{eth_gasPrice} call.@footnote{A sample implementation of a gas price tracker speaking JSON-RPC (also built using chainlib/chainsyncer) can be found at @url{https://gitlab.com/nolash/eth-stat-syncer}.}
@subsection Choosing gas limits
To determine the gas limit of a transaction, normally the EVM node will be used to perform a dry-run exection of the inputs against the current chain state.
As the current state of the custodial system should only rely on known, trusted contract bytecode, there is no real need for this mechanism. The @code{chainlib}-based contract interfaces are expected to provide a method call that return safe gas limit values for contract interactions.@footnote{Of course, this method call may in turn conceal more sophisticated gas limit heuristics.}
Note that it is still the responsibility of @code{cic-eth} to make sure that the gas limit of the network is sufficient to allow execution of all needed contracts.
@subsection Gas refills
If the gas balance of a custodial account is below a certain threshold, a gas refill task will be spawned. The gas will be transferred from the @code{GAS GIFTER} system account.
In the event that the balance is insufficient even for the imminent transaction@footnote{This will of course be the case when an account is first created, whereupon it has a balance of 0. The subsequent faucet call will spawn a gas refill task.}, execution of the transaction will be deferred until the gas refill transaction is completed. In this case the transaction will be marked with the @code{GAS ISSUES} state bit.
The value chosen for the gas refill threshold should ideally allow enough of a margin to avoid the need of deferring transactions in the future.
@subsection Queueing transactions
Once the lock, nonce and gas processing parts has been completed, the transaction will be queued for sending. This means that the @code{QUEUED} state bit is set. From here the @ref{cic-eth-services-dispatcher,dispatcher service} takes over responsibility.
@subsection Retrying transactions
There are three conditions create the need to defer and retry transactions.
The first is communication problems with the blockchain node itself, for example if it is overloaded or being restarted. As far as possible, retries of this nature will be left to the Celery task workers. There may be cases, however, where it is appropriate to hand the responsibility to the @code{chainqueue} instead. In this case, the queue item will have the @code{NODE ERROR} state bit set.
The second condition occurs when transactions take too long to be confirmed by the network. In this case, the transaction will be re-submitted, but with a higher gas price.
The third condition occurs when the blockchain node purges the transaction from the mempool before it is sent to the network. @code{cic-eth} does not distinguish this case from the second, as the issue is solved using the same mechanism.
@subsubsection Transaction obsoletion
"Re-submitting" a transaction means creating a transaction with a previously used nonce for an account address.
When this happens, The @code{chainqueue} will still contain all previous transactions with the same nonce. The transaction being superseded will have the @code{OBSOLETED} state bit set.
Once a transaction has been mined, all other transactions with the same node will have the @code{OBSOLETED} and @code{FINAL} state bits set.
@subsection Unexpected conditions
Any unexpected condition exposing the need for urgent code improvement and/or manual intervention will be signalled by marking the transaction with the @code{FUBAR} state bit set.

View File

@@ -0,0 +1,24 @@
@node cic-eth
@chapter cic-eth
@section Overview
@code{cic-eth} is the heart of the custodial account component. It is a combination of python-celery task queues and daemons that sign, dispatch and monitor blockchain transactions, aswell as triggering tasks contingent on other transactions.
@subsection Dependencies
The @code{cic-registry} module is used as a cache for contracts and tokens on the network.
A web3 JSON-RPC service that transparently proxies a keystore and provides transaction and message signing. The current development version uses the python web3 middleware feature to route methodsi involving the keystore to the module @code{crypto-dev-signer}, which is hosted on @file{pypi.org}.
@subsection What does it do
@subsection Tasks
Two main categories exist for tasks, @code{eth} and @code{queue}.
The @code{eth} tasks provide means to construct and decode Ethereum transactions, as well as interfacing the underlying key store.
Tasks in the @code{queue} module operate on the state of transactions queued for processing by @code{cic-eth}.

View File

@@ -1,50 +0,0 @@
@node cic-eth-services
@section Services
There are four daemons that together orchestrate all of the aforementioned recipes. This section will provide a high level description of them.
Each of them have their own set of command line flags. These are available in the CLI help text provided by @kbd{-h} @kbd{--help} and are not recited here.
Daemon executable scripts are located in the @file{cic_eth.runnable.daemons} package. If @var{cic-eth} is installed as a python package, they are installed as executables in @var{PATH}.
@subsection tasker
This is the heart of the custodial system. Tasker is the parent process for the celery workers executing all tasks interacting with and changing the state of the queue and the chain. It is also the only service that interfaces with the signer/keystore.
The other @var{cic-eth} daemons all interface with this component, along with any client adapter bridging an end-user gateway (e.g. @var{cic-ussd}). However, the service itself does not have to be actively running for the other services to run; @var{Celery} handles queueing up the incoming tasks until the @var{tasker} comes back online.@footnote{Whereas this is true, there is currently no fail-safe implemented to handles the event of task backlog overflow in Celery. Furthermore, no targeted testing has yet been performed to asses the stability of the system over time if a sudden, sustained surge of resumed task executions occurs. It may be advisable to suspend activity that adds new queue items to the system if volume is high and/or the @var{cic-eth} outage endures. However, there is no panacea for this condition, as every usage scenario is different}
The tasker has a set of pre-requisites that must be fulfilled before it will start
@itemize
@item It must be given a valid @var{ContractRegistry} address, which must include valid references to all contracts specified in @ref{cic-eth-dependencies-smart-contracts,Smart contract dependencies}
@item The gas gifter balance must be above the minimum threshold (See "eth" section in configurations).
@item There must be a valid alembic migration record in the storage database
@item The redis backend must be reachable and writable
@item There must be a reachable JSON-RPC server at the other end of the signer socket path (see "signer" section in configurations)
@end itemize
@subsection tracker
Implements the @var{chainsyncer}, and registers the filters described in @ref{cic-eth-incoming,Incoming Transactions} to be executed for every transaction. It consumes the appropriate @var{TASKS_TRANSFER_CALLBACKS} configuration setting to add externally defined filters at without having to change the daemon code.
The @var{tracker} has the same requisities for the @var{ContractRegistry} as the @var{tasker}.
@strong{Important! Guarantees of filter executions has some caveats. Refer to the @var{chainsyncer} documentation for more details.}
@anchor{cic-eth-services-dispatcher}
@subsection dispatcher
Uses the @code{get_upcoming_tx} method call from @var{chainqueue} to receive batches of queued transactions that are ready to send to the blockchain node. Every batch will only contain a single transaction by any one address, which will be the transaction with the next nonce not previously seen by the network. There is no limit currently set to how many transactions that will be included in a single batch.
@subsection retrier
The responsibility of the @var{retrier} is to re-queue transactions that failed to be sent to the blockchain node, as well as create @emph{replacements} for transactions whose processing by the network has been delayed. @strong{[refer transaction obolestion]}.
It is in turn the responsiblity of the @var{dispatcher} to send these (re-)queued transactions to the blockchain node.

View File

@@ -1,17 +0,0 @@
@node cic-eth system accounts
@section System initialization
When the system starts for the first time, it is locked for any state change request other than account creation@footnote{Specifically, the @code{INIT}, @code{SEND} and @code{QUEUE} lock bits are set.}. These locks should be @emph{reset} once system initialization has been completed. Currently, system initialization only involves creating and tagging required system accounts, as specified below.
See @ref{cic-eth-locking,Locking} and @ref{cic-eth-tools-ctrl,ctrl in Tools} for details on locking.
@subsection System accounts
Certain accounts in the system have special roles. These are defined by @emph{tagging} certain accounts addresses with well-known identifiers.
@table @var
@item GAS_GIFTER
This account @strong{must} at all times have enough gas token to fund any custodial account address in need.
@item ACCOUNT_REGISTRY_WRITER
This account @strong{must} have access to add newly created account addresses to the (@xref{cic-eth-dependencies-smart-contracts,Smart contract dependencies})
@end table

View File

@@ -1,51 +0,0 @@
@node cic-eth-tools
@section Tools
A collection of CLI tools have been provided to help with diagnostics and other administrative tasks. These use the same configuration infrastructure as the daemons.
Tool scripts are located in the @file{cic_eth.runnable} package. If @var{cic-eth} is installed as a python package, they are installed as executables in @var{PATH}.
@subsection info (cic-eth-info)
Returns self-explanatory metadata for the blockchain network, and optionally an address.
@subsection inspect (cic-eth-inspect)
Returns information about a specific resource related to the tranasaction queue. The results returned depend on the type of the argument.
@table @var
@item lock
If the argument is the literal string @kbd{lock}, it will list all active lock settings currently in effect. (@xref{cic-eth-locking})
@item <address>
If the argument is a 0x-prefixed hex string of 42 characters, it returns all transactions where the specified address is a sender or recipient@footnote{If the address is the gas gifter or the accounts index writer, this may be a @emph{lot} of transactions. Use with care!}
@item <tx_hash>
If the argument is a 0x-prefixed hex string of 66 characters, it returns data from the custodial queueing system aswell as the network for a single transaction whose hash matches the input. Fails if the transaction does not exist in the queue
@item <code>
If the argument is a 0x-prefixed hex string longer than 66 bytes, the argument will be interpreted as raw RLP serialized transaction data, and attempt to match this with an entry in the queue. If a match is found, the result is the same as for @var{<tx_hash>}
@end table
@subsection create (cic-eth-create)
Create a new account, optionally registering the account in the accounts registry, and optionally receiving the newly created address through a redis subscription.
@subsection transfer (cic-eth-transfer)
Execute a token transfer on behalf of a custodial account.
@subsection tag (cic-eth-tag)
Associate an account address with a string identifier. @xref{cic-eth system accounts}
@anchor{cic-eth-tools-ctrl}
@subsection ctrl (cic-eth-ctrl)
Set or reset lock bits, globally or per account address.
@subsection resend (cic-eth-resend)
Resend a transaction. This can either be done "in-place," which means increasing the gas price and re-queueing@footnote{this is the same thing that the retrier does}. It can also be used to @emph{clone} a transaction, which obviously will duplicate the effect of the cloned transaction on the blockchain network.

View File

@@ -1,11 +0,0 @@
@node cic-eth Appendix Transaction types
@appendix Transfer types
@table @var
@item transfer
A regular token transfer, e.g. ERC20 @code{transfer}
@item transferfrom
A token transfer performed on behalf of another party, e.g. ERC20 @code{transferFrom}
@item tokengift
Result of a successful faucet request.
@end table

View File

@@ -1,62 +1,48 @@
FROM python:3.8.6-slim-buster as compile
# FROM grassrootseconomics:cic
#FROM python:3.8.6-alpine
FROM python:3.8.6-slim-buster
#COPY --from=0 /usr/local/share/cic/solidity/ /usr/local/share/cic/solidity/
WORKDIR /usr/src/cic-eth
RUN apt-get update && \
apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps git
#RUN python -m venv venv && . venv/bin/activate
ARG pip_extra_index_url_flag='--index https://pypi.org/simple --extra-index-url https://pip.grassrootseconomics.net:8433'
RUN /usr/local/bin/python -m pip install --upgrade pip
RUN pip install semver
# TODO use a packaging style that lets us copy requirments only ie. pip-tools
COPY cic-eth/ .
RUN pip install $pip_extra_index_url_flag .
# --- TEST IMAGE ---
FROM python:3.8.6-slim-buster as test
ARG root_requirement_file='requirements.txt'
#RUN apk update && \
# apk add gcc musl-dev gnupg libpq
#RUN apk add postgresql-dev
#RUN apk add linux-headers
#RUN apk add libffi-dev
RUN apt-get update && \
apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps git
WORKDIR /usr/src/cic-eth
# Copy shared requirements from top of mono-repo
RUN echo "copying root req file ${root_requirement_file}"
#COPY $root_requirement_file .
#RUN pip install -r $root_requirement_file $pip_extra_index_url_flag
RUN /usr/local/bin/python -m pip install --upgrade pip
#RUN git clone https://gitlab.com/grassrootseconomics/cic-base.git && \
# cd cic-base && \
# git checkout 7ae1f02efc206b13a65873567b0f6d1c3b7f9bc0 && \
# python merge_requirements.py | tee merged_requirements.txt
#RUN cd cic-base && \
# pip install $pip_extra_index_url_flag -r ./merged_requirements.txt
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a77
COPY --from=compile /usr/local/bin/ /usr/local/bin/
COPY --from=compile /usr/local/lib/python3.8/site-packages/ \
/usr/local/lib/python3.8/site-packages/
# TODO we could use venv inside container to isolate the system and app deps further
# COPY --from=compile /usr/src/cic-eth/ .
# RUN . venv/bin/activate
COPY cic-eth/scripts/ scripts/
COPY cic-eth/setup.cfg cic-eth/setup.py ./
COPY cic-eth/cic_eth/ cic_eth/
# Copy app specific requirements
COPY cic-eth/requirements.txt .
COPY cic-eth/test_requirements.txt .
RUN pip install $pip_extra_index_url_flag -r test_requirements.txt
COPY cic-eth .
ENV PYTHONPATH .
ENTRYPOINT ["pytest"]
# --- RUNTIME ---
FROM python:3.8.6-slim-buster as runtime
RUN apt-get update && \
apt install -y gnupg libpq-dev procps
WORKDIR /usr/src/cic-eth
COPY --from=compile /usr/local/bin/ /usr/local/bin/
COPY --from=compile /usr/local/lib/python3.8/site-packages/ \
/usr/local/lib/python3.8/site-packages/
RUN pip install $pip_extra_index_url_flag .
COPY cic-eth/docker/* ./
RUN chmod 755 *.sh
COPY cic-eth/tests/ tests/
COPY cic-eth/scripts/ scripts/
# # ini files in config directory defines the configurable parameters for the application
# # they can all be overridden by environment variables
# # to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package)
@@ -64,5 +50,8 @@ COPY cic-eth/config/ /usr/local/etc/cic-eth/
COPY cic-eth/cic_eth/db/migrations/ /usr/local/share/cic-eth/alembic/
COPY cic-eth/crypto_dev_signer_config/ /usr/local/etc/crypto-dev-signer/
COPY util/liveness/health.sh /usr/local/bin/health.sh
RUN git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \
mkdir -p /usr/local/share/cic/solidity && \
cp -R cic-contracts/abis /usr/local/share/cic/solidity/abi
COPY util/liveness/health.sh /usr/local/bin/health.sh

View File

@@ -1,25 +1,25 @@
cic-base~=0.1.2b15
cic-base==0.1.2b3
celery==4.4.7
crypto-dev-signer~=0.4.14b3
confini~=0.3.6rc3
cic-eth-registry~=0.5.5a7
cic-eth-registry~=0.5.4a16
#cic-bancor~=0.0.6
redis==3.5.3
alembic==1.4.2
websockets==8.1
requests~=2.24.0
eth_accounts_index~=0.0.11a12
erc20-transfer-authorization~=0.3.1a7
eth_accounts_index~=0.0.11a9
erc20-transfer-authorization~=0.3.1a5
uWSGI==2.0.19.1
semver==2.13.0
websocket-client==0.57.0
moolb~=0.1.1b2
eth-address-index~=0.1.1a11
chainlib~=0.0.3rc2
eth-address-index~=0.1.1a9
chainlib~=0.0.2a18
hexathon~=0.0.1a7
chainsyncer[sql]~=0.0.2a5
chainqueue~=0.0.2b3
sarafu-faucet==0.0.3a3
erc20-faucet==0.2.1a4
chainsyncer[sql]~=0.0.2a2
chainqueue~=0.0.1a7
pysha3==1.0.2
coincurve==15.0.0
potaahto~=0.0.1a2
pycryptodome==3.10.1
sarafu-faucet==0.0.2a28
potaahto~=0.0.1a1

View File

@@ -11,6 +11,17 @@ while True:
requirements.append(l.rstrip())
f.close()
test_requirements = []
f = open('test_requirements.txt', 'r')
while True:
l = f.readline()
if l == '':
break
test_requirements.append(l.rstrip())
f.close()
setup(
install_requires=requirements
install_requires=requirements,
tests_require=test_requirements,
)

View File

@@ -4,3 +4,4 @@ pytest-mock==3.3.1
pytest-cov==2.10.1
eth-tester==0.5.0b3
py-evm==0.3.0a20
giftable-erc20-token==0.0.8a9

View File

@@ -1,8 +0,0 @@
# local imports
from cic_eth.check.db import health
def test_check_health(
init_database,
):
assert health()

View File

@@ -1,20 +0,0 @@
# local imports
from cic_eth.check.gas import health
from cic_eth.db.models.role import AccountRole
def test_check_gas(
config,
init_database,
default_chain_spec,
eth_rpc,
custodial_roles,
whoever,
):
config.add(str(default_chain_spec), 'CIC_CHAIN_SPEC', exists_ok=True)
config.add(100, 'ETH_GAS_GIFTER_MINIMUM_BALANCE', exists_ok=True)
assert health(config=config)
AccountRole.set('GAS_GIFTER', whoever, session=init_database)
init_database.commit()
assert not health(config=config)

View File

@@ -1,16 +0,0 @@
# external imports
import pytest
# local imports
from cic_eth.check.redis import health
def test_check_redis(
config,
have_redis,
):
if have_redis != None:
pytest.skip('cannot connect to redis, skipping test: {}'.format(have_redis))
assert health(unit='test', config=config)

View File

@@ -1,13 +0,0 @@
# local imports
from cic_eth.check.signer import health
def test_check_signer(
default_chain_spec,
config,
eth_signer,
eth_rpc,
):
config.add(str(default_chain_spec), 'CIC_CHAIN_SPEC', exists_ok=True)
assert health(config=config)

View File

@@ -2,11 +2,9 @@
import os
import sys
import logging
import uuid
# external imports
from eth_erc20 import ERC20
import redis
from chainlib.eth.erc20 import ERC20
# local imports
from cic_eth.api import Api
@@ -21,7 +19,6 @@ from tests.fixtures_config import *
from tests.fixtures_database import *
from tests.fixtures_celery import *
from tests.fixtures_role import *
from tests.fixtures_contract import *
from chainlib.eth.pytest import *
from eth_contract_registry.pytest import *
from cic_eth_registry.pytest.fixtures_contracts import *
@@ -58,28 +55,3 @@ def default_token(
):
BaseTask.default_token_symbol = foo_token_symbol
BaseTask.default_token_address = foo_token
@pytest.fixture(scope='session')
def have_redis(
config,
):
r = redis.Redis(
host = config.get('REDIS_HOST'),
port = config.get('REDIS_PORT'),
db = config.get('REDIS_DB'),
)
k = str(uuid.uuid4())
try:
r.set(k, 'foo')
r.delete(k)
except redis.exceptions.ConnectionError as e:
return e
except TypeError as e:
return e
return None

View File

@@ -14,9 +14,9 @@ from chainlib.eth.tx import (
Tx,
)
from chainlib.eth.block import Block
from eth_erc20 import ERC20
from chainlib.eth.erc20 import ERC20
from sarafu_faucet import MinterFaucet
from eth_accounts_index.registry import AccountRegistry
from eth_accounts_index import AccountRegistry
from potaahto.symbols import snake_and_camel
from hexathon import add_0x
@@ -26,6 +26,7 @@ from cic_eth.runnable.daemons.filters.callback import CallbackFilter
logg = logging.getLogger()
@pytest.mark.skip()
def test_transfer_tx(
default_chain_spec,
init_database,
@@ -65,6 +66,7 @@ def test_transfer_tx(
assert transfer_type == 'transfer'
@pytest.mark.skip()
def test_transfer_from_tx(
default_chain_spec,
init_database,
@@ -208,11 +210,9 @@ def test_callback_filter(
def __init__(self):
self.results = {}
self.queue = 'test'
def call_back(self, transfer_type, result):
self.results[transfer_type] = result
return self
mock = CallbackMock()
fltr.call_back = mock.call_back

View File

@@ -1,38 +0,0 @@
# local imports
from cic_eth.runnable.daemons.filters.gas import GasFilter
from cic_eth.runnable.daemons.filters.transferauth import TransferAuthFilter
from cic_eth.runnable.daemons.filters.callback import CallbackFilter
from cic_eth.runnable.daemons.filters.straggler import StragglerFilter
from cic_eth.runnable.daemons.filters.tx import TxFilter
from cic_eth.runnable.daemons.filters.register import RegistrationFilter
# Hit tx mismatch paths on all filters
def test_filter_bogus(
init_database,
bogus_tx_block,
default_chain_spec,
eth_rpc,
eth_signer,
transfer_auth,
cic_registry,
contract_roles,
register_lookups,
):
fltrs = [
TransferAuthFilter(cic_registry, default_chain_spec, eth_rpc, call_address=contract_roles['CONTRACT_DEPLOYER']),
GasFilter(default_chain_spec, queue=None),
TxFilter(default_chain_spec, None),
CallbackFilter(default_chain_spec, None, None, caller_address=contract_roles['CONTRACT_DEPLOYER']),
StragglerFilter(default_chain_spec, None),
RegistrationFilter(default_chain_spec, queue=None),
]
for fltr in fltrs:
r = None
try:
r = fltr.filter(eth_rpc, bogus_tx_block[0], bogus_tx_block[1], db_session=init_database)
except:
pass
assert not r

View File

@@ -1,101 +0,0 @@
# external imports
from chainlib.connection import RPCConnection
from chainlib.eth.nonce import OverrideNonceOracle
from chainqueue.sql.tx import create as queue_create
from chainlib.eth.tx import (
TxFormat,
unpack,
Tx,
)
from chainlib.eth.gas import (
Gas,
OverrideGasOracle,
)
from chainlib.eth.block import (
block_latest,
block_by_number,
Block,
)
from chainqueue.sql.state import (
set_waitforgas,
)
from hexathon import strip_0x
from chainqueue.db.models.otx import Otx
from chainqueue.db.enum import StatusBits
# local imports
from cic_eth.runnable.daemons.filters.gas import GasFilter
from cic_eth.eth.gas import cache_gas_data
def test_filter_gas(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
agent_roles,
celery_session_worker,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = OverrideNonceOracle(agent_roles['ALICE'], 42)
gas_oracle = OverrideGasOracle(price=1000000000, limit=21000)
c = Gas(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6), tx_format=TxFormat.RLP_SIGNED)
queue_create(
default_chain_spec,
42,
agent_roles['ALICE'],
tx_hash_hex,
tx_signed_raw_hex,
session=init_database,
)
cache_gas_data(
tx_hash_hex,
tx_signed_raw_hex,
default_chain_spec.asdict(),
)
set_waitforgas(default_chain_spec, tx_hash_hex, session=init_database)
init_database.commit()
tx_hash_hex_wait = tx_hash_hex
otx = Otx.load(tx_hash_hex_wait, session=init_database)
assert otx.status & StatusBits.GAS_ISSUES == StatusBits.GAS_ISSUES
c = Gas(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['BOB'], agent_roles['ALICE'], 100 * (10 ** 6), tx_format=TxFormat.RLP_SIGNED)
queue_create(
default_chain_spec,
43,
agent_roles['BOB'],
tx_hash_hex,
tx_signed_raw_hex,
session=init_database,
)
cache_gas_data(
tx_hash_hex,
tx_signed_raw_hex,
default_chain_spec.asdict(),
)
fltr = GasFilter(default_chain_spec, queue=None)
o = block_latest()
r = eth_rpc.do(o)
o = block_by_number(r, include_tx=False)
r = eth_rpc.do(o)
block = Block(r)
block.txs = [tx_hash_hex]
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx_src = unpack(tx_signed_raw_bytes, default_chain_spec)
tx = Tx(tx_src, block=block)
t = fltr.filter(eth_rpc, block, tx, db_session=init_database)
t.get_leaf()
assert t.successful()
init_database.commit()
otx = Otx.load(tx_hash_hex_wait, session=init_database)
assert otx.status & StatusBits.QUEUED == StatusBits.QUEUED

View File

@@ -1,78 +0,0 @@
# external imports
from eth_accounts_index.registry import AccountRegistry
from chainlib.connection import RPCConnection
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.gas import OverrideGasOracle
from chainlib.eth.tx import(
receipt,
unpack,
Tx,
)
from chainlib.eth.block import (
block_latest,
block_by_number,
Block,
)
from erc20_faucet import Faucet
from hexathon import strip_0x
from chainqueue.sql.query import get_account_tx
# local imports
from cic_eth.runnable.daemons.filters.register import RegistrationFilter
def test_register_filter(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
account_registry,
faucet,
register_lookups,
contract_roles,
agent_roles,
cic_registry,
init_celery_tasks,
celery_session_worker,
caplog,
):
nonce_oracle = RPCNonceOracle(contract_roles['ACCOUNT_REGISTRY_WRITER'], conn=eth_rpc)
gas_oracle = OverrideGasOracle(limit=AccountRegistry.gas(), conn=eth_rpc)
c = AccountRegistry(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, o) = c.add(account_registry, contract_roles['ACCOUNT_REGISTRY_WRITER'], agent_roles['ALICE'])
r = eth_rpc.do(o)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(o['params'][0]))
o = receipt(tx_hash_hex)
rcpt = eth_rpc.do(o)
assert rcpt['status'] == 1
o = block_latest()
r = eth_rpc.do(o)
o = block_by_number(r, include_tx=False)
r = eth_rpc.do(o)
block = Block(r)
block.txs = [tx_hash_hex]
tx_src = unpack(tx_signed_raw_bytes, default_chain_spec)
tx = Tx(tx_src, block=block, rcpt=rcpt)
tx.apply_receipt(rcpt)
fltr = RegistrationFilter(default_chain_spec, queue=None)
t = fltr.filter(eth_rpc, block, tx, db_session=init_database)
t.get_leaf()
assert t.successful()
gift_txs = get_account_tx(default_chain_spec.asdict(), agent_roles['ALICE'], as_sender=True, session=init_database)
ks = list(gift_txs.keys())
assert len(ks) == 1
tx_raw_signed_hex = strip_0x(gift_txs[ks[0]])
tx_raw_signed_bytes = bytes.fromhex(tx_raw_signed_hex)
gift_tx = unpack(tx_raw_signed_bytes, default_chain_spec)
gift = Faucet.parse_give_to_request(gift_tx['data'])
assert gift[0] == agent_roles['ALICE']

View File

@@ -17,8 +17,8 @@ from chainlib.eth.block import (
)
from chainqueue.db.models.otx import Otx
from chainqueue.db.enum import StatusBits
from chainqueue.sql.tx import create as queue_create
from chainqueue.sql.state import (
from chainqueue.tx import create as queue_create
from chainqueue.state import (
set_reserved,
set_ready,
set_sent,

View File

@@ -1,79 +0,0 @@
# external imports
from erc20_transfer_authorization import TransferAuthorization
from eth_erc20 import ERC20
from chainlib.connection import RPCConnection
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.gas import OverrideGasOracle
from chainlib.eth.tx import (
receipt,
unpack,
Tx,
)
from chainlib.eth.block import (
block_latest,
block_by_number,
Block,
)
from hexathon import strip_0x
from chainqueue.sql.query import get_account_tx
# local imports
from cic_eth.runnable.daemons.filters.transferauth import TransferAuthFilter
def test_filter_transferauth(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
agent_roles,
contract_roles,
transfer_auth,
foo_token,
celery_session_worker,
register_lookups,
init_custodial,
cic_registry,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], eth_rpc)
gas_oracle = OverrideGasOracle(limit=200000, conn=eth_rpc)
c = TransferAuthorization(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, o) = c.create_request(transfer_auth, contract_roles['CONTRACT_DEPLOYER'], agent_roles['ALICE'], agent_roles['BOB'], foo_token, 1024)
r = rpc.do(o)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(o['params'][0]))
o = receipt(tx_hash_hex)
r = rpc.do(o)
assert r['status'] == 1
o = block_latest()
r = eth_rpc.do(o)
o = block_by_number(r, include_tx=False)
r = eth_rpc.do(o)
block = Block(r)
block.txs = [tx_hash_hex]
#tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx_src = unpack(tx_signed_raw_bytes, default_chain_spec)
tx = Tx(tx_src, block=block)
fltr = TransferAuthFilter(cic_registry, default_chain_spec, eth_rpc, call_address=contract_roles['CONTRACT_DEPLOYER'])
t = fltr.filter(eth_rpc, block, tx, db_session=init_database)
t.get_leaf()
assert t.successful()
approve_txs = get_account_tx(default_chain_spec.asdict(), agent_roles['ALICE'], as_sender=True, session=init_database)
ks = list(approve_txs.keys())
assert len(ks) == 1
tx_raw_signed_hex = strip_0x(approve_txs[ks[0]])
tx_raw_signed_bytes = bytes.fromhex(tx_raw_signed_hex)
approve_tx = unpack(tx_raw_signed_bytes, default_chain_spec)
c = ERC20(default_chain_spec)
approve = c.parse_approve_request(approve_tx['data'])
assert approve[0] == agent_roles['BOB']

View File

@@ -17,12 +17,13 @@ from chainlib.eth.block import (
)
from chainqueue.db.models.otx import Otx
from chainqueue.db.enum import StatusBits
from chainqueue.sql.tx import create as queue_create
from chainqueue.sql.state import (
from chainqueue.tx import create as queue_create
from chainqueue.state import (
set_reserved,
set_ready,
set_sent,
)
from hexathon import strip_0x
# local imports
@@ -30,7 +31,7 @@ from cic_eth.runnable.daemons.filters.tx import TxFilter
from cic_eth.eth.gas import cache_gas_data
def test_filter_tx(
def test_tx(
default_chain_spec,
init_database,
eth_rpc,
@@ -64,7 +65,6 @@ def test_filter_tx(
tx_hash_hex_orig = tx_hash_hex
gas_oracle = OverrideGasOracle(price=1100000000, limit=21000)
c = Gas(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6), tx_format=TxFormat.RLP_SIGNED)
queue_create(
default_chain_spec,

View File

@@ -22,6 +22,7 @@ def init_celery_tasks(
@pytest.fixture(scope='session')
def celery_includes():
return [
# 'cic_eth.eth.bancor',
'cic_eth.eth.erc20',
'cic_eth.eth.tx',
'cic_eth.ext.tx',
@@ -46,8 +47,8 @@ def celery_config():
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
rq = tempfile.mkdtemp()
logg.debug('celery broker session queue {} processed {}'.format(bq, bp))
logg.debug('celery backend session store {}'.format(rq))
logg.debug('celery broker queue {} processed {}'.format(bq, bp))
logg.debug('celery backend store {}'.format(rq))
yield {
'broker_url': 'filesystem://',
'broker_transport_options': {
@@ -57,11 +58,12 @@ def celery_config():
},
'result_backend': 'file://{}'.format(rq),
}
logg.debug('cleaning up celery session filesystem backend files {} {} {}'.format(bq, bp, rq))
logg.debug('cleaning up celery filesystem backend files {} {} {}'.format(bq, bp, rq))
shutil.rmtree(bq)
shutil.rmtree(bp)
shutil.rmtree(rq)
@pytest.fixture(scope='session')
def celery_worker_parameters():
return {

Some files were not shown because too many files have changed in this diff Show More