Compare commits

..

4 Commits

Author SHA1 Message Date
nolash
13e0fd41cd Bump version 2021-02-19 07:19:01 +01:00
nolash
c9a69d8658 Fix last(?) leak in syncer
Signed-off-by: nolash <dev@holbrook.no>
2021-02-18 23:27:58 +01:00
nolash
32ce706b2d WIP fix leaks in syncer filters 2021-02-18 23:09:48 +01:00
nolash
1a31876e2e Improve reuse of db sessions 2021-02-18 21:44:49 +01:00
491 changed files with 812668 additions and 19063 deletions

View File

@@ -5,7 +5,6 @@ include:
- local: 'apps/cic-ussd/.gitlab-ci.yml'
- local: 'apps/cic-notify/.gitlab-ci.yml'
- local: 'apps/cic-meta/.gitlab-ci.yml'
- local: 'apps/cic-cache/.gitlab-ci.yml'
stages:
- build

3
.gitmodules vendored
View File

@@ -0,0 +1,3 @@
[submodule "apps/cic-cache"]
path = apps/cic-cache
url = git@gitlab.com:grassrootseconomics/cic-cache.git

1
apps/cic-cache Submodule

Submodule apps/cic-cache added at d2cb3a4555

View File

@@ -1,2 +0,0 @@
[bancor]
dir =

View File

@@ -1,2 +0,0 @@
[cic]
registry_address =

View File

@@ -1,8 +0,0 @@
[database]
NAME=cic_cache
USER=postgres
PASSWORD=
HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2

View File

@@ -1,6 +0,0 @@
[eth]
provider = ws://localhost:8545
#ttp_provider = http://localhost:8545
#provider = http://localhost:8545
gas_provider_address =
#chain_id =

View File

@@ -1,2 +0,0 @@
[bancor]
dir =

View File

@@ -1,2 +0,0 @@
[cic]
registry_address =

View File

@@ -1,8 +0,0 @@
[database]
NAME=cic-cache-test
USER=postgres
PASSWORD=
HOST=localhost
PORT=5432
ENGINE=sqlite
DRIVER=pysqlite

View File

@@ -1,5 +0,0 @@
[eth]
#ws_provider = ws://localhost:8546
#ttp_provider = http://localhost:8545
provider = http://localhost:8545
#chain_id =

View File

@@ -1,5 +0,0 @@
[report]
omit =
.venv/*
scripts/*
cic_cache/db/postgres/*

View File

@@ -1,7 +0,0 @@
set -a
CICTEST_DATABASE_ENGINE=postgresql
CICTEST_DATABASE_DRIVER=psycopg2
CICTEST_DATABASE_HOST=localhost
CICTEST_DATABASE_PORT=5432
CICTEST_DATABASE_NAME=cic-eth-test
set +a

View File

@@ -1,8 +0,0 @@
.envrc
.envrc_dev
.venv
__pycache__
*.pyc
_build
doc/**/*.png
doc/**/html

View File

@@ -1,22 +0,0 @@
.cic_cache_variables:
variables:
APP_NAME: cic-cache
DOCKERFILE_PATH: $APP_NAME/docker/Dockerfile
.cic_cache_changes_target:
rules:
- changes:
- $CONTEXT/$APP_NAME/*
build-mr-cic-cache:
extends:
- .cic_cache_changes_target
- .py_build_merge_request
- .cic_cache_variables
build-push-cic-cache:
extends:
- .py_build_push
- .cic_cache_variables

View File

@@ -1,13 +0,0 @@
- 0.1.2
* Revert to alembic migrations
- 0.1.1
* Add missing modules to setup
- 0.1.0
* Remove old APIs
* Add bloom filter output APIs for all txs and per-account txs
- 0.0.2
* UWSGI server endpoint example
* OpenAPI spec
* stored procedures, test fixture for database schema
- 0.0.1
* Add json translators of transaction_list and balances stored procedure queries

View File

@@ -1 +0,0 @@
from .cache import BloomCache

View File

@@ -1,73 +0,0 @@
"""API for cic-cache celery tasks
.. moduleauthor:: Louis Holbrook <dev@holbrook.no>
"""
# standard imports
import logging
# third-party imports
import celery
app = celery.current_app
logg = logging.getLogger(__name__)
class Api:
"""Creates task chains to perform well-known CIC operations.
Each method that sends tasks returns details about the root task. The root task uuid can be provided in the callback, to enable to caller to correlate the result with individual calls. It can also be used to independently poll the completion of a task chain.
:param callback_param: Static value to pass to callback
:type callback_param: str
:param callback_task: Callback task that executes callback_param call. (Must be included by the celery worker)
:type callback_task: string
:param queue: Name of worker queue to submit tasks to
:type queue: str
"""
def __init__(self, queue='cic-cache', callback_param=None, callback_task='cic_cache.callbacks.noop.noop', callback_queue=None):
self.callback_param = callback_param
self.callback_task = callback_task
self.queue = queue
logg.info('api using queue {}'.format(self.queue))
self.callback_success = None
self.callback_error = None
if callback_queue == None:
callback_queue=self.queue
if callback_param != None:
self.callback_success = celery.signature(
callback_task,
[
callback_param,
0,
],
queue=callback_queue,
)
self.callback_error = celery.signature(
callback_task,
[
callback_param,
1,
],
queue=callback_queue,
)
def list(self, offset, limit, address=None):
s = celery.signature(
'cic_cache.tasks.tx.tx_filter',
[
0,
100,
address,
],
queue=None
)
if self.callback_param != None:
s.link(self.callback_success).on_error(self.callback_error)
t = s.apply_async()
return t

View File

@@ -1,89 +0,0 @@
# standard imports
import logging
# third-party imports
import moolb
# local imports
from cic_cache.db import list_transactions_mined
from cic_cache.db import list_transactions_account_mined
logg = logging.getLogger()
class BloomCache:
def __init__(self, session):
self.session = session
@staticmethod
def __get_filter_size(n):
n = 8192 * 8
logg.warning('filter size hardcoded to {}'.format(n))
return n
def load_transactions(self, offset, limit):
"""Retrieves a list of transactions from cache and creates a bloom filter pointing to blocks and transactions.
Block and transaction numbers are serialized as 32-bit big-endian numbers. The input to the second bloom filter is the concatenation of the serialized block number and transaction index.
For example, if the block number is 13 and the transaction index is 42, the input are:
block filter: 0x0d000000
block+tx filter: 0x0d0000002a0000000
:param offset: Offset in data set to return transactions from
:type offset: int
:param limit: Max number of transactions to retrieve
:type limit: int
:return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx
:rtype: tuple
"""
rows = list_transactions_mined(self.session, offset, limit)
f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
highest_block = -1
lowest_block = -1
for r in rows:
if highest_block == -1:
highest_block = r[0]
lowest_block = r[0]
block = r[0].to_bytes(4, byteorder='big')
tx = r[1].to_bytes(4, byteorder='big')
f_block.add(block)
f_blocktx.add(block + tx)
logg.debug('added block {} tx {} lo {} hi {}'.format(r[0], r[1], lowest_block, highest_block))
return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),)
def load_transactions_account(self, address, offset, limit):
"""Same as load_transactions(...), but only retrieves transactions where the specified account address is sender or recipient.
:param address: Address to retrieve transactions for.
:type address: str, 0x-hex
:param offset: Offset in data set to return transactions from
:type offset: int
:param limit: Max number of transactions to retrieve
:type limit: int
:return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx
:rtype: tuple
"""
rows = list_transactions_account_mined(self.session, address, offset, limit)
f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
highest_block = -1;
lowest_block = -1;
for r in rows:
if highest_block == -1:
highest_block = r[0]
lowest_block = r[0]
block = r[0].to_bytes(4, byteorder='big')
tx = r[1].to_bytes(4, byteorder='big')
f_block.add(block)
f_blocktx.add(block + tx)
logg.debug('added block {} tx {} lo {} hi {}'.format(r[0], r[1], lowest_block, highest_block))
return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),)

View File

@@ -1,35 +0,0 @@
# standard imports
import logging
# local imports
from .list import list_transactions_mined
from .list import list_transactions_account_mined
from .list import add_transaction
logg = logging.getLogger()
def dsn_from_config(config):
scheme = config.get('DATABASE_ENGINE')
if config.get('DATABASE_DRIVER') != None:
scheme += '+{}'.format(config.get('DATABASE_DRIVER'))
dsn = ''
if config.get('DATABASE_ENGINE') == 'sqlite':
dsn = '{}:///{}'.format(
scheme,
config.get('DATABASE_NAME'),
)
else:
dsn = '{}://{}:{}@{}:{}/{}'.format(
scheme,
config.get('DATABASE_USER'),
config.get('DATABASE_PASSWORD'),
config.get('DATABASE_HOST'),
config.get('DATABASE_PORT'),
config.get('DATABASE_NAME'),
)
logg.debug('parsed dsn from config: {}'.format(dsn))
return dsn

View File

@@ -1,79 +0,0 @@
# standard imports
import logging
import datetime
# third-party imports
from cic_cache.db.models.base import SessionBase
logg = logging.getLogger()
def list_transactions_mined(
session,
offset,
limit,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
:param offset: Offset in data set to return transactions from
:type offset: int
:param limit: Max number of transactions to retrieve
:type limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(limit, offset)
r = session.execute(s)
return r
def list_transactions_account_mined(
session,
address,
offset,
limit,
):
"""Same as list_transactions_mined(...), but only retrieves transaction where the specified account address is sender or recipient.
:param address: Address to retrieve transactions for.
:type address: str, 0x-hex
:param offset: Offset in data set to return transactions from
:type offset: int
:param limit: Max number of transactions to retrieve
:type limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
s = "SELECT block_number, tx_index FROM tx WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(address, address, limit, offset)
r = session.execute(s)
return r
def add_transaction(
session, tx_hash,
block_number,
tx_index,
sender,
receiver,
source_token,
destination_token,
from_value,
to_value,
success,
timestamp,
):
date_block = datetime.datetime.fromtimestamp(timestamp)
s = "INSERT INTO tx (tx_hash, block_number, tx_index, sender, recipient, source_token, destination_token, from_value, to_value, success, date_block) VALUES ('{}', {}, {}, '{}', '{}', '{}', '{}', {}, {}, {}, '{}')".format(
tx_hash,
block_number,
tx_index,
sender,
receiver,
source_token,
destination_token,
from_value,
to_value,
success,
date_block,
)
session.execute(s)

View File

@@ -1 +0,0 @@
Generic single-database configuration.

View File

@@ -1,86 +0,0 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = .
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to ./versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat ./versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
#sqlalchemy.url = driver://user:pass@localhost/dbname
sqlalchemy.url = postgresql+psycopg2://postgres@localhost:5432/cic_cache
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks=black
# black.type=console_scripts
# black.entrypoint=black
# black.options=-l 79
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@@ -1,77 +0,0 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@@ -1,24 +0,0 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@@ -1,52 +0,0 @@
"""Base tables
Revision ID: 63b629f14a85
Revises:
Create Date: 2020-12-04 08:16:00.412189
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '63b629f14a85'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'tx',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('date_registered', sa.DateTime, nullable=False, server_default=sa.func.current_timestamp()),
sa.Column('block_number', sa.Integer, nullable=False),
sa.Column('tx_index', sa.Integer, nullable=False),
sa.Column('tx_hash', sa.String(66), nullable=False),
sa.Column('sender', sa.String(42), nullable=False),
sa.Column('recipient', sa.String(42), nullable=False),
sa.Column('source_token', sa.String(42), nullable=False),
sa.Column('destination_token', sa.String(42), nullable=False),
sa.Column('success', sa.Boolean, nullable=False),
sa.Column('from_value', sa.NUMERIC(), nullable=False),
sa.Column('to_value', sa.NUMERIC(), nullable=False),
sa.Column('date_block', sa.DateTime, nullable=False),
)
op.create_table(
'tx_sync',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tx', sa.String(66), nullable=False),
)
op.execute("INSERT INTO tx_sync (tx) VALUES('0x0000000000000000000000000000000000000000000000000000000000000000');")
op.create_index('sender_token_idx', 'tx', ['sender', 'source_token'])
op.create_index('recipient_token_idx', 'tx', ['recipient', 'destination_token'])
def downgrade():
op.drop_index('recipient_token_idx')
op.drop_index('sender_token_idx')
op.drop_table('tx_sync')
op.drop_table('tx')

View File

@@ -1,102 +0,0 @@
# stanard imports
import logging
# third-party imports
from sqlalchemy import Column, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
logg = logging.getLogger()
Model = declarative_base(name='Model')
class SessionBase(Model):
"""The base object for all SQLAlchemy enabled models. All other models must extend this.
"""
__abstract__ = True
id = Column(Integer, primary_key=True)
engine = None
"""Database connection engine of the running aplication"""
sessionmaker = None
"""Factory object responsible for creating sessions from the connection pool"""
transactional = True
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
poolable = True
"""Whether the database backend supports connection pools. Should be explicitly set by initialization code"""
procedural = True
"""Whether the database backend supports stored procedures"""
localsessions = {}
"""Contains dictionary of sessions initiated by db model components"""
@staticmethod
def create_session():
"""Creates a new database session.
"""
return SessionBase.sessionmaker()
@staticmethod
def _set_engine(engine):
"""Sets the database engine static property
"""
SessionBase.engine = engine
SessionBase.sessionmaker = sessionmaker(bind=SessionBase.engine)
@staticmethod
def connect(dsn, debug=False):
"""Create new database connection engine and connect to database backend.
:param dsn: DSN string defining connection.
:type dsn: str
"""
e = None
if SessionBase.poolable:
e = create_engine(
dsn,
max_overflow=50,
pool_pre_ping=True,
pool_size=20,
pool_recycle=10,
echo=debug,
)
else:
e = create_engine(
dsn,
echo=debug,
)
SessionBase._set_engine(e)
@staticmethod
def disconnect():
"""Disconnect from database and free resources.
"""
SessionBase.engine.dispose()
SessionBase.engine = None
@staticmethod
def bind_session(session=None):
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
localsession_key = str(id(localsession))
logg.debug('creating new session {}'.format(localsession_key))
SessionBase.localsessions[localsession_key] = localsession
return localsession
@staticmethod
def release_session(session=None):
session_key = str(id(session))
if SessionBase.localsessions.get(session_key) != None:
logg.debug('destroying session {}'.format(session_key))
session.commit()
session.close()

View File

@@ -1,141 +0,0 @@
# standard imports
import os
import re
import logging
import argparse
import json
import base64
# third-party imports
import confini
# local imports
from cic_cache import BloomCache
from cic_cache.db import dsn_from_config
from cic_cache.db.models.base import SessionBase
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
rootdir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
dbdir = os.path.join(rootdir, 'cic_cache', 'db')
migrationsdir = os.path.join(dbdir, 'migrations')
config_dir = os.path.join('/usr/local/etc/cic-cache')
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config = confini.Config(args.c, args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config:\n{}'.format(config))
dsn = dsn_from_config(config)
SessionBase.connect(dsn, config.true('DATABASE_DEBUG'))
re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?'
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)/?(\d+)?/?(\d+)/?'
DEFAULT_LIMIT = 100
def process_transactions_account_bloom(session, env):
r = re.match(re_transactions_account_bloom, env.get('PATH_INFO'))
if not r:
return None
address = r[1]
if r[2] == None:
address = '0x' + address
offset = DEFAULT_LIMIT
if r.lastindex > 2:
offset = r[3]
limit = 0
if r.lastindex > 3:
limit = r[4]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
def process_transactions_all_bloom(session, env):
r = re.match(re_transactions_all_bloom, env.get('PATH_INFO'))
if not r:
return None
offset = DEFAULT_LIMIT
if r.lastindex > 0:
offset = r[1]
limit = 0
if r.lastindex > 1:
limit = r[2]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
# uwsgi application
def application(env, start_response):
headers = []
content = b''
session = SessionBase.create_session()
for handler in [
process_transactions_all_bloom,
process_transactions_account_bloom,
]:
r = handler(session, env)
if r != None:
(mime_type, content) = r
break
session.close()
headers.append(('Content-Length', str(len(content))),)
headers.append(('Access-Control-Allow-Origin', '*',));
if len(content) == 0:
headers.append(('Content-Type', 'text/plain, charset=UTF-8',))
start_response('404 Looked everywhere, sorry', headers)
else:
headers.append(('Content-Type', mime_type,))
start_response('200 OK', headers)
return [content]

View File

@@ -1,98 +0,0 @@
# standard imports
import logging
import os
import sys
import argparse
# third-party imports
import celery
import confini
# local imports
from cic_cache.db import dsn_from_config
from cic_cache.db.models.base import SessionBase
from cic_cache.tasks.tx import *
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = os.path.join('/usr/local/etc/cic-cache')
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('-q', type=str, default='cic-cache', help='queue name for worker tasks')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config = confini.Config(args.c, args.env_prefix)
config.process()
# connect to database
dsn = dsn_from_config(config)
SessionBase.connect(dsn)
# verify database connection with minimal sanity query
#session = SessionBase.create_session()
#session.execute('select version_num from alembic_version')
#session.close()
# set up celery
current_app = celery.Celery(__name__)
broker = config.get('CELERY_BROKER_URL')
if broker[:4] == 'file':
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
current_app.conf.update({
'broker_url': broker,
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
},
)
logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
else:
current_app.conf.update({
'broker_url': broker,
})
result = config.get('CELERY_RESULT_URL')
if result[:4] == 'file':
rq = tempfile.mkdtemp()
current_app.conf.update({
'result_backend': 'file://{}'.format(rq),
})
logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
else:
current_app.conf.update({
'result_backend': result,
})
def main():
argv = ['worker']
if args.vv:
argv.append('--loglevel=DEBUG')
elif args.v:
argv.append('--loglevel=INFO')
argv.append('-Q')
argv.append(args.q)
argv.append('-n')
argv.append(args.q)
current_app.worker_main(argv)
if __name__ == '__main__':
main()

View File

@@ -1,339 +0,0 @@
# standard imports
import sys
import os
import argparse
import logging
import time
import enum
import re
# third-party imports
import confini
from cic_registry import CICRegistry
from cic_registry.chain import (
ChainRegistry,
ChainSpec,
)
#from cic_registry.bancor import BancorRegistryClient
from cic_registry.token import Token
from cic_registry.error import (
UnknownContractError,
UnknownDeclarationError,
)
from cic_registry.declaration import to_token_declaration
from web3.exceptions import BlockNotFound, TransactionNotFound
from websockets.exceptions import ConnectionClosedError
from requests.exceptions import ConnectionError
import web3
from web3 import HTTPProvider, WebsocketProvider
# local imports
from cic_cache import db
from cic_cache.db.models.base import SessionBase
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL)
logging.getLogger('urllib3').setLevel(logging.CRITICAL)
logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL)
log_topics = {
'transfer': '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef',
'convert': '0x7154b38b5dd31bb3122436a96d4e09aba5b323ae1fd580025fab55074334c095',
'accountregistry_add': '0a3b0a4f4c6e53dce3dbcad5614cb2ba3a0fa7326d03c5d64b4fa2d565492737',
}
config_dir = os.path.join('/usr/local/etc/cic-cache')
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('--trust-address', default=[], type=str, dest='trust_address', action='append', help='Set address as trust')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
args = argparser.parse_args(sys.argv[1:])
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
config = confini.Config(config_dir, args.env_prefix)
config.process()
args_override = {
'ETH_ABI_DIR': getattr(args, 'abi_dir'),
'CIC_TRUST_ADDRESS': ",".join(getattr(args, 'trust_address', [])),
}
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
# connect to database
dsn = db.dsn_from_config(config)
SessionBase.connect(dsn)
re_websocket = re.compile('^wss?://')
re_http = re.compile('^https?://')
blockchain_provider = config.get('ETH_PROVIDER')
if re.match(re_websocket, blockchain_provider) != None:
blockchain_provider = WebsocketProvider(blockchain_provider)
elif re.match(re_http, blockchain_provider) != None:
blockchain_provider = HTTPProvider(blockchain_provider)
else:
raise ValueError('unknown provider url {}'.format(blockchain_provider))
def web3_constructor():
w3 = web3.Web3(blockchain_provider)
return (blockchain_provider, w3)
class RunStateEnum(enum.IntEnum):
INIT = 0
RUN = 1
TERMINATE = 9
def rubberstamp(src):
return True
class Tracker:
def __init__(self, chain_spec, trusts=[]):
self.block_height = 0
self.tx_height = 0
self.state = RunStateEnum.INIT
self.declarator_cache = {}
self.convert_enabled = False
self.trusts = trusts
self.chain_spec = chain_spec
self.declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', 'Declarator')
def __process_tx(self, w3, session, t, r, l, b):
token_value = int(l.data, 16)
token_sender = l.topics[1][-20:].hex()
token_recipient = l.topics[2][-20:].hex()
#ts = ContractRegistry.get_address(t.address)
ts = CICRegistry.get_address(self.chain_spec, t.address())
logg.info('add token transfer {} value {} from {} to {}'.format(
ts.symbol(),
token_value,
token_sender,
token_recipient,
)
)
db.add_transaction(
session,
r.transactionHash.hex(),
r.blockNumber,
r.transactionIndex,
w3.toChecksumAddress(token_sender),
w3.toChecksumAddress(token_recipient),
t.address(),
t.address(),
token_value,
token_value,
r.status == 1,
b.timestamp,
)
session.flush()
# TODO: simplify/ split up and/or comment, function is too long
def __process_convert(self, w3, session, t, r, l, b):
logg.warning('conversions are deactivated')
return
# token_source = l.topics[2][-20:].hex()
# token_source = w3.toChecksumAddress(token_source)
# token_destination = l.topics[3][-20:].hex()
# token_destination = w3.toChecksumAddress(token_destination)
# data_noox = l.data[2:]
# d = data_noox[:64]
# token_from_value = int(d, 16)
# d = data_noox[64:128]
# token_to_value = int(d, 16)
# token_trader = '0x' + data_noox[192-40:]
#
# #ts = ContractRegistry.get_address(token_source)
# ts = CICRegistry.get_address(CICRegistry.bancor_chain_spec, t.address())
# #if ts == None:
# # ts = ContractRegistry.reserves[token_source]
# td = ContractRegistry.get_address(token_destination)
# #if td == None:
# # td = ContractRegistry.reserves[token_source]
# logg.info('add token convert {} -> {} value {} -> {} trader {}'.format(
# ts.symbol(),
# td.symbol(),
# token_from_value,
# token_to_value,
# token_trader,
# )
# )
#
# db.add_transaction(
# session,
# r.transactionHash.hex(),
# r.blockNumber,
# r.transactionIndex,
# w3.toChecksumAddress(token_trader),
# w3.toChecksumAddress(token_trader),
# token_source,
# token_destination,
# r.status == 1,
# b.timestamp,
# )
# session.flush()
def check_token(self, address):
t = None
try:
t = CICRegistry.get_address(CICRegistry.default_chain_spec, address)
return t
except UnknownContractError:
logg.debug('contract {} not in registry'.format(address))
# If nothing was returned, we look up the token in the declarator
for trust in self.trusts:
logg.debug('look up declaration for contract {} with trust {}'.format(address, trust))
fn = self.declarator.function('declaration')
# TODO: cache trust in LRUcache
declaration_array = fn(trust, address).call()
try:
declaration = to_token_declaration(trust, address, declaration_array, [rubberstamp])
logg.debug('found declaration for token {} from trust address {}'.format(address, trust))
except UnknownDeclarationError:
continue
try:
c = w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=address)
t = CICRegistry.add_token(self.chain_spec, c)
break
except ValueError:
logg.error('declaration for {} validates as token, but location is not ERC20 compatible'.format(address))
return t
# TODO use input data instead of logs
def process(self, w3, session, block):
#self.refresh_registry(w3)
tx_count = w3.eth.getBlockTransactionCount(block.hash)
b = w3.eth.getBlock(block.hash)
for i in range(self.tx_height, tx_count):
tx = w3.eth.getTransactionByBlock(block.hash, i)
if tx.to == None:
logg.debug('block {} tx {} is contract creation tx, skipping'.format(block.number, i))
continue
if len(w3.eth.getCode(tx.to)) == 0:
logg.debug('block {} tx {} not a contract tx, skipping'.format(block.number, i))
continue
t = self.check_token(tx.to)
if t != None and isinstance(t, Token):
r = w3.eth.getTransactionReceipt(tx.hash)
for l in r.logs:
logg.debug('block {} tx {} {} token log {} {}'.format(block.number, i, tx.hash.hex(), l.logIndex, l.topics[0].hex()))
if l.topics[0].hex() == log_topics['transfer']:
self.__process_tx(w3, session, t, r, l, b)
# TODO: cache contracts in LRUcache
elif self.convert_enabled and tx.to == CICRegistry.get_contract(CICRegistry.default_chain_spec, 'Converter').address:
r = w3.eth.getTransactionReceipt(tx.hash)
for l in r.logs:
logg.info('block {} tx {} {} bancornetwork log {} {}'.format(block.number, i, tx.hash.hex(), l.logIndex, l.topics[0].hex()))
if l.topics[0].hex() == log_topics['convert']:
self.__process_convert(w3, session, t, r, l, b)
session.execute("UPDATE tx_sync SET tx = '{}'".format(tx.hash.hex()))
session.commit()
self.tx_height += 1
def __get_next_retry(self, backoff=False):
return 1
def loop(self):
logg.info('starting at block {} tx index {}'.format(self.block_height, self.tx_height))
self.state = RunStateEnum.RUN
while self.state == RunStateEnum.RUN:
(provider, w3) = web3_constructor()
session = SessionBase.create_session()
try:
block = w3.eth.getBlock(self.block_height)
self.process(w3, session, block)
self.block_height += 1
self.tx_height = 0
except BlockNotFound as e:
logg.debug('no block {} yet, zZzZ...'.format(self.block_height))
time.sleep(self.__get_next_retry())
except ConnectionClosedError as e:
logg.info('connection gone, retrying')
time.sleep(self.__get_next_retry(True))
except OSError as e:
logg.error('cannot connect {}'.format(e))
time.sleep(self.__get_next_retry(True))
except Exception as e:
session.close()
raise(e)
session.close()
def load(self, w3):
session = SessionBase.create_session()
r = session.execute('SELECT tx FROM tx_sync').first()
if r != None:
if r[0] == '0x{0:0{1}X}'.format(0, 64):
logg.debug('last tx was zero-address, starting from scratch')
return
t = w3.eth.getTransaction(r[0])
self.block_height = t.blockNumber
self.tx_height = t.transactionIndex+1
c = w3.eth.getBlockTransactionCount(t.blockHash.hex())
logg.debug('last tx processed {} index {} (max index {})'.format(t.blockNumber, t.transactionIndex, c-1))
if c == self.tx_height:
self.block_height += 1
self.tx_height = 0
session.close()
(provider, w3) = web3_constructor()
trust = config.get('CIC_TRUST_ADDRESS', "").split(",")
chain_spec = args.i
try:
w3.eth.chainId
except Exception as e:
logg.exception(e)
sys.stderr.write('cannot connect to evm node\n')
sys.exit(1)
def main():
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
CICRegistry.init(w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec)
CICRegistry.add_path(config.get('ETH_ABI_DIR'))
chain_registry = ChainRegistry(chain_spec)
CICRegistry.add_chain_registry(chain_registry)
t = Tracker(chain_spec, trust)
t.load(w3)
t.loop()
if __name__ == '__main__':
main()

View File

@@ -1,38 +0,0 @@
# third-party imports
import celery
# local imports
from cic_cache.cache import BloomCache
from cic_cache.db.models.base import SessionBase
celery_app = celery.current_app
@celery_app.task(bind=True)
def tx_filter(self, offset, limit, address=None, encoding='hex'):
queue = self.request.delivery_info.get('routing_key')
session = SessionBase.create_session()
c = BloomCache(session)
b = None
if address == None:
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
else:
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
session.close()
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': bloom_filter_block.hex(),
'blocktx_filter': bloom_filter_tx.hex(),
'filter_rounds': 3,
}
return o

View File

@@ -1,18 +0,0 @@
import os
import semver
version = (
0,
2,
0,
'alpha.1',
)
version_object = semver.VersionInfo(
major=version[0],
minor=version[1],
patch=version[2],
prerelease=version[3],
)
version_string = str(version_object)

View File

@@ -1,2 +0,0 @@
[bancor]
dir =

View File

@@ -1,3 +0,0 @@
[celery]
broker_url = redis:///
result_url = redis:///

View File

@@ -1,4 +0,0 @@
[cic]
registry_address =
chain_spec =
trust_address =

View File

@@ -1,9 +0,0 @@
[database]
NAME=cic_cache
USER=postgres
PASSWORD=
HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=

View File

@@ -1,3 +0,0 @@
[bancor]
registry_address =
dir = /usr/local/share/bancor

View File

@@ -1,3 +0,0 @@
[celery]
broker_url = redis://localhost:63379
result_url = redis://localhost:63379

View File

@@ -1,9 +0,0 @@
[database]
NAME=cic_cache
USER=grassroots
PASSWORD=
HOST=localhost
PORT=63432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=1

View File

@@ -1,3 +0,0 @@
[eth]
provider = ws://localhost:63546
chain_id = 8996

View File

@@ -1,7 +0,0 @@
[eth]
provider = ws://localhost:8545
#ttp_provider = http://localhost:8545
#provider = http://localhost:8545
gas_provider_address =
#chain_id =
abi_dir = /usr/local/share/cic/solidity/abi

View File

@@ -1,2 +0,0 @@
[bancor]
dir =

View File

@@ -1,2 +0,0 @@
[cic]
registry_address =

View File

@@ -1,9 +0,0 @@
[database]
NAME=cic-cache-test
USER=postgres
PASSWORD=
HOST=localhost
PORT=5432
ENGINE=sqlite
DRIVER=pysqlite
DEBUG=

View File

@@ -1,5 +0,0 @@
[eth]
#ws_provider = ws://localhost:8546
#ttp_provider = http://localhost:8545
provider = http://localhost:8545
#chain_id =

View File

@@ -1,22 +0,0 @@
CREATE TABLE tx (
id SERIAL PRIMARY KEY,
date_registered TIMESTAMP NOT NULL default CURRENT_TIMESTAMP,
block_number INTEGER NOT NULL,
tx_index INTEGER NOT NULL,
tx_hash VARCHAR(66) NOT NULL,
sender VARCHAR(42) NOT NULL,
recipient VARCHAR(42) NOT NULL,
source_token VARCHAR(42) NOT NULL,
destination_token VARCHAR(42) NOT NULL,
from_value BIGINT NOT NULL,
to_value BIGINT NOT NULL,
success BOOLEAN NOT NULL,
date_block TIMESTAMP NOT NULL
);
CREATE TABLE tx_sync (
id SERIAL PRIMARY KEY,
tx VARCHAR(66) NOT NULL
);
INSERT INTO tx_sync (tx) VALUES('0x0000000000000000000000000000000000000000000000000000000000000000');

View File

@@ -1,23 +0,0 @@
CREATE TABLE tx (
id SERIAL PRIMARY KEY,
date_registered DATETIME NOT NULL default CURRENT_DATE,
block_number INTEGER NOT NULL,
tx_index INTEGER NOT NULL,
tx_hash VARCHAR(66) NOT NULL,
sender VARCHAR(42) NOT NULL,
recipient VARCHAR(42) NOT NULL,
source_token VARCHAR(42) NOT NULL,
destination_token VARCHAR(42) NOT NULL,
from_value INTEGER NOT NULL,
to_value INTEGER NOT NULL,
success BOOLEAN NOT NULL,
date_block DATETIME NOT NULL,
CHECK (success IN (0, 1))
);
CREATE TABLE tx_sync (
id SERIAL PRIMARY_KEY,
tx VARCHAR(66) NOT NULL
);
INSERT INTO tx_sync (tx) VALUES('0x0000000000000000000000000000000000000000000000000000000000000000');

View File

@@ -1,102 +0,0 @@
openapi: "3.0.3"
info:
title: Grassroots Economics CIC Cache
description: Cache of processed transaction data from Ethereum blockchain and worker queues
termsOfService: bzz://grassrootseconomics.eth/terms
contact:
name: Grassroots Economics
url: https://www.grassrootseconomics.org
email: will@grassecon.org
license:
name: GPLv3
version: 0.1.0
paths:
/tx/{offset}/{limit}:
description: Bloom filter for batch of latest transactions
get:
tags:
- transactions
description:
Retrieve transactions
operationId: tx.get
responses:
200:
description: Transaction query successful.
content:
application/json:
schema:
$ref: "#/components/schemas/BlocksBloom"
parameters:
- name: offset
in: path
schema:
type: integer
format: int32
- name: limit
in: path
schema:
type: integer
format: int32
/tx/{address}/{offset}/{limit}:
description: Bloom filter for batch of latest transactions by account
get:
tags:
- transactions
description:
Retrieve transactions
operationId: tx.get
responses:
200:
description: Transaction query successful.
content:
application/json:
schema:
$ref: "#/components/schemas/BlocksBloom"
parameters:
- name: address
in: path
required: true
schema:
type: string
- name: offset
in: path
schema:
type: integer
format: int32
- name: limit
in: path
schema:
type: integer
format: int32
components:
schemas:
BlocksBloom:
type: object
properties:
low:
type: int
format: int32
description: The lowest block number included in the filter
block_filter:
type: string
format: byte
description: Block number filter
blocktx_filter:
type: string
format: byte
description: Block and tx index filter
alg:
type: string
description: Hashing algorithm (currently only using sha256)
filter_rounds:
type: int
format: int32
description: Number of hash rounds used to create the filter

View File

@@ -1,53 +0,0 @@
FROM python:3.8.6-slim-buster
#COPY --from=0 /usr/local/share/cic/solidity/ /usr/local/share/cic/solidity/
WORKDIR /usr/src/cic-cache
ARG pip_extra_index_url_flag='--index https://pypi.org/simple --extra-index-url https://pip.grassrootseconomics.net:8433'
ARG root_requirement_file='requirements.txt'
#RUN apk update && \
# apk add gcc musl-dev gnupg libpq
#RUN apk add postgresql-dev
#RUN apk add linux-headers
#RUN apk add libffi-dev
RUN apt-get update && \
apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps git
# Copy shared requirements from top of mono-repo
RUN echo "copying root req file ${root_requirement_file}"
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a44
COPY cic-cache/requirements.txt ./
COPY cic-cache/setup.cfg \
cic-cache/setup.py \
./
COPY cic-cache/cic_cache/ ./cic_cache/
COPY cic-cache/scripts/ ./scripts/
COPY cic-cache/test_requirements.txt ./
RUN pip install $pip_extra_index_url_flag -r test_requirements.txt
RUN pip install $pip_extra_index_url_flag .
RUN pip install .[server]
COPY cic-cache/tests/ ./tests/
#COPY db/ cic-cache/db
#RUN apk add postgresql-client
# ini files in config directory defines the configurable parameters for the application
# they can all be overridden by environment variables
# to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package)
COPY cic-cache/config/ /usr/local/etc/cic-cache/
# for db migrations
RUN git clone https://github.com/vishnubob/wait-for-it.git /usr/local/bin/wait-for-it/
COPY cic-cache/cic_cache/db/migrations/ /usr/local/share/cic-cache/alembic/
RUN git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \
mkdir -p /usr/local/share/cic/solidity && \
cp -R cic-contracts/abis /usr/local/share/cic/solidity/abi
# Tracker
# ENTRYPOINT ["/usr/local/bin/cic-cache-tracker", "-vv"]
# Server
# ENTRYPOINT [ "/usr/local/bin/uwsgi", "--wsgi-file", "/usr/local/lib/python3.8/site-packages/cic_cache/runnable/server.py", "--http", ":80", "--pyargv", "-vv" ]

File diff suppressed because it is too large Load Diff

View File

@@ -1,40 +0,0 @@
let xmlhttprequest = require('xhr2');
let moolb = require('moolb');
let xhr = new xmlhttprequest();
xhr.responseType = 'json';
xhr.open('GET', 'http://localhost:5555/tx/0/100');
xhr.addEventListener('load', (e) => {
d = xhr.response;
b_one = Buffer.from(d.block_filter, 'base64');
b_two = Buffer.from(d.blocktx_filter, 'base64');
for (let i = 0; i < 8192; i++) {
if (b_two[i] > 0) {
console.debug('value on', i, b_two[i]);
}
}
console.log(b_one, b_two);
let f_block = moolb.fromBytes(b_one, d.filter_rounds);
let f_blocktx = moolb.fromBytes(b_two, d.filter_rounds);
let a = new ArrayBuffer(8);
let w = new DataView(a);
for (let i = 410000; i < 430000; i++) {
w.setInt32(0, i);
let r = new Uint8Array(a.slice(0, 4));
if (f_block.check(r)) {
for (let j = 0; j < 200; j++) {
w = new DataView(a);
w.setInt32(4, j);
r = new Uint8Array(a);
if (f_blocktx.check(r)) {
console.log('true', i, j);
}
}
}
}
});
let r = xhr.send();

View File

@@ -1,10 +0,0 @@
alembic==1.4.2
confini~=0.3.6b2
uwsgi==2.0.19.1
moolb~=0.1.0
cic-registry~=0.5.3a4
SQLAlchemy==1.3.20
semver==2.13.0
psycopg2==2.8.6
celery==4.4.7
redis==3.5.3

View File

@@ -1,56 +0,0 @@
#!/usr/bin/python
import os
import argparse
import logging
import alembic
from alembic.config import Config as AlembicConfig
import confini
from cic_cache.db import dsn_from_config
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
# BUG: the dbdir doesn't work after script install
rootdir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
dbdir = os.path.join(rootdir, 'cic_cache', 'db')
migrationsdir = os.path.join(dbdir, 'migrations')
config_dir = os.path.join('/usr/local/etc/cic-cache')
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--migrations-dir', dest='migrations_dir', default=migrationsdir, type=str, help='path to alembic migrations directory')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config = confini.Config(args.c, args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config:\n{}'.format(config))
migrations_dir = os.path.join(args.migrations_dir, config.get('DATABASE_ENGINE'))
if not os.path.isdir(migrations_dir):
logg.debug('migrations dir for engine {} not found, reverting to default'.format(config.get('DATABASE_ENGINE')))
migrations_dir = os.path.join(args.migrations_dir, 'default')
# connect to database
dsn = dsn_from_config(config)
logg.info('using migrations dir {}'.format(migrations_dir))
logg.info('using db {}'.format(dsn))
ac = AlembicConfig(os.path.join(migrations_dir, 'alembic.ini'))
ac.set_main_option('sqlalchemy.url', dsn)
ac.set_main_option('script_location', migrations_dir)
alembic.command.upgrade(ac, 'head')

View File

@@ -1,39 +0,0 @@
[metadata]
name = cic-cache
description = CIC Cache API and server
author = Louis Holbrook
author_email = dev@holbrook.no
url = https://gitlab.com/grassrootseconomics/cic-eth
keywords =
cic
cryptocurrency
ethereum
classifiers =
Programming Language :: Python :: 3
Operating System :: OS Independent
Development Status :: 3 - Alpha
Environment :: No Input/Output (Daemon)
Intended Audience :: Developers
License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)
Topic :: Internet
# Topic :: Blockchain :: EVM
license = GPL3
licence_files =
LICENSE.txt
[options]
python_requires = >= 3.6
packages =
cic_cache
cic_cache.tasks
cic_cache.db
cic_cache.db.models
cic_cache.runnable
scripts =
./scripts/migrate.py
[options.entry_points]
console_scripts =
cic-cache-trackerd = cic_cache.runnable.tracker:main
cic-cache-serverd = cic_cache.runnable.server:main
cic-cache-taskerd = cic_cache.runnable.tasker:main

View File

@@ -1,60 +0,0 @@
from setuptools import setup
import configparser
import os
import time
from cic_cache.version import (
version_object,
version_string
)
class PleaseCommitFirstError(Exception):
pass
def git_hash():
import subprocess
git_diff = subprocess.run(['git', 'diff'], capture_output=True)
if len(git_diff.stdout) > 0:
raise PleaseCommitFirstError()
git_hash = subprocess.run(['git', 'rev-parse', 'HEAD'], capture_output=True)
git_hash_brief = git_hash.stdout.decode('utf-8')[:8]
return git_hash_brief
version_string = str(version_object)
try:
version_git = git_hash()
version_string += '+build.{}'.format(version_git)
except FileNotFoundError:
time_string_pair = str(time.time()).split('.')
version_string += '+build.{}{:<09d}'.format(
time_string_pair[0],
int(time_string_pair[1]),
)
print('final version string will be {}'.format(version_string))
requirements = []
f = open('requirements.txt', 'r')
while True:
l = f.readline()
if l == '':
break
requirements.append(l.rstrip())
f.close()
test_requirements = []
f = open('test_requirements.txt', 'r')
while True:
l = f.readline()
if l == '':
break
test_requirements.append(l.rstrip())
f.close()
setup(
version=version_string,
install_requires=requirements,
tests_require=test_requirements,
)

View File

@@ -1,6 +0,0 @@
pytest==6.0.1
pytest-cov==2.10.1
pytest-mock==3.3.1
pysqlite3==0.4.3
sqlparse==0.4.1
pytest-celery==0.0.0a1

View File

@@ -1,86 +0,0 @@
# standard imports
import os
import sys
import datetime
# third-party imports
import pytest
# local imports
from cic_cache import db
script_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.dirname(script_dir)
sys.path.insert(0, root_dir)
# fixtures
from tests.fixtures_config import *
from tests.fixtures_database import *
from tests.fixtures_celery import *
@pytest.fixture(scope='session')
def balances_dict_fields():
return {
'out_pending': 0,
'out_synced': 1,
'out_confirmed': 2,
'in_pending': 3,
'in_synced': 4,
'in_confirmed': 5,
}
@pytest.fixture(scope='function')
def txs(
init_database,
list_defaults,
list_actors,
list_tokens,
):
session = init_database
tx_number = 13
tx_hash_first = '0x' + os.urandom(32).hex()
val = 15000
nonce = 1
dt = datetime.datetime.utcnow()
db.add_transaction(
session,
tx_hash_first,
list_defaults['block'],
tx_number,
list_actors['alice'],
list_actors['bob'],
list_tokens['foo'],
list_tokens['foo'],
1024,
2048,
True,
dt.timestamp(),
)
tx_number = 42
tx_hash_second = '0x' + os.urandom(32).hex()
tx_signed_second = '0x' + os.urandom(128).hex()
nonce = 1
dt -= datetime.timedelta(hours=1)
db.add_transaction(
session,
tx_hash_second,
list_defaults['block']-1,
tx_number,
list_actors['diane'],
list_actors['alice'],
list_tokens['foo'],
list_tokens['foo'],
1024,
2048,
False,
dt.timestamp(),
)
session.commit()

View File

@@ -1,48 +0,0 @@
# third-party imports
import pytest
import tempfile
import logging
import shutil
logg = logging.getLogger(__name__)
# celery fixtures
@pytest.fixture(scope='session')
def celery_includes():
return [
'cic_cache.tasks.tx',
]
@pytest.fixture(scope='session')
def celery_config():
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
rq = tempfile.mkdtemp()
logg.debug('celery broker queue {} processed {}'.format(bq, bp))
logg.debug('celery backend store {}'.format(rq))
yield {
'broker_url': 'filesystem://',
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
'result_backend': 'file://{}'.format(rq),
}
logg.debug('cleaning up celery filesystem backend files {} {} {}'.format(bq, bp, rq))
shutil.rmtree(bq)
shutil.rmtree(bp)
shutil.rmtree(rq)
@pytest.fixture(scope='session')
def celery_worker_parameters():
return {
# 'queues': ('cic-cache'),
}
@pytest.fixture(scope='session')
def celery_enable_logging():
return True

View File

@@ -1,20 +0,0 @@
# standard imports
import os
import logging
# third-party imports
import pytest
import confini
script_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.dirname(script_dir)
logg = logging.getLogger(__file__)
@pytest.fixture(scope='session')
def load_config():
config_dir = os.path.join(root_dir, '.config/test')
conf = confini.Config(config_dir, 'CICTEST')
conf.process()
logg.debug('config {}'.format(conf))
return conf

View File

@@ -1,118 +0,0 @@
# standard imports
import os
import logging
import re
# third-party imports
import pytest
import sqlparse
# local imports
from cic_cache.db.models.base import SessionBase
from cic_cache.db import dsn_from_config
logg = logging.getLogger(__file__)
@pytest.fixture(scope='function')
def database_engine(
load_config,
):
if load_config.get('DATABASE_ENGINE') == 'sqlite':
SessionBase.transactional = False
SessionBase.poolable = False
try:
os.unlink(load_config.get('DATABASE_NAME'))
except FileNotFoundError:
pass
dsn = dsn_from_config(load_config)
SessionBase.connect(dsn)
return dsn
# TODO: use alembic instead to migrate db, here we have to keep separate schema than migration script in script/migrate.py
@pytest.fixture(scope='function')
def init_database(
load_config,
database_engine,
):
rootdir = os.path.dirname(os.path.dirname(__file__))
schemadir = os.path.join(rootdir, 'db', load_config.get('DATABASE_DRIVER'))
if load_config.get('DATABASE_ENGINE') == 'sqlite':
rconn = SessionBase.engine.raw_connection()
f = open(os.path.join(schemadir, 'db.sql'))
s = f.read()
f.close()
rconn.executescript(s)
else:
rconn = SessionBase.engine.raw_connection()
rcursor = rconn.cursor()
#rcursor.execute('DROP FUNCTION IF EXISTS public.transaction_list')
#rcursor.execute('DROP FUNCTION IF EXISTS public.balances')
f = open(os.path.join(schemadir, 'db.sql'))
s = f.read()
f.close()
r = re.compile(r'^[A-Z]', re.MULTILINE)
for l in sqlparse.parse(s):
strl = str(l)
# we need to check for empty query lines, as sqlparse doesn't do that on its own (and psycopg complains when it gets them)
if not re.search(r, strl):
logg.warning('skipping parsed query line {}'.format(strl))
continue
rcursor.execute(strl)
rconn.commit()
rcursor.execute('SET search_path TO public')
# this doesn't work when run separately, no idea why
# functions have been manually added to original schema from cic-eth
# f = open(os.path.join(schemadir, 'proc_transaction_list.sql'))
# s = f.read()
# f.close()
# rcursor.execute(s)
#
# f = open(os.path.join(schemadir, 'proc_balances.sql'))
# s = f.read()
# f.close()
# rcursor.execute(s)
rcursor.close()
session = SessionBase.create_session()
yield session
session.commit()
session.close()
@pytest.fixture(scope='function')
def list_tokens(
):
return {
'foo': '0x' + os.urandom(20).hex(),
'bar': '0x' + os.urandom(20).hex(),
}
@pytest.fixture(scope='function')
def list_actors(
):
return {
'alice': '0x' + os.urandom(20).hex(),
'bob': '0x' + os.urandom(20).hex(),
'charlie': '0x' + os.urandom(20).hex(),
'diane': '0x' + os.urandom(20).hex(),
}
@pytest.fixture(scope='function')
def list_defaults(
):
return {
'block': 420000,
}

View File

@@ -1,35 +0,0 @@
# standard imports
import os
import datetime
import logging
import json
# third-party imports
import pytest
# local imports
from cic_cache import BloomCache
logg = logging.getLogger()
def test_cache(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
):
session = init_database
c = BloomCache(session)
b = c.load_transactions(0, 100)
assert b[0] == list_defaults['block'] - 1
c = BloomCache(session)
c.load_transactions_account(list_actors['alice'],0, 100)
assert b[0] == list_defaults['block'] - 1

View File

@@ -1,27 +0,0 @@
# standard imports
import logging
# third-party imports
import celery
# local imports
from cic_cache.api import Api
logg = logging.getLogger()
def test_task(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
celery_session_worker,
):
api = Api(queue=None)
t = api.list(0, 100)
r = t.get()
logg.debug('r {}'.format(r))
assert r['low'] == list_defaults['block'] - 1

View File

@@ -4,23 +4,18 @@ import logging
# third-party imports
import celery
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec
from cic_registry import zero_address
# local imports
from cic_eth.db.enum import LockEnum
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.lock import Lock
from cic_eth.task import (
CriticalSQLAlchemyTask,
)
from cic_eth.error import LockedError
celery_app = celery.current_app
logg = logging.getLogger()
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.ALL, tx_hash=None):
@celery_app.task()
def lock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL, tx_hash=None):
"""Task wrapper to set arbitrary locks
:param chain_str: Chain spec string representation
@@ -32,14 +27,13 @@ def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.AL
:returns: New lock state for address
:rtype: number
"""
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, flags, address=address, tx_hash=tx_hash)
logg.debug('Locked {} for {}, flag now {}'.format(flags, address, r))
return chained_input
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.ALL):
@celery_app.task()
def unlock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL):
"""Task wrapper to reset arbitrary locks
:param chain_str: Chain spec string representation
@@ -51,14 +45,13 @@ def unlock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.
:returns: New lock state for address
:rtype: number
"""
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, flags, address=address)
logg.debug('Unlocked {} for {}, flag now {}'.format(flags, address, r))
return chained_input
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS, tx_hash=None):
@celery_app.task()
def lock_send(chained_input, chain_str, address=zero_address, tx_hash=None):
"""Task wrapper to set send lock
:param chain_str: Chain spec string representation
@@ -68,14 +61,13 @@ def lock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS, tx_hash=None
:returns: New lock state for address
:rtype: number
"""
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, LockEnum.SEND, address=address, tx_hash=tx_hash)
logg.debug('Send locked for {}, flag now {}'.format(address, r))
return chained_input
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
@celery_app.task()
def unlock_send(chained_input, chain_str, address=zero_address):
"""Task wrapper to reset send lock
:param chain_str: Chain spec string representation
@@ -85,14 +77,13 @@ def unlock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
:returns: New lock state for address
:rtype: number
"""
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, LockEnum.SEND, address=address)
logg.debug('Send unlocked for {}, flag now {}'.format(address, r))
return chained_input
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS, tx_hash=None):
@celery_app.task()
def lock_queue(chained_input, chain_str, address=zero_address, tx_hash=None):
"""Task wrapper to set queue direct lock
:param chain_str: Chain spec string representation
@@ -102,14 +93,13 @@ def lock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS, tx_hash=Non
:returns: New lock state for address
:rtype: number
"""
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, LockEnum.QUEUE, address=address, tx_hash=tx_hash)
logg.debug('Queue direct locked for {}, flag now {}'.format(address, r))
return chained_input
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
@celery_app.task()
def unlock_queue(chained_input, chain_str, address=zero_address):
"""Task wrapper to reset queue direct lock
:param chain_str: Chain spec string representation
@@ -119,23 +109,17 @@ def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
:returns: New lock state for address
:rtype: number
"""
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, LockEnum.QUEUE, address=address)
logg.debug('Queue direct unlocked for {}, flag now {}'.format(address, r))
return chained_input
@celery_app.task(base=CriticalSQLAlchemyTask)
def check_lock(chained_input, chain_spec_dict, lock_flags, address=None):
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
session = SessionBase.create_session()
r = Lock.check(chain_str, lock_flags, address=ZERO_ADDRESS, session=session)
@celery_app.task()
def check_lock(chained_input, chain_str, lock_flags, address=None):
r = Lock.check(chain_str, lock_flags, address=zero_address)
if address != None:
r |= Lock.check(chain_str, lock_flags, address=address, session=session)
r |= Lock.check(chain_str, lock_flags, address=address)
if r > 0:
logg.debug('lock check {} has match {} for {}'.format(lock_flags, r, address))
session.close()
raise LockedError(r)
session.flush()
session.close()
return chained_input

View File

@@ -1,25 +1,12 @@
# standard imports
import datetime
# external imports
import celery
# local imports
from cic_eth.db.models.debug import Debug
from cic_eth.db.models.base import SessionBase
from cic_eth.task import CriticalSQLAlchemyTask
celery_app = celery.current_app
@celery_app.task(base=CriticalSQLAlchemyTask)
def alert(chained_input, tag, txt):
session = SessionBase.create_session()
o = Debug(tag, txt)
session.add(o)
session.commit()
session.close()
return chained_input
@celery_app.task()
def out_tmp(tag, txt):
f = open('/tmp/err.{}.txt'.format(tag), "w")
f.write(txt)
f.close()

View File

@@ -1,30 +1,25 @@
# standard imports
import logging
# external imports
# third-party imports
import celery
from chainlib.chain import ChainSpec
from chainlib.eth.tx import unpack
from cic_registry.chain import ChainSpec
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.nonce import Nonce
from cic_eth.admin.ctrl import (
lock_send,
unlock_send,
lock_queue,
unlock_queue,
)
from cic_eth.queue.tx import (
get_tx,
set_cancel,
)
from cic_eth.admin.ctrl import lock_send
from cic_eth.admin.ctrl import unlock_send
from cic_eth.admin.ctrl import lock_queue
from cic_eth.admin.ctrl import unlock_queue
from cic_eth.queue.tx import get_tx
from cic_eth.queue.tx import set_cancel
from cic_eth.queue.tx import create as queue_create
from cic_eth.eth.gas import (
create_check_gas_task,
)
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.eth.task import sign_tx
from cic_eth.eth.task import create_check_gas_and_send_task
celery_app = celery.current_app
logg = logging.getLogger()
@@ -51,7 +46,7 @@ def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1):
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_brief = get_tx(tx_hash_orig_hex)
tx_raw = bytes.fromhex(tx_brief['signed_tx'][2:])
tx = unpack(tx_raw, chain_spec.chain_id())
tx = unpack_signed_raw_tx(tx_raw, chain_spec.chain_id())
nonce = tx_brief['nonce']
address = tx['from']
@@ -72,7 +67,7 @@ def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1):
txs = []
for otx in otxs:
tx_raw = bytes.fromhex(otx.signed_tx[2:])
tx_new = unpack(tx_raw, chain_spec.chain_id())
tx_new = unpack_signed_raw_tx(tx_raw, chain_spec.chain_id())
tx_previous_hash_hex = tx_new['hash']
tx_previous_nonce = tx_new['nonce']

View File

@@ -1,27 +1,14 @@
# standard imports
import logging
import sys
# external imports
# third-party imports
import celery
from chainlib.eth.constant import (
ZERO_ADDRESS,
)
from cic_eth_registry import CICRegistry
from cic_eth_registry.error import UnknownContractError
from chainlib.eth.address import to_checksum_address
from chainlib.eth.contract import code
from chainlib.eth.tx import (
transaction,
receipt,
unpack,
)
from chainlib.hash import keccak256_hex_to_hex
from hexathon import (
strip_0x,
add_0x,
)
from chainlib.eth.gas import balance
import web3
from cic_registry import zero_address
from cic_registry import zero_content
from cic_registry import CICRegistry
from crypto_dev_signer.eth.web3ext import Web3 as Web3Ext
from cic_registry.error import UnknownContractError
# local imports
from cic_eth.db.models.base import SessionBase
@@ -31,22 +18,19 @@ from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
is_alive,
is_error_status,
status_str,
)
from cic_eth.error import InitializationError
from cic_eth.db.error import TxStateChangeError
from cic_eth.eth.rpc import RpcClient
from cic_eth.queue.tx import get_tx
from cic_eth.eth.util import unpack_signed_raw_tx
app = celery.current_app
#logg = logging.getLogger(__file__)
logg = logging.getLogger()
local_fail = StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR | StatusBits.UNKNOWN_ERROR
class AdminApi:
"""Provides an interface to view and manipulate existing transaction tasks and system runtime settings.
@@ -56,20 +40,19 @@ class AdminApi:
:param queue: Name of worker queue to submit tasks to
:type queue: str
"""
def __init__(self, rpc, queue='cic-eth', call_address=ZERO_ADDRESS):
self.rpc = rpc
def __init__(self, rpc_client, queue='cic-eth'):
self.rpc_client = rpc_client
self.w3 = rpc_client.w3
self.queue = queue
self.call_address = call_address
def unlock(self, chain_spec, address, flags=None):
s_unlock = celery.signature(
'cic_eth.admin.ctrl.unlock',
[
None,
chain_spec.asdict(),
address,
str(chain_spec),
flags,
address,
],
queue=self.queue,
)
@@ -80,10 +63,9 @@ class AdminApi:
s_lock = celery.signature(
'cic_eth.admin.ctrl.lock',
[
None,
chain_spec.asdict(),
address,
str(chain_spec),
flags,
address,
],
queue=self.queue,
)
@@ -96,10 +78,10 @@ class AdminApi:
[],
queue=self.queue,
)
return s_lock.apply_async()
return s_lock.apply_async().get()
def tag_account(self, tag, address_hex, chain_spec):
def tag_account(self, tag, address_hex):
"""Persistently associate an address with a plaintext tag.
Some tags are known by the system and is used to resolve addresses to use for certain transactions.
@@ -110,28 +92,26 @@ class AdminApi:
:type address_hex: str, 0x-hex
:raises ValueError: Invalid checksum address
"""
s_tag = celery.signature(
'cic_eth.eth.account.set_role',
[
tag,
address_hex,
chain_spec.asdict(),
],
queue=self.queue,
)
return s_tag.apply_async()
if not web3.Web3.isChecksumAddress(address_hex):
raise ValueError('invalid address')
session = SessionBase.create_session()
role = AccountRole.set(tag, address_hex)
session.add(role)
session.commit()
session.close()
def have_account(self, address_hex, chain_spec):
def have_account(self, address_hex, chain_str):
s_have = celery.signature(
'cic_eth.eth.account.have',
[
address_hex,
chain_spec.asdict(),
chain_str,
],
queue=self.queue,
)
return s_have.apply_async()
t = s_have.apply_async()
return t.get()
def resend(self, tx_hash_hex, chain_str, in_place=True, unlock=False):
@@ -200,7 +180,6 @@ class AdminApi:
blocking_tx = None
blocking_nonce = None
nonce_otx = 0
last_nonce = -1
for k in txs.keys():
s_get_tx = celery.signature(
'cic_eth.queue.tx.get_tx',
@@ -211,25 +190,18 @@ class AdminApi:
)
tx = s_get_tx.apply_async().get()
#tx = get_tx(k)
logg.debug('checking nonce {} (previous {})'.format(tx['nonce'], last_nonce))
logg.debug('checking nonce {}'.format(tx['nonce']))
if tx['status'] in [StatusEnum.REJECTED, StatusEnum.FUBAR]:
blocking_tx = k
blocking_nonce = tx['nonce']
nonce_otx = tx['nonce']
if not is_alive(tx['status']) and tx['status'] & local_fail > 0:
logg.info('permanently errored {} nonce {} status {}'.format(k, nonce_otx, status_str(tx['status'])))
blocking_tx = k
blocking_nonce = nonce_otx
elif nonce_otx - last_nonce > 1:
logg.error('nonce gap; {} followed {}'.format(nonce_otx, last_nonce))
blocking_tx = k
blocking_nonce = nonce_otx
break
last_nonce = nonce_otx
#nonce_cache = Nonce.get(address)
#nonce_w3 = self.w3.eth.getTransactionCount(address, 'pending')
nonce_w3 = self.w3.eth.getTransactionCount(address, 'pending')
return {
'nonce': {
#'network': nonce_cache,
'network': nonce_w3,
'queue': nonce_otx,
#'cache': nonce_cache,
'blocking': blocking_nonce,
@@ -240,7 +212,7 @@ class AdminApi:
}
def fix_nonce(self, address, nonce, chain_spec):
def fix_nonce(self, address, nonce):
s = celery.signature(
'cic_eth.queue.tx.get_account_tx',
[
@@ -261,7 +233,7 @@ class AdminApi:
s_nonce = celery.signature(
'cic_eth.admin.nonce.shift_nonce',
[
self.rpc.chain_spec.asdict(),
str(self.rpc_client.chain_spec),
tx_hash_hex,
],
queue=self.queue
@@ -269,29 +241,30 @@ class AdminApi:
return s_nonce.apply_async()
# # TODO: this is a stub, complete all checks
# def ready(self):
# """Checks whether all required initializations have been performed.
#
# :raises cic_eth.error.InitializationError: At least one setting pre-requisite has not been met.
# :raises KeyError: An address provided for initialization is not known by the keystore.
# """
# addr = AccountRole.get_address('ETH_GAS_PROVIDER_ADDRESS')
# if addr == ZERO_ADDRESS:
# raise InitializationError('missing account ETH_GAS_PROVIDER_ADDRESS')
#
# self.w3.eth.sign(addr, text='666f6f')
# TODO: this is a stub, complete all checks
def ready(self):
"""Checks whether all required initializations have been performed.
:raises cic_eth.error.InitializationError: At least one setting pre-requisite has not been met.
:raises KeyError: An address provided for initialization is not known by the keystore.
"""
addr = AccountRole.get_address('ETH_GAS_PROVIDER_ADDRESS')
if addr == zero_address:
raise InitializationError('missing account ETH_GAS_PROVIDER_ADDRESS')
self.w3.eth.sign(addr, text='666f6f')
def account(self, chain_spec, address, include_sender=True, include_recipient=True, renderer=None, w=sys.stdout):
def account(self, chain_spec, address, cols=['tx_hash', 'sender', 'recipient', 'nonce', 'block', 'tx_index', 'status', 'network_status', 'date_created'], include_sender=True, include_recipient=True):
"""Lists locally originated transactions for the given Ethereum address.
Performs a synchronous call to the Celery task responsible for performing the query.
:param address: Ethereum address to return transactions for
:type address: str, 0x-hex
:param cols: Data columns to include
:type cols: list of str
"""
last_nonce = -1
s = celery.signature(
'cic_eth.queue.tx.get_account_tx',
[
@@ -303,45 +276,33 @@ class AdminApi:
tx_dict_list = []
for tx_hash in txs.keys():
errors = []
s = celery.signature(
'cic_eth.queue.tx.get_tx_cache',
[tx_hash],
queue=self.queue,
)
tx_dict = s.apply_async().get()
if tx_dict['sender'] == address:
if tx_dict['nonce'] - last_nonce > 1:
logg.error('nonce gap; {} followed {} for tx {}'.format(tx_dict['nonce'], last_nonce, tx_dict['hash']))
errors.append('nonce')
elif tx_dict['nonce'] == last_nonce:
logg.warning('nonce {} duplicate in tx {}'.format(tx_dict['nonce'], tx_dict['hash']))
last_nonce = tx_dict['nonce']
if not include_sender:
logg.debug('skipping sender tx {}'.format(tx_dict['tx_hash']))
continue
if tx_dict['sender'] == address and not include_sender:
logg.debug('skipping sender tx {}'.format(tx_dict['tx_hash']))
continue
elif tx_dict['recipient'] == address and not include_recipient:
logg.debug('skipping recipient tx {}'.format(tx_dict['tx_hash']))
continue
logg.debug(tx_dict)
o = {
'nonce': tx_dict['nonce'],
'tx_hash': tx_dict['tx_hash'],
'status': tx_dict['status'],
'date_updated': tx_dict['date_updated'],
'errors': errors,
}
if renderer != None:
r = renderer(o)
w.write(r + '\n')
else:
tx_dict_list.append(o)
tx_dict_list.append(o)
return tx_dict_list
# TODO: Add exception upon non-existent tx aswell as invalid tx data to docstring
def tx(self, chain_spec, tx_hash=None, tx_raw=None, registry=None, renderer=None, w=sys.stdout):
def tx(self, chain_spec, tx_hash=None, tx_raw=None):
"""Output local and network details about a given transaction with local origin.
If the transaction hash is given, the raw trasnaction data will be retrieved from the local transaction queue backend. Otherwise the raw transaction data must be provided directly. Only one of transaction hash and transaction data can be passed.
@@ -356,14 +317,11 @@ class AdminApi:
:return: Transaction details
:rtype: dict
"""
problems = []
if tx_hash != None and tx_raw != None:
ValueError('Specify only one of hash or raw tx')
if tx_raw != None:
tx_hash = add_0x(keccak256_hex_to_hex(tx_raw))
#tx_hash = self.w3.keccak(hexstr=tx_raw).hex()
tx_hash = self.w3.keccak(hexstr=tx_raw).hex()
s = celery.signature(
'cic_eth.queue.tx.get_tx_cache',
@@ -374,35 +332,31 @@ class AdminApi:
tx = s.apply_async().get()
source_token = None
if tx['source_token'] != ZERO_ADDRESS:
if tx['source_token'] != zero_address:
try:
source_token = registry.by_address(tx['source_token'])
#source_token = CICRegistry.get_address(chain_spec, tx['source_token']).contract
source_token = CICRegistry.get_address(chain_spec, tx['source_token']).contract
except UnknownContractError:
#source_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token'])
#source_token = CICRegistry.add_token(chain_spec, source_token_contract)
source_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token'])
source_token = CICRegistry.add_token(chain_spec, source_token_contract)
logg.warning('unknown source token contract {}'.format(tx['source_token']))
destination_token = None
if tx['source_token'] != ZERO_ADDRESS:
if tx['source_token'] != zero_address:
try:
#destination_token = CICRegistry.get_address(chain_spec, tx['destination_token'])
destination_token = registry.by_address(tx['destination_token'])
destination_token = CICRegistry.get_address(chain_spec, tx['destination_token'])
except UnknownContractError:
#destination_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token'])
#destination_token = CICRegistry.add_token(chain_spec, destination_token_contract)
destination_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token'])
destination_token = CICRegistry.add_token(chain_spec, destination_token_contract)
logg.warning('unknown destination token contract {}'.format(tx['destination_token']))
tx['sender_description'] = 'Custodial account'
tx['recipient_description'] = 'Custodial account'
o = code(tx['sender'])
r = self.rpc.do(o)
if len(strip_0x(r, allow_empty=True)) > 0:
c = RpcClient(chain_spec)
if len(c.w3.eth.getCode(tx['sender'])) > 0:
try:
#sender_contract = CICRegistry.get_address(chain_spec, tx['sender'])
sender_contract = registry.by_address(tx['sender'], sender_address=self.call_address)
tx['sender_description'] = 'Contract at {}'.format(tx['sender']) #sender_contract)
sender_contract = CICRegistry.get_address(chain_spec, tx['sender'])
tx['sender_description'] = 'Contract {}'.format(sender_contract.identifier())
except UnknownContractError:
tx['sender_description'] = 'Unknown contract'
except KeyError as e:
@@ -412,7 +366,7 @@ class AdminApi:
'cic_eth.eth.account.have',
[
tx['sender'],
chain_spec.asdict(),
str(chain_spec),
],
queue=self.queue,
)
@@ -425,7 +379,7 @@ class AdminApi:
'cic_eth.eth.account.role',
[
tx['sender'],
chain_spec.asdict(),
str(chain_spec),
],
queue=self.queue,
)
@@ -434,13 +388,11 @@ class AdminApi:
if role != None:
tx['sender_description'] = role
o = code(tx['recipient'])
r = self.rpc.do(o)
if len(strip_0x(r, allow_empty=True)) > 0:
if len(c.w3.eth.getCode(tx['recipient'])) > 0:
try:
#recipient_contract = CICRegistry.by_address(tx['recipient'])
recipient_contract = registry.by_address(tx['recipient'])
tx['recipient_description'] = 'Contract at {}'.format(tx['recipient']) #recipient_contract)
recipient_contract = CICRegistry.get_address(chain_spec, tx['recipient'])
tx['recipient_description'] = 'Contract {}'.format(recipient_contract.identifier())
except UnknownContractError as e:
tx['recipient_description'] = 'Unknown contract'
except KeyError as e:
@@ -450,7 +402,7 @@ class AdminApi:
'cic_eth.eth.account.have',
[
tx['recipient'],
chain_spec.asdict(),
str(chain_spec),
],
queue=self.queue,
)
@@ -463,7 +415,7 @@ class AdminApi:
'cic_eth.eth.account.role',
[
tx['recipient'],
chain_spec.asdict(),
str(chain_spec),
],
queue=self.queue,
)
@@ -482,43 +434,29 @@ class AdminApi:
tx['network_status'] = 'Not submitted'
r = None
try:
o = transaction(tx_hash)
r = self.rpc.do(o)
except Exception as e:
logg.warning('(too permissive exception handler, please fix!) {}'.format(e))
c.w3.eth.getTransaction(tx_hash)
tx['network_status'] = 'Mempool'
except web3.exceptions.TransactionNotFound:
pass
if r != None:
try:
o = receipt(tx_hash)
r = self.rpc.do(o)
logg.debug('h {} o {}'.format(tx_hash, o))
if int(strip_0x(r['status'])) == 1:
tx['network_status'] = 'Confirmed'
else:
tx['network_status'] = 'Reverted'
tx['network_block_number'] = r.blockNumber
tx['network_tx_index'] = r.transactionIndex
if tx['block_number'] == None:
problems.append('Queue is missing block number {} for mined tx'.format(r.blockNumber))
except Exception as e:
logg.warning('too permissive exception handler, please fix!')
pass
try:
r = c.w3.eth.getTransactionReceipt(tx_hash)
if r.status == 1:
tx['network_status'] = 'Confirmed'
tx['block'] = r.blockNumber
tx['tx_index'] = r.transactionIndex
else:
tx['network_status'] = 'Reverted'
except web3.exceptions.TransactionNotFound:
pass
o = balance(tx['sender'])
r = self.rpc.do(o)
tx['sender_gas_balance'] = r
tx['sender_gas_balance'] = c.w3.eth.getBalance(tx['sender'])
tx['recipient_gas_balance'] = c.w3.eth.getBalance(tx['recipient'])
o = balance(tx['recipient'])
r = self.rpc.do(o)
tx['recipient_gas_balance'] = r
tx_unpacked = unpack(bytes.fromhex(tx['signed_tx'][2:]), chain_spec.chain_id())
tx_unpacked = unpack_signed_raw_tx(bytes.fromhex(tx['signed_tx'][2:]), chain_spec.chain_id())
tx['gas_price'] = tx_unpacked['gasPrice']
tx['gas_limit'] = tx_unpacked['gas']
tx['data'] = tx_unpacked['data']
s = celery.signature(
'cic_eth.queue.tx.get_state_log',
@@ -530,14 +468,4 @@ class AdminApi:
t = s.apply_async()
tx['status_log'] = t.get()
if len(problems) > 0:
sys.stderr.write('\n')
for p in problems:
sys.stderr.write('!!!{}\n'.format(p))
if renderer == None:
return tx
r = renderer(tx)
w.write(r + '\n')
return None
return tx

View File

@@ -6,12 +6,11 @@
# standard imports
import logging
# external imports
# third-party imports
import celery
from cic_eth_registry import CICRegistry
from chainlib.chain import ChainSpec
# local imports
from cic_registry.chain import ChainSpec
from cic_registry import CICRegistry
from cic_eth.eth.factory import TxFactory
from cic_eth.db.enum import LockEnum
app = celery.current_app
@@ -80,24 +79,18 @@ class Api:
:returns: uuid of root task
:rtype: celery.Task
"""
raise NotImplementedError('out of service until new DEX migration is done')
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
[from_token_symbol, to_token_symbol],
self.chain_spec.asdict(),
self.chain_str,
LockEnum.QUEUE,
from_address,
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.erc20.resolve_tokens_by_symbol',
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
self.chain_str,
],
@@ -110,12 +103,11 @@ class Api:
target_return,
minimum_return,
to_address,
self.chain_spec.asdict(),
self.chain_str,
],
queue=self.queue,
)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
s_check.link(s_tokens)
if self.callback_param != None:
s_convert.link(self.callback_success)
s_tokens.link(s_convert).on_error(self.callback_error)
@@ -142,26 +134,20 @@ class Api:
:returns: uuid of root task
:rtype: celery.Task
"""
raise NotImplementedError('out of service until new DEX migration is done')
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
[from_token_symbol, to_token_symbol],
self.chain_spec.asdict(),
self.chain_str,
LockEnum.QUEUE,
from_address,
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.erc20.resolve_tokens_by_symbol',
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
self.chain_spec.asdict(),
self.chain_str,
],
queue=self.queue,
)
@@ -172,12 +158,11 @@ class Api:
target_return,
minimum_return,
from_address,
self.chain_spec.asdict(),
self.chain_str,
],
queue=self.queue,
)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
s_check.link(s_tokens)
if self.callback_param != None:
s_convert.link(self.callback_success)
s_tokens.link(s_convert).on_error(self.callback_error)
@@ -206,38 +191,30 @@ class Api:
'cic_eth.admin.ctrl.check_lock',
[
[token_symbol],
self.chain_spec.asdict(),
self.chain_str,
LockEnum.QUEUE,
from_address,
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
from_address,
],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.erc20.resolve_tokens_by_symbol',
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
self.chain_spec.asdict(),
self.chain_str,
],
queue=self.queue,
)
s_transfer = celery.signature(
'cic_eth.eth.erc20.transfer',
'cic_eth.eth.token.transfer',
[
from_address,
to_address,
value,
self.chain_spec.asdict(),
self.chain_str,
],
queue=self.queue,
)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
s_check.link(s_tokens)
if self.callback_param != None:
s_transfer.link(self.callback_success)
s_tokens.link(s_transfer).on_error(self.callback_error)
@@ -248,6 +225,82 @@ class Api:
return t
def transfer_request(self, from_address, to_address, spender_address, value, token_symbol):
"""Executes a chain of celery tasks that issues a transfer request of ERC20 tokens from one address to another.
:param from_address: Ethereum address of sender
:type from_address: str, 0x-hex
:param to_address: Ethereum address of recipient
:type to_address: str, 0x-hex
:param spender_address: Ethereum address that is executing transfer (typically an escrow contract)
:type spender_address: str, 0x-hex
:param value: Estimated return from conversion
:type value: int
:param token_symbol: ERC20 token symbol of token to send
:type token_symbol: str
:returns: uuid of root task
:rtype: celery.Task
"""
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
[token_symbol],
self.chain_str,
LockEnum.QUEUE,
from_address,
],
queue=self.queue,
)
s_tokens_transfer_approval = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
self.chain_str,
],
queue=self.queue,
)
s_tokens_approve = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
self.chain_str,
],
queue=self.queue,
)
s_approve = celery.signature(
'cic_eth.eth.token.approve',
[
from_address,
spender_address,
value,
self.chain_str,
],
queue=self.queue,
)
s_transfer_approval = celery.signature(
'cic_eth.eth.request.transfer_approval_request',
[
from_address,
to_address,
value,
self.chain_str,
],
queue=self.queue,
)
# TODO: make approve and transfer_approval chainable so callback can be part of the full chain
if self.callback_param != None:
s_transfer_approval.link(self.callback_success)
s_tokens_approve.link(s_approve)
s_tokens_transfer_approval.link(s_transfer_approval).on_error(self.callback_error)
else:
s_tokens_approve.link(s_approve)
s_tokens_transfer_approval.link(s_transfer_approval)
g = celery.group(s_tokens_approve, s_tokens_transfer_approval) #s_tokens.apply_async(queue=self.queue)
s_check.link(g)
t = s_check.apply_async()
#t = s_tokens.apply_async(queue=self.queue)
return t
def balance(self, address, token_symbol, include_pending=True):
"""Calls the provided callback with the current token balance of the given address.
@@ -264,18 +317,18 @@ class Api:
logg.warning('balance pointlessly called with no callback url')
s_tokens = celery.signature(
'cic_eth.eth.erc20.resolve_tokens_by_symbol',
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
[token_symbol],
self.chain_spec.asdict(),
self.chain_str,
],
queue=self.queue,
)
s_balance = celery.signature(
'cic_eth.eth.erc20.balance',
'cic_eth.eth.token.balance',
[
address,
self.chain_spec.asdict(),
self.chain_str,
],
queue=self.queue,
)
@@ -291,7 +344,7 @@ class Api:
'cic_eth.queue.balance.balance_incoming',
[
address,
self.chain_spec.asdict(),
self.chain_str,
],
queue=self.queue,
)
@@ -299,7 +352,7 @@ class Api:
'cic_eth.queue.balance.balance_outgoing',
[
address,
self.chain_spec.asdict(),
self.chain_str,
],
queue=self.queue,
)
@@ -307,22 +360,16 @@ class Api:
s_balance_incoming.link(s_balance_outgoing)
last_in_chain = s_balance_outgoing
one = celery.chain(s_tokens, s_balance)
two = celery.chain(s_tokens, s_balance_incoming)
three = celery.chain(s_tokens, s_balance_outgoing)
one = celery.chain(s_tokens, s_balance)
two = celery.chain(s_tokens, s_balance_incoming)
three = celery.chain(s_tokens, s_balance_outgoing)
t = None
if self.callback_param != None:
s_result.link(self.callback_success).on_error(self.callback_error)
t = celery.chord([one, two, three])(s_result)
else:
t = celery.chord([one, two, three])(s_result)
t = None
if self.callback_param != None:
s_result.link(self.callback_success).on_error(self.callback_error)
t = celery.chord([one, two, three])(s_result)
else:
# TODO: Chord is inefficient with only one chain, but assemble_balances must be able to handle different structures in order to avoid chord
one = celery.chain(s_tokens, s_balance)
if self.callback_param != None:
s_result.link(self.callback_success).on_error(self.callback_error)
t = celery.chord([one])(s_result)
t = celery.chord([one, two, three])(s_result)
return t
@@ -341,7 +388,7 @@ class Api:
'cic_eth.admin.ctrl.check_lock',
[
password,
self.chain_spec.asdict(),
self.chain_str,
LockEnum.CREATE,
],
queue=self.queue,
@@ -349,7 +396,7 @@ class Api:
s_account = celery.signature(
'cic_eth.eth.account.create',
[
self.chain_spec.asdict(),
self.chain_str,
],
queue=self.queue,
)
@@ -358,22 +405,14 @@ class Api:
s_account.link(self.callback_success)
if register:
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
'ACCOUNT_REGISTRY_WRITER',
],
queue=self.queue,
)
s_register = celery.signature(
'cic_eth.eth.account.register',
[
self.chain_spec.asdict(),
self.chain_str,
],
queue=self.queue,
)
s_nonce.link(s_register)
s_account.link(s_nonce)
s_account.link(s_register)
t = s_check.apply_async(queue=self.queue)
return t
@@ -391,27 +430,19 @@ class Api:
'cic_eth.admin.ctrl.check_lock',
[
address,
self.chain_spec.asdict(),
self.chain_str,
LockEnum.QUEUE,
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
'GAS_GIFTER',
],
queue=self.queue,
)
s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
self.chain_spec.asdict(),
self.chain_str,
],
queue=self.queue,
)
s_nonce.link(s_refill)
s_check.link(s_nonce)
s_check.link(s_refill)
if self.callback_param != None:
s_refill.link(self.callback_success)
@@ -449,15 +480,14 @@ class Api:
s_brief = celery.signature(
'cic_eth.ext.tx.tx_collate',
[
self.chain_spec.asdict(),
self.chain_str,
offset,
limit
],
queue=self.queue,
)
s_local.link(s_brief)
if self.callback_param != None:
s_brief.link(self.callback_success).on_error(self.callback_error)
s_assemble.link(self.callback_success).on_error(self.callback_error)
t = None
if external_task != None:
@@ -475,17 +505,18 @@ class Api:
'cic_eth.ext.tx.list_tx_by_bloom',
[
address,
self.chain_spec.asdict(),
self.chain_str,
],
queue=self.queue,
)
c = celery.chain(s_external_get, s_external_process)
t = celery.chord([s_local, c])(s_brief)
else:
t = s_local.apply_async(queue=self.queue)
t = s_local.apply_sync()
return t
def ping(self, r):
"""A noop callback ping for testing purposes.

View File

@@ -18,11 +18,7 @@ logg = celery_app.log.get_default_logger()
def redis(self, result, destination, status_code):
(host, port, db, channel) = destination.split(':')
r = redis_interface.Redis(host=host, port=port, db=db)
data = {
'root_id': self.request.root_id,
'status': status_code,
'result': result,
}
s = json.dumps(result)
logg.debug('redis callback on host {} port {} db {} channel {}'.format(host, port, db, channel))
r.publish(channel, json.dumps(data))
r.publish(channel, s)
r.close()

View File

@@ -20,10 +20,5 @@ def tcp(self, result, destination, status_code):
(host, port) = destination.split(':')
logg.debug('tcp callback to {} {}'.format(host, port))
s.connect((host, int(port)))
data = {
'root_id': self.request.root_id,
'status': status_code,
'result': result,
}
s.send(json.dumps(data).encode('utf-8'))
s.send(json.dumps(result).encode('utf-8'))
s.close()

View File

@@ -103,9 +103,6 @@ def status_str(v, bits_only=False):
except ValueError:
pass
if v == 0:
return 'NONE'
for i in range(16):
b = (1 << i)
if (b & 0xffff) & v:

View File

@@ -1,31 +0,0 @@
"""Nonce reservation
Revision ID: 3b693afd526a
Revises: f738d9962fdf
Create Date: 2021-03-05 07:09:50.898728
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3b693afd526a'
down_revision = 'f738d9962fdf'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('key', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
def downgrade():
op.drop_table('nonce_task_reservation')

View File

@@ -1,28 +0,0 @@
"""Add chain syncer
Revision ID: ec40ac0974c1
Revises: 6ac7a1dadc46
Create Date: 2021-02-23 06:10:19.246304
"""
from alembic import op
import sqlalchemy as sa
from chainsyncer.db.migrations.sqlalchemy import (
chainsyncer_upgrade,
chainsyncer_downgrade,
)
# revision identifiers, used by Alembic.
revision = 'ec40ac0974c1'
down_revision = '6ac7a1dadc46'
branch_labels = None
depends_on = None
def upgrade():
chainsyncer_upgrade(0, 0, 1)
def downgrade():
chainsyncer_downgrade(0, 0, 1)

View File

@@ -1,32 +0,0 @@
"""debug output
Revision ID: f738d9962fdf
Revises: ec40ac0974c1
Create Date: 2021-03-04 08:32:43.281214
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'f738d9962fdf'
down_revision = 'ec40ac0974c1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'debug',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.String, nullable=False),
sa.Column('description', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('debug')
pass

View File

@@ -1,31 +0,0 @@
"""Nonce reservation
Revision ID: 3b693afd526a
Revises: f738d9962fdf
Create Date: 2021-03-05 07:09:50.898728
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3b693afd526a'
down_revision = 'f738d9962fdf'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('key', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
def downgrade():
op.drop_table('nonce_task_reservation')

View File

@@ -24,7 +24,6 @@ def upgrade():
sa.Column('blockchain', sa.String),
sa.Column("flags", sa.BIGINT(), nullable=False, default=0),
sa.Column("date_created", sa.DateTime, nullable=False),
sa.Column("otx_id", sa.Integer, nullable=True),
)
op.create_index('idx_chain_address', 'lock', ['blockchain', 'address'], unique=True)

View File

@@ -1,28 +0,0 @@
"""Add chain syncer
Revision ID: ec40ac0974c1
Revises: 6ac7a1dadc46
Create Date: 2021-02-23 06:10:19.246304
"""
from alembic import op
import sqlalchemy as sa
from chainsyncer.db.migrations.sqlalchemy import (
chainsyncer_upgrade,
chainsyncer_downgrade,
)
# revision identifiers, used by Alembic.
revision = 'ec40ac0974c1'
down_revision = '6ac7a1dadc46'
branch_labels = None
depends_on = None
def upgrade():
chainsyncer_upgrade(0, 0, 1)
def downgrade():
chainsyncer_downgrade(0, 0, 1)

View File

@@ -1,32 +0,0 @@
"""debug output
Revision ID: f738d9962fdf
Revises: ec40ac0974c1
Create Date: 2021-03-04 08:32:43.281214
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'f738d9962fdf'
down_revision = 'ec40ac0974c1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'debug',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.String, nullable=False),
sa.Column('description', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('debug')
pass

View File

@@ -54,7 +54,7 @@ class SessionBase(Model):
@staticmethod
def connect(dsn, pool_size=16, debug=False):
def connect(dsn, pool_size=8, debug=False):
"""Create new database connection engine and connect to database backend.
:param dsn: DSN string defining connection.
@@ -116,6 +116,6 @@ class SessionBase(Model):
def release_session(session=None):
session_key = str(id(session))
if SessionBase.localsessions.get(session_key) != None:
logg.debug('commit and destroy session {}'.format(session_key))
logg.debug('destroying session {}'.format(session_key))
session.commit()
session.close()

View File

@@ -1,23 +0,0 @@
# standard imports
import datetime
import logging
# external imports
from sqlalchemy import Column, String, DateTime
# local imports
from .base import SessionBase
class Debug(SessionBase):
__tablename__ = 'debug'
date_created = Column(DateTime, default=datetime.datetime.utcnow)
tag = Column(String)
description = Column(String)
def __init__(self, tag, description):
self.tag = tag
self.description = description

View File

@@ -4,7 +4,7 @@ import logging
# third-party imports
from sqlalchemy import Column, String, Integer, DateTime, ForeignKey
from chainlib.eth.constant import ZERO_ADDRESS
from cic_registry import zero_address
# local imports
from cic_eth.db.models.base import SessionBase
@@ -35,7 +35,7 @@ class Lock(SessionBase):
@staticmethod
def set(chain_str, flags, address=ZERO_ADDRESS, session=None, tx_hash=None):
def set(chain_str, flags, address=zero_address, session=None, tx_hash=None):
"""Sets flags associated with the given address and chain.
If a flags entry does not exist it is created.
@@ -55,9 +55,11 @@ class Lock(SessionBase):
:returns: New flag state of entry
:rtype: number
"""
session = SessionBase.bind_session(session)
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
q = session.query(Lock)
q = localsession.query(Lock)
#q = q.join(TxCache, isouter=True)
q = q.filter(Lock.address==address)
q = q.filter(Lock.blockchain==chain_str)
@@ -69,8 +71,7 @@ class Lock(SessionBase):
lock.address = address
lock.blockchain = chain_str
if tx_hash != None:
session.flush()
q = session.query(Otx)
q = localsession.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
otx = q.first()
if otx != None:
@@ -79,16 +80,17 @@ class Lock(SessionBase):
lock.flags |= flags
r = lock.flags
session.add(lock)
session.commit()
SessionBase.release_session(session)
localsession.add(lock)
localsession.commit()
if session == None:
localsession.close()
return r
@staticmethod
def reset(chain_str, flags, address=ZERO_ADDRESS, session=None):
def reset(chain_str, flags, address=zero_address, session=None):
"""Resets flags associated with the given address and chain.
If the resulting flags entry value is 0, the entry will be deleted.
@@ -108,9 +110,11 @@ class Lock(SessionBase):
:returns: New flag state of entry
:rtype: number
"""
session = SessionBase.bind_session(session)
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
q = session.query(Lock)
q = localsession.query(Lock)
#q = q.join(TxCache, isouter=True)
q = q.filter(Lock.address==address)
q = q.filter(Lock.blockchain==chain_str)
@@ -120,19 +124,20 @@ class Lock(SessionBase):
if lock != None:
lock.flags &= ~flags
if lock.flags == 0:
session.delete(lock)
localsession.delete(lock)
else:
session.add(lock)
localsession.add(lock)
r = lock.flags
session.commit()
localsession.commit()
SessionBase.release_session(session)
if session == None:
localsession.close()
return r
@staticmethod
def check(chain_str, flags, address=ZERO_ADDRESS, session=None):
def check(chain_str, flags, address=zero_address, session=None):
"""Checks whether all given flags are set for given address and chain.
Does not validate the address against any other tables or components.
@@ -151,20 +156,22 @@ class Lock(SessionBase):
:rtype: number
"""
session = SessionBase.bind_session(session)
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
q = session.query(Lock)
q = localsession.query(Lock)
#q = q.join(TxCache, isouter=True)
q = q.filter(Lock.address==address)
q = q.filter(Lock.blockchain==chain_str)
q = q.filter(Lock.flags.op('&')(flags)==flags)
lock = q.first()
if session == None:
localsession.close()
r = 0
if lock != None:
r = lock.flags & flags
SessionBase.release_session(session)
return r

View File

@@ -1,16 +1,11 @@
# standard imports
import logging
import datetime
# third-party imports
from sqlalchemy import Column, String, Integer, DateTime
from sqlalchemy import Column, String, Integer
# local imports
from .base import SessionBase
from cic_eth.error import (
InitializationError,
IntegrityError,
)
logg = logging.getLogger()
@@ -26,9 +21,12 @@ class Nonce(SessionBase):
@staticmethod
def get(address, session=None):
session = SessionBase.bind_session(session)
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
q = session.query(Nonce)
q = localsession.query(Nonce)
q = q.filter(Nonce.address_hex==address)
nonce = q.first()
@@ -36,7 +34,8 @@ class Nonce(SessionBase):
if nonce != None:
nonce_value = nonce.nonce;
SessionBase.release_session(session)
if session == None:
localsession.close()
return nonce_value
@@ -56,43 +55,7 @@ class Nonce(SessionBase):
@staticmethod
def __inc(conn, address):
#conn.execute("UPDATE nonce set nonce = nonce + 1 WHERE address_hex = '{}'".format(address))
q = conn.query(Nonce)
q = q.filter(Nonce.address_hex==address)
q = q.with_for_update()
o = q.first()
nonce = o.nonce
o.nonce += 1
conn.add(o)
conn.flush()
return nonce
@staticmethod
def __init(conn, address, nonce):
conn.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
@staticmethod
def init(address, nonce=0, session=None):
session = SessionBase.bind_session(session)
q = session.query(Nonce)
q = q.filter(Nonce.address_hex==address)
o = q.first()
if o != None:
session.flush()
raise InitializationError('nonce on {} already exists ({})'.format(address, o.nonce))
session.flush()
Nonce.__init(session, address, nonce)
SessionBase.release_session(session)
# TODO: Incrementing nonce MUST be done by separate tasks.
@staticmethod
def next(address, initial_if_not_exists=0, session=None):
def next(address, initial_if_not_exists=0):
"""Generate next nonce for the given address.
If there is no previous nonce record for the address, the nonce may be initialized to a specified value, or 0 if no value has been given.
@@ -104,106 +67,20 @@ class Nonce(SessionBase):
:returns: Nonce
:rtype: number
"""
session = SessionBase.bind_session(session)
#session.begin_nested()
#conn = Nonce.engine.connect()
#if Nonce.transactional:
# conn.execute('BEGIN')
# conn.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
# logg.debug('locking nonce table for address {}'.format(address))
#nonce = Nonce.__get(conn, address)
nonce = Nonce.__get(session, address)
conn = Nonce.engine.connect()
if Nonce.transactional:
conn.execute('BEGIN')
conn.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
nonce = Nonce.__get(conn, address)
logg.debug('get nonce {} for address {}'.format(nonce, address))
if nonce == None:
nonce = initial_if_not_exists
conn.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
logg.debug('setting default nonce to {} for address {}'.format(nonce, address))
#Nonce.__init(conn, address, nonce)
Nonce.__init(session, address, nonce)
#Nonce.__set(conn, address, nonce+1)
nonce = Nonce.__inc(session, address)
#if Nonce.transactional:
#conn.execute('COMMIT')
# logg.debug('unlocking nonce table for address {}'.format(address))
#conn.close()
#session.commit()
SessionBase.release_session(session)
Nonce.__set(conn, address, nonce+1)
if Nonce.transactional:
conn.execute('COMMIT')
conn.close()
return nonce
class NonceReservation(SessionBase):
__tablename__ = 'nonce_task_reservation'
address_hex = Column(String(42))
nonce = Column(Integer)
key = Column(String)
date_created = Column(DateTime, default=datetime.datetime.utcnow)
@staticmethod
def peek(address, key, session=None):
session = SessionBase.bind_session(session)
q = session.query(NonceReservation)
q = q.filter(NonceReservation.key==key)
q = q.filter(NonceReservation.address_hex==address)
o = q.first()
r = None
if o != None:
r = (o.key, o.nonce)
session.flush()
SessionBase.release_session(session)
return r
@staticmethod
def release(address, key, session=None):
session = SessionBase.bind_session(session)
o = NonceReservation.peek(address, key, session=session)
if o == None:
SessionBase.release_session(session)
raise IntegrityError('"release" called on key {} address {} which does not exists'.format(key, address))
q = session.query(NonceReservation)
q = q.filter(NonceReservation.key==key)
q = q.filter(NonceReservation.address_hex==address)
o = q.first()
r = (o.key, o.nonce)
session.delete(o)
session.flush()
SessionBase.release_session(session)
return r
@staticmethod
def next(address, key, session=None):
session = SessionBase.bind_session(session)
o = NonceReservation.peek(address, key, session)
if o != None:
raise IntegrityError('"next" called on nonce for key {} address {} during active key {}'.format(key, address, o[0]))
nonce = Nonce.next(address, session=session)
o = NonceReservation()
o.nonce = nonce
o.key = key
o.address_hex = address
session.add(o)
r = (key, nonce)
SessionBase.release_session(session)
return r

View File

@@ -2,7 +2,7 @@
import datetime
import logging
# external imports
# third-party imports
from sqlalchemy import Column, Enum, String, Integer, DateTime, Text, or_, ForeignKey
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
@@ -15,6 +15,7 @@ from cic_eth.db.enum import (
is_error_status,
)
from cic_eth.db.error import TxStateChangeError
#from cic_eth.eth.util import address_hex_from_signed_tx
logg = logging.getLogger()
@@ -78,13 +79,6 @@ class Otx(SessionBase):
return r
def __status_not_set(self, status):
r = not(self.status & status)
if r:
logg.warning('status bit {} not set on {}'.format(status.name, self.tx_hash))
return r
def set_block(self, block, session=None):
"""Set block number transaction was mined in.
@@ -94,16 +88,19 @@ class Otx(SessionBase):
:type block: number
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
session = SessionBase.bind_session(session)
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
if self.block != None:
SessionBase.release_session(session)
raise TxStateChangeError('Attempted set block {} when block was already {}'.format(block, self.block))
self.block = block
session.add(self)
session.flush()
localsession.add(self)
localsession.flush()
SessionBase.release_session(session)
if session==None:
localsession.commit()
localsession.close()
def waitforgas(self, session=None):
@@ -119,10 +116,8 @@ class Otx(SessionBase):
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
SessionBase.release_session(session)
raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.GAS_ISSUES, session)
@@ -145,10 +140,8 @@ class Otx(SessionBase):
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('FUBAR cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if is_error_status(self.status):
SessionBase.release_session(session)
raise TxStateChangeError('FUBAR cannot be set on an entry with an error state already set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.UNKNOWN_ERROR | StatusBits.FINAL, session)
@@ -170,13 +163,10 @@ class Otx(SessionBase):
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('REJECTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
SessionBase.release_session(session)
raise TxStateChangeError('REJECTED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status)))
if is_error_status(self.status):
SessionBase.release_session(session)
raise TxStateChangeError('REJECTED cannot be set on an entry with an error state already set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.NODE_ERROR | StatusBits.FINAL, session)
@@ -196,13 +186,10 @@ class Otx(SessionBase):
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
SessionBase.release_session(session)
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status)))
if self.status & StatusBits.OBSOLETE:
SessionBase.release_session(session)
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already OBSOLETE ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.OBSOLETE, session)
@@ -222,7 +209,6 @@ class Otx(SessionBase):
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.MANUAL, session)
@@ -245,10 +231,8 @@ class Otx(SessionBase):
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('RETRY cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if not is_error_status(self.status) and not StatusBits.IN_NETWORK & self.status > 0:
SessionBase.release_session(session)
raise TxStateChangeError('RETRY cannot be set on an entry that has no error ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.QUEUED, session)
@@ -273,10 +257,8 @@ class Otx(SessionBase):
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('READYSEND cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if is_error_status(self.status):
SessionBase.release_session(session)
raise TxStateChangeError('READYSEND cannot be set on an errored state ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.QUEUED, session)
@@ -301,7 +283,6 @@ class Otx(SessionBase):
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('SENT cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.IN_NETWORK, session)
@@ -326,10 +307,8 @@ class Otx(SessionBase):
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
SessionBase.release_session(session)
raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.LOCAL_ERROR | StatusBits.DEFERRED, session)
@@ -341,34 +320,6 @@ class Otx(SessionBase):
SessionBase.release_session(session)
def dequeue(self, session=None):
"""Marks that a process to execute send attempt is underway
Only manipulates object, does not transaction or commit to backend.
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_not_set(StatusBits.QUEUED):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('QUEUED cannot be unset on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
SessionBase.release_session(session)
raise TxStateChangeError('QUEUED cannot be unset on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
self.__reset_status(StatusBits.QUEUED, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def minefail(self, block, session=None):
"""Marks that transaction was mined but code execution did not succeed.
@@ -384,10 +335,8 @@ class Otx(SessionBase):
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('REVERTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if not self.status & StatusBits.IN_NETWORK:
SessionBase.release_session(session)
raise TxStateChangeError('REVERTED cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status)))
if block != None:
@@ -415,17 +364,27 @@ class Otx(SessionBase):
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('CANCEL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if confirmed:
if self.status > 0 and not self.status & StatusBits.OBSOLETE:
SessionBase.release_session(session)
if not self.status & StatusBits.OBSOLETE:
raise TxStateChangeError('CANCEL can only be set on an entry marked OBSOLETE ({})'.format(status_str(self.status)))
self.__set_status(StatusEnum.CANCELLED, session)
else:
self.__set_status(StatusEnum.OBSOLETED, session)
# if confirmed:
# if self.status != StatusEnum.OBSOLETED:
# logg.warning('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
# #raise TxStateChangeError('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
# self.__set_status(StatusEnum.CANCELLED, session)
# elif self.status != StatusEnum.OBSOLETED:
# if self.status > StatusEnum.SENT:
# logg.warning('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
# #raise TxStateChangeError('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
# self.__set_status(StatusEnum.OBSOLETED, session)
if self.tracing:
self.__state_log(session=session)
@@ -445,13 +404,10 @@ class Otx(SessionBase):
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('SUCCESS cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if not self.status & StatusBits.IN_NETWORK:
SessionBase.release_session(session)
raise TxStateChangeError('SUCCESS cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status)))
if is_error_status(self.status):
SessionBase.release_session(session)
raise TxStateChangeError('SUCCESS cannot be set on an entry with error state set ({})'.format(status_str(self.status)))
if block != None:
@@ -532,23 +488,22 @@ class Otx(SessionBase):
session.add(l)
# TODO: it is not safe to return otx here unless session has been passed in
@staticmethod
def add(nonce, address, tx_hash, signed_tx, session=None):
external_session = session != None
session = SessionBase.bind_session(session)
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
otx = Otx(nonce, address, tx_hash, signed_tx)
session.add(otx)
session.flush()
localsession.add(otx)
localsession.flush()
if otx.tracing:
otx.__state_log(session=session)
session.flush()
otx.__state_log(session=localsession)
localsession.flush()
SessionBase.release_session(session)
if not external_session:
if session==None:
localsession.commit()
localsession.close()
return None
return otx

View File

@@ -1,9 +1,9 @@
# standard imports
import logging
# external imports
# third-party imports
from sqlalchemy import Column, String, Text
from chainlib.eth.constant import ZERO_ADDRESS
from cic_registry import zero_address
# local imports
from .base import SessionBase
@@ -24,10 +24,9 @@ class AccountRole(SessionBase):
tag = Column(Text)
address_hex = Column(String(42))
# TODO:
@staticmethod
def get_address(tag, session):
def get_address(tag):
"""Get Ethereum address matching the given tag
:param tag: Tag
@@ -35,26 +34,14 @@ class AccountRole(SessionBase):
:returns: Ethereum address, or zero-address if tag does not exist
:rtype: str, 0x-hex
"""
if session == None:
raise ValueError('nested bind session calls will not succeed as the first call to release_session in the stack will leave the db object detached further down the stack. We will need additional reference count.')
session = SessionBase.bind_session(session)
role = AccountRole.__get_role(tag, session)
r = ZERO_ADDRESS
if role != None:
r = role.address_hex
session.flush()
SessionBase.release_session(session)
return r
role = AccountRole.get_role(tag)
if role == None:
return zero_address
return role.address_hex
@staticmethod
def get_role(tag, session=None):
def get_role(tag):
"""Get AccountRole model object matching the given tag
:param tag: Tag
@@ -62,27 +49,20 @@ class AccountRole(SessionBase):
:returns: Role object, if found
:rtype: cic_eth.db.models.role.AccountRole
"""
session = SessionBase.bind_session(session)
role = AccountRole.__get_role(tag, session)
session.flush()
SessionBase.release_session(session)
session = AccountRole.create_session()
role = AccountRole.__get_role(session, tag)
session.close()
#return role.address_hex
return role
@staticmethod
def __get_role(tag, session):
q = session.query(AccountRole)
q = q.filter(AccountRole.tag==tag)
r = q.first()
return r
def __get_role(session, tag):
return session.query(AccountRole).filter(AccountRole.tag==tag).first()
@staticmethod
def set(tag, address_hex, session=None):
def set(tag, address_hex):
"""Persist a tag to Ethereum address association.
This will silently overwrite the existing value.
@@ -94,18 +74,16 @@ class AccountRole(SessionBase):
:returns: Role object
:rtype: cic_eth.db.models.role.AccountRole
"""
session = SessionBase.bind_session(session)
role = AccountRole.__get_role(tag, session)
#session = AccountRole.create_session()
#role = AccountRole.__get(session, tag)
role = AccountRole.get_role(tag) #session, tag)
if role == None:
role = AccountRole(tag)
role.address_hex = address_hex
session.flush()
SessionBase.release_session(session)
return role
#session.add(role)
#session.commit()
#session.close()
return role #address_hex
@staticmethod
@@ -117,20 +95,23 @@ class AccountRole(SessionBase):
:returns: Role tag, or None if no match
:rtype: str or None
"""
session = SessionBase.bind_session(session)
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
q = session.query(AccountRole)
q = localsession.query(AccountRole)
q = q.filter(AccountRole.address_hex==address)
role = q.first()
tag = None
if role != None:
tag = role.tag
SessionBase.release_session(session)
if session == None:
localsession.close()
return tag
def __init__(self, tag):
self.tag = tag
self.address_hex = ZERO_ADDRESS
self.address_hex = zero_address

View File

@@ -85,27 +85,26 @@ class TxCache(SessionBase):
:param tx_hash_new: tx hash to associate the copied entry with
:type tx_hash_new: str, 0x-hex
"""
session = SessionBase.bind_session(session)
localsession = SessionBase.bind_session(session)
q = session.query(TxCache)
q = localsession.query(TxCache)
q = q.join(Otx)
q = q.filter(Otx.tx_hash==tx_hash_original)
txc = q.first()
if txc == None:
SessionBase.release_session(session)
SessionBase.release_session(localsession)
raise NotLocalTxError('original {}'.format(tx_hash_original))
if txc.block_number != None:
SessionBase.release_session(session)
SessionBase.release_session(localsession)
raise TxStateChangeError('cannot clone tx cache of confirmed tx {}'.format(tx_hash_original))
session.flush()
q = session.query(Otx)
q = localsession.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash_new)
otx = q.first()
if otx == None:
SessionBase.release_session(session)
SessionBase.release_session(localsession)
raise NotLocalTxError('new {}'.format(tx_hash_new))
txc_new = TxCache(
@@ -116,21 +115,18 @@ class TxCache(SessionBase):
txc.destination_token_address,
int(txc.from_value),
int(txc.to_value),
session=session,
)
session.add(txc_new)
session.commit()
localsession.add(txc_new)
localsession.commit()
SessionBase.release_session(session)
SessionBase.release_session(localsession)
def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None, session=None):
session = SessionBase.bind_session(session)
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
tx = q.first()
localsession = SessionBase.bind_session(session)
tx = localsession.query(Otx).filter(Otx.tx_hash==tx_hash).first()
if tx == None:
SessionBase.release_session(session)
SessionBase.release_session(localsession)
raise FileNotFoundError('outgoing transaction record unknown {} (add a Tx first)'.format(tx_hash))
self.otx_id = tx.id
@@ -143,9 +139,9 @@ class TxCache(SessionBase):
self.block_number = block_number
self.tx_index = tx_index
# not automatically set in sqlite, it seems:
self.date_created = datetime.datetime.utcnow()
self.date_created = datetime.datetime.now()
self.date_updated = self.date_created
self.date_checked = self.date_created
SessionBase.release_session(session)
SessionBase.release_session(localsession)

View File

@@ -54,29 +54,8 @@ class RoleMissingError(Exception):
pass
class IntegrityError(Exception):
"""Exception raised to signal irregularities with deduplication and ordering of tasks
"""
pass
class LockedError(Exception):
"""Exception raised when attempt is made to execute action that is deactivated by lock
"""
pass
class SignerError(Exception):
"""Exception raised when signer is unavailable or generates an error
"""
pass
class EthError(Exception):
"""Exception raised when unspecified error from evm node is encountered
"""
pass

View File

@@ -0,0 +1,16 @@
"""Ethereum batch functions and utilities
.. moduleauthor:: Louis Holbrook <dev@holbrook.no>
"""
# standard imports
import os
# local imports
from .rpc import RpcClient
registry_extra_identifiers = {
'Faucet': '0x{:0<64s}'.format(b'Faucet'.hex()),
'TransferApproval': '0x{:0<64s}'.format(b'TransferApproval'.hex()),
}

View File

@@ -1,60 +1,137 @@
# standard imports
import logging
# external imports
# third-party imports
import web3
import celery
from erc20_single_shot_faucet import SingleShotFaucet as Faucet
from chainlib.eth.constant import ZERO_ADDRESS
from hexathon import (
strip_0x,
)
from chainlib.connection import RPCConnection
from chainlib.eth.sign import (
new_account,
sign_message,
)
from chainlib.eth.address import to_checksum_address
from chainlib.eth.tx import (
TxFormat,
unpack,
)
from chainlib.chain import ChainSpec
from eth_accounts_index import AccountRegistry
from sarafu_faucet import MinterFaucet as Faucet
from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
from erc20_single_shot_faucet import Faucet
from cic_registry import zero_address
# local import
from cic_eth_registry import CICRegistry
from cic_eth.eth.gas import (
create_check_gas_task,
)
#from cic_eth.eth.factory import TxFactory
from cic_eth.eth import RpcClient
from cic_eth.eth import registry_extra_identifiers
from cic_eth.eth.task import sign_and_register_tx
from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.eth.factory import TxFactory
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.tx import TxCache
from cic_eth.error import (
RoleMissingError,
SignerError,
)
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalSQLAlchemyAndSignerTask,
BaseTask,
)
from cic_eth.eth.nonce import (
CustodialTaskNonceOracle,
)
from cic_eth.queue.tx import (
register_tx,
)
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.error import RoleMissingError
#logg = logging.getLogger(__name__)
logg = logging.getLogger()
celery_app = celery.current_app
class AccountTxFactory(TxFactory):
"""Factory for creating account index contract transactions
"""
def add(
self,
address,
chain_spec,
):
"""Register an Ethereum account address with the on-chain account registry
:param address: Ethereum account address to add
:type address: str, 0x-hex
:param chain_spec: Chain to build transaction for
:type chain_spec: cic_registry.chain.ChainSpec
:returns: Unsigned "AccountRegistry.add" transaction in standard Ethereum format
:rtype: dict
"""
c = CICRegistry.get_contract(chain_spec, 'AccountRegistry')
f = c.function('add')
tx_add_buildable = f(
address,
)
gas = c.gas('add')
tx_add = tx_add_buildable.buildTransaction({
'from': self.address,
'gas': gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(),
'value': 0,
})
return tx_add
def gift(
self,
address,
chain_spec,
):
"""Trigger the on-chain faucet to disburse tokens to the provided Ethereum account
:param address: Ethereum account address to gift to
:type address: str, 0x-hex
:param chain_spec: Chain to build transaction for
:type chain_spec: cic_registry.chain.ChainSpec
:returns: Unsigned "Faucet.giveTo" transaction in standard Ethereum format
:rtype: dict
"""
c = CICRegistry.get_contract(chain_spec, 'Faucet')
f = c.function('giveTo')
tx_add_buildable = f(address)
gas = c.gas('add')
tx_add = tx_add_buildable.buildTransaction({
'from': self.address,
'gas': gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(),
'value': 0,
})
return tx_add
def unpack_register(data):
"""Verifies that a transaction is an "AccountRegister.add" transaction, and extracts call parameters from it.
:param data: Raw input data from Ethereum transaction.
:type data: str, 0x-hex
:raises ValueError: Function signature does not match AccountRegister.add
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
if f != '0a3b0a4f':
raise ValueError('Invalid account index register data ({})'.format(f))
d = data[10:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
}
def unpack_gift(data):
"""Verifies that a transaction is a "Faucet.giveTo" transaction, and extracts call parameters from it.
:param data: Raw input data from Ethereum transaction.
:type data: str, 0x-hex
:raises ValueError: Function signature does not match AccountRegister.add
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
if f != '63e4bff4':
raise ValueError('Invalid account index register data ({})'.format(f))
d = data[10:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
}
# TODO: Separate out nonce initialization task
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def create(self, password, chain_spec_dict):
@celery_app.task()
def create(password, chain_str):
"""Creates and stores a new ethereum account in the keystore.
The password is passed on to the wallet backend, no encryption is performed in the task worker.
@@ -66,27 +143,27 @@ def create(self, password, chain_spec_dict):
:returns: Ethereum address of newly created account
:rtype: str, 0x-hex
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
a = None
conn = RPCConnection.connect(chain_spec, 'signer')
o = new_account()
a = conn.do(o)
conn.disconnect()
if a == None:
raise SignerError('create account')
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
a = c.w3.eth.personal.new_account(password)
logg.debug('created account {}'.format(a))
# Initialize nonce provider record for account
session = self.create_session()
Nonce.init(a, session=session)
session.commit()
n = c.w3.eth.getTransactionCount(a, 'pending')
session = SessionBase.create_session()
o = session.query(Nonce).filter(Nonce.address_hex==a).first()
if o == None:
o = Nonce()
o.address_hex = a
o.nonce = n
session.add(o)
session.commit()
session.close()
return a
@celery_app.task(bind=True, throws=(RoleMissingError,), base=CriticalSQLAlchemyAndSignerTask)
def register(self, account_address, chain_spec_dict, writer_address=None):
@celery_app.task(bind=True, throws=(RoleMissingError,))
def register(self, account_address, chain_str, writer_address=None):
"""Creates a transaction to add the given address to the accounts index.
:param account_address: Ethereum address to add
@@ -99,52 +176,32 @@ def register(self, account_address, chain_spec_dict, writer_address=None):
:returns: The account_address input param
:rtype: str, 0x-hex
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
chain_spec = ChainSpec.from_chain_str(chain_str)
session = self.create_session()
if writer_address == None:
writer_address = AccountRole.get_address('ACCOUNT_REGISTRY_WRITER', session=session)
writer_address = AccountRole.get_address('ACCOUNTS_INDEX_WRITER')
if writer_address == zero_address:
raise RoleMissingError(account_address)
if writer_address == ZERO_ADDRESS:
session.close()
raise RoleMissingError('writer address for regsistering {}'.format(account_address))
logg.debug('adding account address {} to index; writer {}'.format(account_address, writer_address))
queue = self.request.delivery_info.get('routing_key')
queue = self.request.delivery_info['routing_key']
# Retrieve account index address
rpc = RPCConnection.connect(chain_spec, 'default')
registry = CICRegistry(chain_spec, rpc)
call_address = AccountRole.get_address('DEFAULT', session=session)
if writer_address == ZERO_ADDRESS:
session.close()
raise RoleMissingError('call address for resgistering {}'.format(account_address))
account_registry_address = registry.by_name('AccountRegistry', sender_address=call_address)
# Generate and sign transaction
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
nonce_oracle = CustodialTaskNonceOracle(writer_address, self.request.root_id, session=session) #, default_nonce)
gas_oracle = self.create_gas_oracle(rpc, AccountRegistry.gas)
account_registry = AccountRegistry(signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id())
(tx_hash_hex, tx_signed_raw_hex) = account_registry.add(account_registry_address, writer_address, account_address, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect()
c = RpcClient(chain_spec, holder_address=writer_address)
txf = AccountTxFactory(writer_address, c)
# add transaction to queue
cache_task = 'cic_eth.eth.account.cache_account_data'
register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=cache_task, session=session)
session.commit()
session.close()
tx_add = txf.add(account_address, chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data')
gas_pair = gas_oracle.get_gas(tx_signed_raw_hex)
gas_budget = gas_pair[0] * gas_pair[1]
logg.debug('register user tx {} {} {}'.format(tx_hash_hex, queue, gas_budget))
rpc.disconnect()
gas_budget = tx_add['gas'] * tx_add['gasPrice']
s = create_check_gas_task(
logg.debug('register user tx {}'.format(tx_hash_hex))
s = create_check_gas_and_send_task(
[tx_signed_raw_hex],
chain_spec,
chain_str,
writer_address,
gas=gas_budget,
gas_budget,
tx_hashes_hex=[tx_hash_hex],
queue=queue,
)
@@ -152,8 +209,8 @@ def register(self, account_address, chain_spec_dict, writer_address=None):
return account_address
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def gift(self, account_address, chain_spec_dict):
@celery_app.task(bind=True)
def gift(self, account_address, chain_str):
"""Creates a transaction to invoke the faucet contract for the given address.
:param account_address: Ethereum address to give to
@@ -163,51 +220,34 @@ def gift(self, account_address, chain_spec_dict):
:returns: Raw signed transaction
:rtype: list with transaction as only element
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
chain_spec = ChainSpec.from_chain_str(chain_str)
logg.debug('gift account address {} to index'.format(account_address))
queue = self.request.delivery_info.get('routing_key')
queue = self.request.delivery_info['routing_key']
# Retrieve account index address
session = self.create_session()
rpc = RPCConnection.connect(chain_spec, 'default')
registry = CICRegistry(chain_spec, rpc)
faucet_address = registry.by_name('Faucet', sender_address=self.call_address)
c = RpcClient(chain_spec, holder_address=account_address)
txf = AccountTxFactory(account_address, c)
# Generate and sign transaction
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
nonce_oracle = CustodialTaskNonceOracle(account_address, self.request.root_id, session=session) #, default_nonce)
gas_oracle = self.create_gas_oracle(rpc, Faucet.gas)
faucet = Faucet(signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id())
(tx_hash_hex, tx_signed_raw_hex) = faucet.give_to(faucet_address, account_address, account_address, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect()
tx_add = txf.gift(account_address, chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_gift_data')
# add transaction to queue
cache_task = 'cic_eth.eth.account.cache_gift_data'
register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task, session=session)
session.commit()
session.close()
gas_budget = tx_add['gas'] * tx_add['gasPrice']
gas_pair = gas_oracle.get_gas(tx_signed_raw_hex)
gas_budget = gas_pair[0] * gas_pair[1]
logg.debug('register user tx {} {} {}'.format(tx_hash_hex, queue, gas_budget))
rpc.disconnect()
s = create_check_gas_task(
logg.debug('register user tx {}'.format(tx_hash_hex))
s = create_check_gas_and_send_task(
[tx_signed_raw_hex],
chain_spec,
chain_str,
account_address,
gas_budget,
[tx_hash_hex],
queue=queue,
)
s.apply_async()
return [tx_signed_raw_hex]
@celery_app.task(bind=True)
def have(self, account, chain_spec_dict):
def have(self, account, chain_str):
"""Check whether the given account exists in keystore
:param account: Account to check
@@ -217,38 +257,17 @@ def have(self, account, chain_spec_dict):
:returns: Account, or None if not exists
:rtype: Varies
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
o = sign_message(account, '0x2a')
c = RpcClient(account)
try:
conn = RPCConnection.connect(chain_spec, 'signer')
except Exception as e:
logg.debug('cannot sign with {}: {}'.format(account, e))
return None
try:
conn.do(o)
conn.disconnect()
c.w3.eth.sign(account, text='2a')
return account
except Exception as e:
logg.debug('cannot sign with {}: {}'.format(account, e))
conn.disconnect()
return None
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def set_role(self, tag, address, chain_spec_dict):
if not to_checksum_address(address):
raise ValueError('invalid checksum address {}'.format(address))
session = SessionBase.create_session()
role = AccountRole.set(tag, address, session=session)
session.add(role)
session.commit()
session.close()
return tag
@celery_app.task(bind=True, base=BaseTask)
def role(self, address, chain_spec_dict):
@celery_app.task(bind=True)
def role(self, account, chain_str):
"""Return account role for address
:param account: Account to check
@@ -258,18 +277,14 @@ def role(self, address, chain_spec_dict):
:returns: Account, or None if not exists
:rtype: Varies
"""
session = self.create_session()
role_tag = AccountRole.role_for(address, session=session)
session.close()
return role_tag
return AccountRole.role_for(account)
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
@celery_app.task()
def cache_gift_data(
self,
tx_hash_hex,
tx_signed_raw_hex,
chain_spec_dict,
chain_str,
):
"""Generates and commits transaction cache metadata for a Faucet.giveTo transaction
@@ -282,20 +297,21 @@ def cache_gift_data(
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id())
tx_data = Faucet.parse_give_to_request(tx['data'])
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
tx_data = unpack_gift(tx['data'])
session = self.create_session()
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx['to'],
ZERO_ADDRESS,
ZERO_ADDRESS,
zero_address,
zero_address,
0,
0,
session=session,
@@ -308,12 +324,11 @@ def cache_gift_data(
return (tx_hash_hex, cache_id)
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
@celery_app.task()
def cache_account_data(
self,
tx_hash_hex,
tx_signed_raw_hex,
chain_spec_dict,
chain_str,
):
"""Generates and commits transaction cache metadata for an AccountsIndex.add transaction
@@ -326,18 +341,21 @@ def cache_account_data(
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack(tx_signed_raw_bytes, chain_id=chain_spec.chain_id())
tx_data = AccountRegistry.parse_add_request(tx['data'])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
tx_data = unpack_register(tx['data'])
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx['to'],
ZERO_ADDRESS,
ZERO_ADDRESS,
zero_address,
zero_address,
0,
0,
session=session,

View File

@@ -1,305 +0,0 @@
# standard imports
import logging
# external imports
import celery
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec
from chainlib.connection import RPCConnection
from chainlib.eth.erc20 import ERC20
from chainlib.eth.tx import (
TxFormat,
unpack,
)
from cic_eth_registry import CICRegistry
from cic_eth_registry.erc20 import ERC20Token
from hexathon import strip_0x
# local imports
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole
from cic_eth.error import TokenCountError, PermanentTxError, OutOfGasError, NotLocalTxError
from cic_eth.queue.tx import register_tx
from cic_eth.eth.gas import (
create_check_gas_task,
MaxGasOracle,
)
#from cic_eth.eth.factory import TxFactory
from cic_eth.ext.address import translate_address
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalWeb3Task,
CriticalSQLAlchemyAndSignerTask,
)
from cic_eth.eth.nonce import CustodialTaskNonceOracle
celery_app = celery.current_app
logg = logging.getLogger()
@celery_app.task(base=CriticalWeb3Task)
def balance(tokens, holder_address, chain_spec_dict):
"""Return token balances for a list of tokens for given address
:param tokens: Token addresses
:type tokens: list of str, 0x-hex
:param holder_address: Token holder address
:type holder_address: str, 0x-hex
:param chain_spec_dict: Chain spec string representation
:type chain_spec_dict: str
:return: List of balances
:rtype: list of int
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
rpc = RPCConnection.connect(chain_spec, 'default')
caller_address = ERC20Token.caller_address
for t in tokens:
address = t['address']
token = ERC20Token(rpc, address)
c = ERC20()
o = c.balance_of(address, holder_address, sender_address=caller_address)
r = rpc.do(o)
t['balance_network'] = c.parse_balance(r)
rpc.disconnect()
return tokens
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def transfer(self, tokens, holder_address, receiver_address, value, chain_spec_dict):
"""Transfer ERC20 tokens between addresses
First argument is a list of tokens, to enable the task to be chained to the symbol to token address resolver function. However, it accepts only one token as argument.
:raises TokenCountError: Either none or more then one tokens have been passed as tokens argument
:param tokens: Token addresses
:type tokens: list of str, 0x-hex
:param holder_address: Token holder address
:type holder_address: str, 0x-hex
:param receiver_address: Token receiver address
:type receiver_address: str, 0x-hex
:param value: Amount of token, in 'wei'
:type value: int
:param chain_str: Chain spec string representation
:type chain_str: str
:raises TokenCountError: More than one token is passed in tokens list
:return: Transaction hash for tranfer operation
:rtype: str, 0x-hex
"""
# we only allow one token, one transfer
if len(tokens) != 1:
raise TokenCountError
t = tokens[0]
chain_spec = ChainSpec.from_dict(chain_spec_dict)
queue = self.request.delivery_info.get('routing_key')
rpc = RPCConnection.connect(chain_spec, 'default')
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
session = self.create_session()
nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas)
c = ERC20(signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle, chain_id=chain_spec.chain_id())
(tx_hash_hex, tx_signed_raw_hex) = c.transfer(t['address'], holder_address, receiver_address, value, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect()
rpc.disconnect()
cache_task = 'cic_eth.eth.erc20.cache_transfer_data'
register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=cache_task, session=session)
session.commit()
session.close()
gas_pair = gas_oracle.get_gas(tx_signed_raw_hex)
gas_budget = gas_pair[0] * gas_pair[1]
logg.debug('transfer tx {} {} {}'.format(tx_hash_hex, queue, gas_budget))
s = create_check_gas_task(
[tx_signed_raw_hex],
chain_spec,
holder_address,
gas_budget,
[tx_hash_hex],
queue,
)
s.apply_async()
return tx_hash_hex
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def approve(self, tokens, holder_address, spender_address, value, chain_spec_dict):
"""Approve ERC20 transfer on behalf of holder address
First argument is a list of tokens, to enable the task to be chained to the symbol to token address resolver function. However, it accepts only one token as argument.
:raises TokenCountError: Either none or more then one tokens have been passed as tokens argument
:param tokens: Token addresses
:type tokens: list of str, 0x-hex
:param holder_address: Token holder address
:type holder_address: str, 0x-hex
:param receiver_address: Token receiver address
:type receiver_address: str, 0x-hex
:param value: Amount of token, in 'wei'
:type value: int
:param chain_str: Chain spec string representation
:type chain_str: str
:raises TokenCountError: More than one token is passed in tokens list
:return: Transaction hash for tranfer operation
:rtype: str, 0x-hex
"""
# we only allow one token, one transfer
if len(tokens) != 1:
raise TokenCountError
t = tokens[0]
chain_spec = ChainSpec.from_dict(chain_spec_dict)
queue = self.request.delivery_info.get('routing_key')
rpc = RPCConnection.connect(chain_spec, 'default')
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
session = self.create_session()
nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas)
c = ERC20(signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle, chain_id=chain_spec.chain_id())
(tx_hash_hex, tx_signed_raw_hex) = c.approve(t['address'], holder_address, spender_address, value, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect()
rpc.disconnect()
cache_task = 'cic_eth.eth.erc20.cache_approve_data'
register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=cache_task, session=session)
session.commit()
session.close()
gas_pair = gas_oracle.get_gas(tx_signed_raw_hex)
gas_budget = gas_pair[0] * gas_pair[1]
s = create_check_gas_task(
[tx_signed_raw_hex],
chain_spec,
holder_address,
gas_budget,
[tx_hash_hex],
queue,
)
s.apply_async()
return tx_hash_hex
@celery_app.task(bind=True, base=CriticalWeb3Task)
def resolve_tokens_by_symbol(self, token_symbols, chain_spec_dict):
"""Returns contract addresses of an array of ERC20 token symbols
:param token_symbols: Token symbols to resolve
:type token_symbols: list of str
:param chain_str: Chain spec string representation
:type chain_str: str
:return: Respective token contract addresses
:rtype: list of str, 0x-hex
"""
tokens = []
chain_spec = ChainSpec.from_dict(chain_spec_dict)
rpc = RPCConnection.connect(chain_spec, 'default')
registry = CICRegistry(chain_spec, rpc)
session = self.create_session()
sender_address = AccountRole.get_address('DEFAULT', session)
session.close()
for token_symbol in token_symbols:
token_address = registry.by_name(token_symbol, sender_address=sender_address)
logg.debug('token {}'.format(token_address))
tokens.append({
'address': token_address,
'converters': [],
})
rpc.disconnect()
return tokens
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_transfer_data(
tx_hash_hex,
tx_signed_raw_hex,
chain_spec_dict,
):
"""Helper function for otx_cache_transfer
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx: Signed raw transaction
:type tx: str, 0x-hex
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id())
tx_data = ERC20.parse_transfer_request(tx['data'])
recipient_address = tx_data[0]
token_value = tx_data[1]
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
recipient_address,
tx['to'],
tx['to'],
token_value,
token_value,
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_approve_data(
tx_hash_hex,
tx_signed_raw_hex,
chain_spec_dict,
):
"""Helper function for otx_cache_approve
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx: Signed raw transaction
:type tx: str, 0x-hex
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id())
tx_data = ERC20.parse_approve_request(tx['data'])
recipient_address = tx_data[0]
token_value = tx_data[1]
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
recipient_address,
tx['to'],
tx['to'],
token_value,
token_value,
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)

View File

@@ -0,0 +1,41 @@
# standard imports
import logging
# local imports
from cic_registry import CICRegistry
from cic_eth.eth.nonce import NonceOracle
from cic_eth.eth import RpcClient
logg = logging.getLogger(__name__)
class TxFactory:
"""Base class for transaction factory classes.
:param from_address: Signer address to create transaction on behalf of
:type from_address: str, 0x-hex
:param rpc_client: RPC connection object to use to acquire account nonce if no record in nonce cache
:type rpc_client: cic_eth.eth.rpc.RpcClient
"""
gas_price = 100
"""Gas price, updated between batches"""
def __init__(self, from_address, rpc_client):
self.address = from_address
self.default_nonce = rpc_client.w3.eth.getTransactionCount(from_address, 'pending')
self.nonce_oracle = NonceOracle(from_address, self.default_nonce)
TxFactory.gas_price = rpc_client.gas_price()
logg.debug('txfactory instance address {} gas price'.format(self.address, self.gas_price))
def next_nonce(self):
"""Returns the current cached nonce value, and increments it for next transaction.
:returns: Nonce
:rtype: number
"""
return self.nonce_oracle.next()

View File

@@ -1,138 +1,71 @@
# standard imports
import logging
# external imports
import celery
from chainlib.eth.gas import price
from hexathon import strip_0x
# local imports
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.base import SessionBase
logg = logging.getLogger()
#
#class GasOracle():
# """Provides gas pricing for transactions.
#
# :param w3: Web3 object
# :type w3: web3.Web3
# """
#
# __safe_threshold_amount_value = 2000000000 * 60000 * 3
# __refill_amount_value = __safe_threshold_amount_value * 5
# default_gas_limit = 21000
#
# def __init__(self, conn):
# o = price()
# r = conn.do(o)
# b = bytes.from_hex(strip_0x(r))
# self.gas_price_current = int.from_bytes(b, 'big')
#
# #self.w3 = w3
# #self.gas_price_current = w3.eth.gas_price()
#
#
# def safe_threshold_amount(self):
# """The gas balance threshold under which a new gas refill transaction should be initiated.
#
# :returns: Gas token amount
# :rtype: number
# """
# g = GasOracle.__safe_threshold_amount_value
# logg.warning('gas safe threshold is currently hardcoded to {}'.format(g))
# return g
#
#
# def refill_amount(self):
# """The amount of gas tokens to send in a gas refill transaction.
#
# :returns: Gas token amount
# :rtype: number
# """
# g = GasOracle.__refill_amount_value
# logg.warning('gas refill amount is currently hardcoded to {}'.format(g))
# return g
#
#
# def gas_provider(self):
# """Gas provider address.
#
# :returns: Etheerum account address
# :rtype: str, 0x-hex
# """
# session = SessionBase.create_session()
# a = AccountRole.get_address('GAS_GIFTER', session)
# logg.debug('gasgifter {}'.format(a))
# session.close()
# return a
#
#
# def gas_price(self, category='safe'):
# """Get projected gas price to use for a transaction at the current moment.
#
# When the category parameter is implemented, it can be used to control the priority of a transaction in the network.
#
# :param category: Bid level category to return price for. Currently has no effect.
# :type category: str
# :returns: Gas price
# :rtype: number
# """
# #logg.warning('gas price hardcoded to category "safe"')
# #g = 100
# #return g
# return self.gas_price_current
class GasOracle():
"""Provides gas pricing for transactions.
class MaxGasOracle:
def gas(code=None):
return 8000000
def create_check_gas_task(tx_signed_raws_hex, chain_spec, holder_address, gas=None, tx_hashes_hex=None, queue=None):
"""Creates a celery task signature for a check_gas task that adds the task to the outgoing queue to be processed by the dispatcher.
If tx_hashes_hex is not spefified, a preceding task chained to check_gas must supply the transaction hashes as its return value.
:param tx_signed_raws_hex: Raw signed transaction data
:type tx_signed_raws_hex: list of str, 0x-hex
:param chain_spec: Chain spec of address to add check gas for
:type chain_spec: chainlib.chain.ChainSpec
:param holder_address: Address sending the transactions
:type holder_address: str, 0x-hex
:param gas: Gas budget hint for transactions
:type gas: int
:param tx_hashes_hex: Transaction hashes
:type tx_hashes_hex: list of str, 0x-hex
:param queue: Task queue
:type queue: str
:returns: Signature of task chain
:rtype: celery.Signature
:param w3: Web3 object
:type w3: web3.Web3
"""
s_check_gas = None
if tx_hashes_hex != None:
s_check_gas = celery.signature(
'cic_eth.eth.tx.check_gas',
[
tx_hashes_hex,
chain_spec.asdict(),
tx_signed_raws_hex,
holder_address,
gas,
],
queue=queue,
)
else:
s_check_gas = celery.signature(
'cic_eth.eth.tx.check_gas',
[
chain_spec.asdict(),
tx_signed_raws_hex,
holder_address,
gas,
],
queue=queue,
)
return s_check_gas
__safe_threshold_amount_value = 2000000000 * 60000 * 3
__refill_amount_value = __safe_threshold_amount_value * 5
default_gas_limit = 21000
def __init__(self, w3):
self.w3 = w3
self.gas_price_current = w3.eth.gas_price()
def safe_threshold_amount(self):
"""The gas balance threshold under which a new gas refill transaction should be initiated.
:returns: Gas token amount
:rtype: number
"""
g = GasOracle.__safe_threshold_amount_value
logg.warning('gas safe threshold is currently hardcoded to {}'.format(g))
return g
def refill_amount(self):
"""The amount of gas tokens to send in a gas refill transaction.
:returns: Gas token amount
:rtype: number
"""
g = GasOracle.__refill_amount_value
logg.warning('gas refill amount is currently hardcoded to {}'.format(g))
return g
def gas_provider(self):
"""Gas provider address.
:returns: Etheerum account address
:rtype: str, 0x-hex
"""
return AccountRole.get_address('GAS_GIFTER')
def gas_price(self, category='safe'):
"""Get projected gas price to use for a transaction at the current moment.
When the category parameter is implemented, it can be used to control the priority of a transaction in the network.
:param category: Bid level category to return price for. Currently has no effect.
:type category: str
:returns: Gas price
:rtype: number
"""
#logg.warning('gas price hardcoded to category "safe"')
#g = 100
#return g
return self.gas_price_current

View File

@@ -1,71 +0,0 @@
# extended imports
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.status import Status as TxStatus
class ExtendedTx:
_default_decimals = 6
def __init__(self, rpc, tx_hash, chain_spec):
self.rpc = rpc
self.chain_spec = chain_spec
self.hash = tx_hash
self.sender = None
self.sender_label = None
self.recipient = None
self.recipient_label = None
self.source_token_value = 0
self.destination_token_value = 0
self.source_token = ZERO_ADDRESS
self.destination_token = ZERO_ADDRESS
self.source_token_symbol = ''
self.destination_token_symbol = ''
self.source_token_decimals = ExtendedTx._default_decimals
self.destination_token_decimals = ExtendedTx._default_decimals
self.status = TxStatus.PENDING.name
self.status_code = TxStatus.PENDING.value
def set_actors(self, sender, recipient, trusted_declarator_addresses=None):
self.sender = sender
self.recipient = recipient
if trusted_declarator_addresses != None:
self.sender_label = translate_address(sender, trusted_declarator_addresses, self.chain_spec)
self.recipient_label = translate_address(recipient, trusted_declarator_addresses, self.chain_spec)
def set_tokens(self, source, source_value, destination=None, destination_value=None):
if destination == None:
destination = source
if destination_value == None:
destination_value = source_value
st = ERC20Token(self.rpc, source)
dt = ERC20Token(self.rpc, destination)
self.source_token = source
self.source_token_symbol = st.symbol
self.source_token_name = st.name
self.source_token_decimals = st.decimals
self.source_token_value = source_value
self.destination_token = destination
self.destination_token_symbol = dt.symbol
self.destination_token_name = dt.name
self.destination_token_decimals = dt.decimals
self.destination_token_value = destination_value
def set_status(self, n):
if n:
self.status = TxStatus.ERROR.name
else:
self.status = TxStatus.SUCCESS.name
self.status_code = n
def to_dict(self):
o = {}
for attr in dir(self):
if attr[0] == '_' or attr in ['set_actors', 'set_tokens', 'set_status', 'to_dict']:
continue
o[attr] = getattr(self, attr)
return o

View File

@@ -1,10 +1,7 @@
# local imports
from cic_eth.db.models.nonce import (
Nonce,
NonceReservation,
)
from cic_eth.db.models.nonce import Nonce
class CustodialTaskNonceOracle():
class NonceOracle():
"""Ensures atomic nonce increments for all transactions across all tasks and threads.
:param address: Address to generate nonces for
@@ -12,21 +9,15 @@ class CustodialTaskNonceOracle():
:param default_nonce: Initial nonce value to use if no nonce cache entry already exists
:type default_nonce: number
"""
def __init__(self, address, uuid, session=None):
def __init__(self, address, default_nonce):
self.address = address
self.uuid = uuid
self.session = session
self.default_nonce = default_nonce
def get_nonce(self):
return self.next_nonce()
def next_nonce(self):
def next(self):
"""Get next unique nonce.
:returns: Nonce
:rtype: number
"""
r = NonceReservation.release(self.address, self.uuid, session=self.session)
return r[1]
return Nonce.next(self.address, self.default_nonce)

View File

@@ -0,0 +1,194 @@
# standard imports
import logging
# third-party imports
import web3
import celery
from erc20_approval_escrow import TransferApproval
from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
# local imports
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.base import SessionBase
from cic_eth.eth import RpcClient
from cic_eth.eth.factory import TxFactory
from cic_eth.eth.task import sign_and_register_tx
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.error import TokenCountError
celery_app = celery.current_app
logg = logging.getLogger()
contract_function_signatures = {
'request': 'b0addede',
}
class TransferRequestTxFactory(TxFactory):
"""Factory for creating Transfer request transactions using the TransferApproval contract backend
"""
def request(
self,
token_address,
beneficiary_address,
amount,
chain_spec,
):
"""Create a new TransferApproval.request transaction
:param token_address: Token to create transfer request for
:type token_address: str, 0x-hex
:param beneficiary_address: Beneficiary of token transfer
:type beneficiary_address: str, 0x-hex
:param amount: Amount of tokens to transfer
:type amount: number
:param chain_spec: Chain spec
:type chain_spec: cic_registry.chain.ChainSpec
:returns: Transaction in standard Ethereum format
:rtype: dict
"""
transfer_approval = CICRegistry.get_contract(chain_spec, 'TransferApproval', 'TransferAuthorization')
fn = transfer_approval.function('createRequest')
tx_approval_buildable = fn(beneficiary_address, token_address, amount)
transfer_approval_gas = transfer_approval.gas('createRequest')
tx_approval = tx_approval_buildable.buildTransaction({
'from': self.address,
'gas': transfer_approval_gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(),
})
return tx_approval
def unpack_transfer_approval_request(data):
"""Verifies that a transaction is an "TransferApproval.request" transaction, and extracts call parameters from it.
:param data: Raw input data from Ethereum transaction.
:type data: str, 0x-hex
:raises ValueError: Function signature does not match AccountRegister.add
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
if f != contract_function_signatures['request']:
raise ValueError('Invalid transfer request data ({})'.format(f))
d = data[10:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'token': web3.Web3.toChecksumAddress('0x' + d[128-40:128]),
'amount': int(d[128:], 16)
}
@celery_app.task(bind=True)
def transfer_approval_request(self, tokens, holder_address, receiver_address, value, chain_str):
"""Creates a new transfer approval
:param tokens: Token to generate transfer request for
:type tokens: list with single token spec as dict
:param holder_address: Address to generate transfer on behalf of
:type holder_address: str, 0x-hex
:param receiver_address: Address to transfser tokens to
:type receiver_address: str, 0x-hex
:param value: Amount of tokens to transfer
:type value: number
:param chain_spec: Chain spec string representation
:type chain_spec: str
:raises cic_eth.error.TokenCountError: More than one token in tokens argument
:returns: Raw signed transaction
:rtype: list with transaction as only element
"""
if len(tokens) != 1:
raise TokenCountError
chain_spec = ChainSpec.from_chain_str(chain_str)
queue = self.request.delivery_info['routing_key']
t = tokens[0]
c = RpcClient(holder_address)
txf = TransferRequestTxFactory(holder_address, c)
tx_transfer = txf.request(t['address'], receiver_address, value, chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, 'cic_eth.eth.request.otx_cache_transfer_approval_request')
gas_budget = tx_transfer['gas'] * tx_transfer['gasPrice']
s = create_check_gas_and_send_task(
[tx_signed_raw_hex],
chain_str,
holder_address,
gas_budget,
[tx_hash_hex],
queue,
)
s.apply_async()
return [tx_signed_raw_hex]
@celery_app.task()
def otx_cache_transfer_approval_request(
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
):
"""Generates and commits transaction cache metadata for an TransferApproval.request transaction
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx_signed_raw_hex: Raw signed transaction
:type tx_signed_raw_hex: str, 0x-hex
:param chain_str: Chain spec string representation
:type chain_str: str
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
logg.debug('in otx acche transfer approval request')
(txc, cache_id) = cache_transfer_approval_request_data(tx_hash_hex, tx)
return txc
@celery_app.task()
def cache_transfer_approval_request_data(
tx_hash_hex,
tx,
):
"""Helper function for otx_cache_transfer_approval_request
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx: Signed raw transaction
:type tx: str, 0x-hex
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
tx_data = unpack_transfer_approval_request(tx['data'])
logg.debug('tx approval request data {}'.format(tx_data))
logg.debug('tx approval request {}'.format(tx))
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx_data['to'],
tx_data['token'],
tx_data['token'],
tx_data['amount'],
tx_data['amount'],
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)

View File

@@ -0,0 +1,39 @@
# standard imports
import logging
# local imports
from cic_eth.eth.gas import GasOracle
logg = logging.getLogger()
class RpcClient(GasOracle):
"""RPC wrapper for web3 enabling gas calculation helpers and signer middleware.
:param chain_spec: Chain spec
:type chain_spec: cic_registry.chain.ChainSpec
:param holder_address: DEPRECATED Address of subject of the session.
:type holder_address: str, 0x-hex
"""
signer_ipc_path = None
"""Unix socket path to JSONRPC signer and keystore"""
web3_constructor = None
"""Custom function to build a web3 object with middleware plugins"""
def __init__(self, chain_spec, holder_address=None):
(self.provider, w3) = RpcClient.web3_constructor()
super(RpcClient, self).__init__(w3)
self.chain_spec = chain_spec
if holder_address != None:
self.holder_address = holder_address
logg.info('gasprice {}'.format(self.gas_price()))
@staticmethod
def set_constructor(web3_constructor):
"""Sets the constructor to use for building the web3 object.
"""
RpcClient.web3_constructor = web3_constructor

View File

@@ -0,0 +1,136 @@
# standard imports
import logging
# third-party imports
import celery
from cic_registry.chain import ChainSpec
# local imports
from cic_eth.eth import RpcClient
from cic_eth.queue.tx import create as queue_create
celery_app = celery.current_app
logg = celery_app.log.get_default_logger()
@celery_app.task()
def sign_tx(tx, chain_str):
"""Sign a single transaction against the given chain specification.
:param tx: Transaction in standard Ethereum format
:type tx: dict
:param chain_str: Chain spec string representation
:type chain_str: str
:returns: Transaction hash and raw signed transaction, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
tx_transfer_signed = c.w3.eth.sign_transaction(tx)
logg.debug('tx_transfer_signed {}'.format(tx_transfer_signed))
tx_hash = c.w3.keccak(hexstr=tx_transfer_signed['raw'])
tx_hash_hex = tx_hash.hex()
return (tx_hash_hex, tx_transfer_signed['raw'],)
def sign_and_register_tx(tx, chain_str, queue, cache_task=None):
"""Signs the provided transaction, and adds it to the transaction queue cache (with status PENDING).
:param tx: Standard ethereum transaction data
:type tx: dict
:param chain_str: Chain spec, string representation
:type chain_str: str
:param queue: Task queue
:type queue: str
:param cache_task: Cache task to call with signed transaction. If None, no task will be called.
:type cache_task: str
:returns: Tuple; Transaction hash, signed raw transaction data
:rtype: tuple
"""
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, chain_str)
logg.debug('adding queue tx {}'.format(tx_hash_hex))
# s = celery.signature(
# 'cic_eth.queue.tx.create',
# [
# tx['nonce'],
# tx['from'],
# tx_hash_hex,
# tx_signed_raw_hex,
# chain_str,
# ],
# queue=queue,
# )
# TODO: consider returning this as a signature that consequtive tasks can be linked to
queue_create(
tx['nonce'],
tx['from'],
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
)
if cache_task != None:
logg.debug('adding cache task {} tx {}'.format(cache_task, tx_hash_hex))
s_cache = celery.signature(
cache_task,
[
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
],
queue=queue,
)
s_cache.apply_async()
return (tx_hash_hex, tx_signed_raw_hex,)
# TODO: rename as we will not be sending task in the chain, this is the responsibility of the dispatcher
def create_check_gas_and_send_task(tx_signed_raws_hex, chain_str, holder_address, gas, tx_hashes_hex=None, queue=None):
"""Creates a celery task signature for a check_gas task that adds the task to the outgoing queue to be processed by the dispatcher.
If tx_hashes_hex is not spefified, a preceding task chained to check_gas must supply the transaction hashes as its return value.
:param tx_signed_raws_hex: Raw signed transaction data
:type tx_signed_raws_hex: list of str, 0x-hex
:param chain_str: Chain spec, string representation
:type chain_str: str
:param holder_address: Address sending the transactions
:type holder_address: str, 0x-hex
:param gas: Gas budget hint for transactions
:type gas: int
:param tx_hashes_hex: Transaction hashes
:type tx_hashes_hex: list of str, 0x-hex
:param queue: Task queue
:type queue: str
:returns: Signature of task chain
:rtype: celery.Signature
"""
s_check_gas = None
if tx_hashes_hex != None:
s_check_gas = celery.signature(
'cic_eth.eth.tx.check_gas',
[
tx_hashes_hex,
chain_str,
tx_signed_raws_hex,
holder_address,
gas,
],
queue=queue,
)
else:
s_check_gas = celery.signature(
'cic_eth.eth.tx.check_gas',
[
chain_str,
tx_signed_raws_hex,
holder_address,
gas,
],
queue=queue,
)
return s_check_gas

View File

@@ -0,0 +1,506 @@
# standard imports
import logging
# third-party imports
import celery
import requests
import web3
from cic_registry import CICRegistry
from cic_registry import zero_address
from cic_registry.chain import ChainSpec
# platform imports
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.base import SessionBase
from cic_eth.eth import RpcClient
from cic_eth.error import TokenCountError, PermanentTxError, OutOfGasError, NotLocalTxError
from cic_eth.eth.task import sign_and_register_tx
from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.eth.factory import TxFactory
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.ext.address import translate_address
celery_app = celery.current_app
logg = logging.getLogger()
# TODO: fetch from cic-contracts instead when implemented
contract_function_signatures = {
'transfer': 'a9059cbb',
'approve': '095ea7b3',
'transferfrom': '23b872dd',
}
class TokenTxFactory(TxFactory):
"""Factory for creating ERC20 token transactions.
"""
def approve(
self,
token_address,
spender_address,
amount,
chain_spec,
):
"""Create an ERC20 "approve" transaction
:param token_address: ERC20 contract address
:type token_address: str, 0x-hex
:param spender_address: Address to approve spending for
:type spender_address: str, 0x-hex
:param amount: Amount of tokens to approve
:type amount: int
:param chain_spec: Chain spec
:type chain_spec: cic_registry.chain.ChainSpec
:returns: Unsigned "approve" transaction in standard Ethereum format
:rtype: dict
"""
source_token = CICRegistry.get_address(chain_spec, token_address)
source_token_contract = source_token.contract
tx_approve_buildable = source_token_contract.functions.approve(
spender_address,
amount,
)
source_token_gas = source_token.gas('transfer')
tx_approve = tx_approve_buildable.buildTransaction({
'from': self.address,
'gas': source_token_gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(),
})
return tx_approve
def transfer(
self,
token_address,
receiver_address,
value,
chain_spec,
):
"""Create an ERC20 "transfer" transaction
:param token_address: ERC20 contract address
:type token_address: str, 0x-hex
:param receiver_address: Address to send tokens to
:type receiver_address: str, 0x-hex
:param amount: Amount of tokens to send
:type amount: int
:param chain_spec: Chain spec
:type chain_spec: cic_registry.chain.ChainSpec
:returns: Unsigned "transfer" transaction in standard Ethereum format
:rtype: dict
"""
source_token = CICRegistry.get_address(chain_spec, token_address)
source_token_contract = source_token.contract
transfer_buildable = source_token_contract.functions.transfer(
receiver_address,
value,
)
source_token_gas = source_token.gas('transfer')
tx_transfer = transfer_buildable.buildTransaction(
{
'from': self.address,
'gas': source_token_gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(),
})
return tx_transfer
def unpack_transfer(data):
"""Verifies that a transaction is an "ERC20.transfer" transaction, and extracts call parameters from it.
:param data: Raw input data from Ethereum transaction.
:type data: str, 0x-hex
:raises ValueError: Function signature does not match AccountRegister.add
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
if f != contract_function_signatures['transfer']:
raise ValueError('Invalid transfer data ({})'.format(f))
d = data[10:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'amount': int(d[64:], 16)
}
def unpack_transferfrom(data):
"""Verifies that a transaction is an "ERC20.transferFrom" transaction, and extracts call parameters from it.
:param data: Raw input data from Ethereum transaction.
:type data: str, 0x-hex
:raises ValueError: Function signature does not match AccountRegister.add
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
if f != contract_function_signatures['transferfrom']:
raise ValueError('Invalid transferFrom data ({})'.format(f))
d = data[10:]
return {
'from': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'to': web3.Web3.toChecksumAddress('0x' + d[128-40:128]),
'amount': int(d[128:], 16)
}
def unpack_approve(data):
"""Verifies that a transaction is an "ERC20.approve" transaction, and extracts call parameters from it.
:param data: Raw input data from Ethereum transaction.
:type data: str, 0x-hex
:raises ValueError: Function signature does not match AccountRegister.add
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
if f != contract_function_signatures['approve']:
raise ValueError('Invalid approval data ({})'.format(f))
d = data[10:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'amount': int(d[64:], 16)
}
@celery_app.task()
def balance(tokens, holder_address, chain_str):
"""Return token balances for a list of tokens for given address
:param tokens: Token addresses
:type tokens: list of str, 0x-hex
:param holder_address: Token holder address
:type holder_address: str, 0x-hex
:param chain_str: Chain spec string representation
:type chain_str: str
:return: List of balances
:rtype: list of int
"""
#abi = ContractRegistry.abi('ERC20Token')
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
for t in tokens:
#token = CICRegistry.get_address(t['address'])
#abi = token.abi()
#o = c.w3.eth.contract(abi=abi, address=t['address'])
o = CICRegistry.get_address(chain_spec, t['address']).contract
b = o.functions.balanceOf(holder_address).call()
t['balance_network'] = b
return tokens
@celery_app.task(bind=True)
def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
"""Transfer ERC20 tokens between addresses
First argument is a list of tokens, to enable the task to be chained to the symbol to token address resolver function. However, it accepts only one token as argument.
:raises TokenCountError: Either none or more then one tokens have been passed as tokens argument
:param tokens: Token addresses
:type tokens: list of str, 0x-hex
:param holder_address: Token holder address
:type holder_address: str, 0x-hex
:param receiver_address: Token receiver address
:type receiver_address: str, 0x-hex
:param value: Amount of token, in 'wei'
:type value: int
:param chain_str: Chain spec string representation
:type chain_str: str
:raises TokenCountError: More than one token is passed in tokens list
:return: Transaction hash for tranfer operation
:rtype: str, 0x-hex
"""
# we only allow one token, one transfer
if len(tokens) != 1:
raise TokenCountError
chain_spec = ChainSpec.from_chain_str(chain_str)
queue = self.request.delivery_info['routing_key']
# retrieve the token interface
t = tokens[0]
c = RpcClient(chain_spec, holder_address=holder_address)
txf = TokenTxFactory(holder_address, c)
tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_transfer')
gas_budget = tx_transfer['gas'] * tx_transfer['gasPrice']
s = create_check_gas_and_send_task(
[tx_signed_raw_hex],
chain_str,
holder_address,
gas_budget,
[tx_hash_hex],
queue,
)
s.apply_async()
return tx_hash_hex
@celery_app.task(bind=True)
def approve(self, tokens, holder_address, spender_address, value, chain_str):
"""Approve ERC20 transfer on behalf of holder address
First argument is a list of tokens, to enable the task to be chained to the symbol to token address resolver function. However, it accepts only one token as argument.
:raises TokenCountError: Either none or more then one tokens have been passed as tokens argument
:param tokens: Token addresses
:type tokens: list of str, 0x-hex
:param holder_address: Token holder address
:type holder_address: str, 0x-hex
:param receiver_address: Token receiver address
:type receiver_address: str, 0x-hex
:param value: Amount of token, in 'wei'
:type value: int
:param chain_str: Chain spec string representation
:type chain_str: str
:raises TokenCountError: More than one token is passed in tokens list
:return: Transaction hash for tranfer operation
:rtype: str, 0x-hex
"""
# we only allow one token, one transfer
if len(tokens) != 1:
raise TokenCountError
chain_spec = ChainSpec.from_chain_str(chain_str)
queue = self.request.delivery_info['routing_key']
# retrieve the token interface
t = tokens[0]
c = RpcClient(chain_spec, holder_address=holder_address)
txf = TokenTxFactory(holder_address, c)
tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_approve')
gas_budget = tx_transfer['gas'] * tx_transfer['gasPrice']
s = create_check_gas_and_send_task(
[tx_signed_raw_hex],
chain_str,
holder_address,
gas_budget,
[tx_hash_hex],
queue,
)
s.apply_async()
return tx_hash_hex
@celery_app.task()
def resolve_tokens_by_symbol(token_symbols, chain_str):
"""Returns contract addresses of an array of ERC20 token symbols
:param token_symbols: Token symbols to resolve
:type token_symbols: list of str
:param chain_str: Chain spec string representation
:type chain_str: str
:return: Respective token contract addresses
:rtype: list of str, 0x-hex
"""
tokens = []
chain_spec = ChainSpec.from_chain_str(chain_str)
for token_symbol in token_symbols:
token = CICRegistry.get_token(chain_spec, token_symbol)
tokens.append({
'address': token.address(),
'converters': [],
})
return tokens
@celery_app.task()
def otx_cache_transfer(
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
):
"""Generates and commits transaction cache metadata for an ERC20.transfer or ERC20.transferFrom transaction
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx_signed_raw_hex: Raw signed transaction
:type tx_signed_raw_hex: str, 0x-hex
:param chain_str: Chain spec string representation
:type chain_str: str
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
(txc, cache_id) = cache_transfer_data(tx_hash_hex, tx)
return txc
@celery_app.task()
def cache_transfer_data(
tx_hash_hex,
tx,
):
"""Helper function for otx_cache_transfer
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx: Signed raw transaction
:type tx: str, 0x-hex
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
tx_data = unpack_transfer(tx['data'])
logg.debug('tx data {}'.format(tx_data))
logg.debug('tx {}'.format(tx))
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx_data['to'],
tx['to'],
tx['to'],
tx_data['amount'],
tx_data['amount'],
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)
@celery_app.task()
def otx_cache_approve(
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
):
"""Generates and commits transaction cache metadata for an ERC20.approve transaction
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx_signed_raw_hex: Raw signed transaction
:type tx_signed_raw_hex: str, 0x-hex
:param chain_str: Chain spec string representation
:type chain_str: str
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
(txc, cache_id) = cache_approve_data(tx_hash_hex, tx)
return txc
@celery_app.task()
def cache_approve_data(
tx_hash_hex,
tx,
):
"""Helper function for otx_cache_approve
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx: Signed raw transaction
:type tx: str, 0x-hex
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
tx_data = unpack_approve(tx['data'])
logg.debug('tx data {}'.format(tx_data))
logg.debug('tx {}'.format(tx))
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx_data['to'],
tx['to'],
tx['to'],
tx_data['amount'],
tx_data['amount'],
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)
class ExtendedTx:
_default_decimals = 6
def __init__(self, tx_hash, chain_spec):
self._chain_spec = chain_spec
self.chain = str(chain_spec)
self.hash = tx_hash
self.sender = None
self.sender_label = None
self.recipient = None
self.recipient_label = None
self.source_token_value = 0
self.destination_token_value = 0
self.source_token = zero_address
self.destination_token = zero_address
self.source_token_symbol = ''
self.destination_token_symbol = ''
self.source_token_decimals = ExtendedTx._default_decimals
self.destination_token_decimals = ExtendedTx._default_decimals
def set_actors(self, sender, recipient, trusted_declarator_addresses=None):
self.sender = sender
self.recipient = recipient
if trusted_declarator_addresses != None:
self.sender_label = translate_address(sender, trusted_declarator_addresses, self.chain)
self.recipient_label = translate_address(recipient, trusted_declarator_addresses, self.chain)
def set_tokens(self, source, source_value, destination=None, destination_value=None):
if destination == None:
destination = source
if destination_value == None:
destination_value = source_value
st = CICRegistry.get_address(self._chain_spec, source)
dt = CICRegistry.get_address(self._chain_spec, destination)
self.source_token = source
self.source_token_symbol = st.symbol()
self.source_token_decimals = st.decimals()
self.source_token_value = source_value
self.destination_token = destination
self.destination_token_symbol = dt.symbol()
self.destination_token_decimals = dt.decimals()
self.destination_token_value = destination_value
def to_dict(self):
o = {}
for attr in dir(self):
if attr[0] == '_' or attr in ['set_actors', 'set_tokens', 'to_dict']:
continue
o[attr] = getattr(self, attr)
return o

View File

@@ -4,42 +4,15 @@ import logging
# third-party imports
import celery
import requests
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec
from chainlib.eth.address import is_checksum_address
from chainlib.eth.gas import balance
from chainlib.eth.error import (
EthException,
NotFoundEthException,
)
from chainlib.eth.tx import (
transaction,
receipt,
raw,
TxFormat,
unpack,
)
from chainlib.connection import RPCConnection
from chainlib.hash import keccak256_hex_to_hex
from chainlib.eth.gas import Gas
from chainlib.eth.contract import (
abi_decode_single,
ABIContractType,
)
from hexathon import (
add_0x,
strip_0x,
)
import web3
from cic_registry import zero_address
from cic_registry.chain import ChainSpec
# local imports
from cic_eth.db import (
Otx,
SessionBase,
)
from .rpc import RpcClient
from cic_eth.db import Otx, SessionBase
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.nonce import NonceReservation
from cic_eth.db.models.lock import Lock
from cic_eth.db.models.role import AccountRole
from cic_eth.db.enum import (
LockEnum,
StatusBits,
@@ -47,30 +20,18 @@ from cic_eth.db.enum import (
from cic_eth.error import PermanentTxError
from cic_eth.error import TemporaryTxError
from cic_eth.error import NotLocalTxError
#from cic_eth.queue.tx import create as queue_create
from cic_eth.queue.tx import (
get_tx,
register_tx,
get_nonce_tx,
)
from cic_eth.queue.tx import create as queue_create
from cic_eth.queue.tx import get_tx
from cic_eth.queue.tx import get_nonce_tx
from cic_eth.error import OutOfGasError
from cic_eth.error import LockedError
from cic_eth.eth.gas import (
create_check_gas_task,
)
from cic_eth.eth.nonce import CustodialTaskNonceOracle
from cic_eth.error import (
AlreadyFillingGasError,
EthError,
)
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.eth.task import sign_and_register_tx, create_check_gas_and_send_task
from cic_eth.eth.task import sign_tx
from cic_eth.eth.nonce import NonceOracle
from cic_eth.error import AlreadyFillingGasError
from cic_eth.eth.util import tx_hex_string
from cic_eth.admin.ctrl import lock_send
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalWeb3Task,
CriticalWeb3AndSignerTask,
CriticalSQLAlchemyAndSignerTask,
CriticalSQLAlchemyAndWeb3Task,
)
celery_app = celery.current_app
logg = logging.getLogger()
@@ -79,8 +40,8 @@ MAX_NONCE_ATTEMPTS = 3
# TODO this function is too long
@celery_app.task(bind=True, throws=(OutOfGasError), base=CriticalSQLAlchemyAndWeb3Task)
def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_required=None):
@celery_app.task(bind=True, throws=(OutOfGasError))
def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=None):
"""Check the gas level of the sender address of a transaction.
If the account balance is not sufficient for the required gas, gas refill is requested and OutOfGasError raiser.
@@ -89,8 +50,8 @@ def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_requir
:param tx_hashes: Transaction hashes due to be submitted
:type tx_hashes: list of str, 0x-hex
:param chain_spec_dict: Chain spec dict representation
:type chain_spec_dict: dict
:param chain_str: Chain spec string representation
:type chain_str: str
:param txs: Signed raw transaction data, corresponding to tx_hashes
:type txs: list of str, 0x-hex
:param address: Sender address
@@ -104,53 +65,31 @@ def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_requir
for i in range(len(tx_hashes)):
o = get_tx(tx_hashes[i])
txs.append(o['signed_tx'])
logg.debug('ooooo {}'.format(o))
if address == None:
address = o['address']
#if not web3.Web3.isChecksumAddress(address):
if not is_checksum_address(address):
raise ValueError('invalid address {}'.format(address))
chain_spec = ChainSpec.from_chain_str(chain_str)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
queue = self.request.delivery_info['routing_key']
queue = self.request.delivery_info.get('routing_key')
conn = RPCConnection.connect(chain_spec)
#c = RpcClient(chain_spec, holder_address=address)
c = RpcClient(chain_spec)
# TODO: it should not be necessary to pass address explicitly, if not passed should be derived from the tx
gas_balance = 0
try:
o = balance(address)
r = conn.do(o)
conn.disconnect()
gas_balance = abi_decode_single(ABIContractType.UINT256, r)
except EthException as e:
conn.disconnect()
raise EthError('gas_balance call for {}: {}'.format(address, e))
balance = c.w3.eth.getBalance(address)
logg.debug('address {} has gas {} needs {}'.format(address, balance, gas_required))
logg.debug('address {} has gas {} needs {}'.format(address, gas_balance, gas_required))
session = SessionBase.create_session()
gas_provider = AccountRole.get_address('GAS_GIFTER', session=session)
session.close()
if gas_required > gas_balance:
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
address,
gas_provider,
],
queue=queue,
)
if gas_required > balance:
s_refill_gas = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
chain_spec_dict,
address,
chain_str,
],
queue=queue,
)
s_nonce.link(s_refill_gas)
s_nonce.apply_async()
s_refill_gas.apply_async()
wait_tasks = []
for tx_hash in tx_hashes:
s = celery.signature(
@@ -162,28 +101,20 @@ def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_requir
)
wait_tasks.append(s)
celery.group(wait_tasks)()
raise OutOfGasError('need to fill gas, required {}, had {}'.format(gas_required, gas_balance))
raise OutOfGasError('need to fill gas, required {}, had {}'.format(gas_required, balance))
safe_gas = self.safe_gas_threshold_amount
if gas_balance < safe_gas:
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
address,
gas_provider,
],
queue=queue,
)
safe_gas = c.safe_threshold_amount()
if balance < safe_gas:
s_refill_gas = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
chain_spec_dict,
address,
chain_str,
],
queue=queue,
)
s_nonce.link(s_refill_gas)
s_nonce.apply_async()
logg.debug('requested refill from {} to {}'.format(gas_provider, address))
s_refill_gas.apply_async()
logg.debug('requested refill from {} to {}'.format(c.gas_provider(), address))
ready_tasks = []
for tx_hash in tx_hashes:
s = celery.signature(
@@ -200,7 +131,7 @@ def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_requir
# TODO: chain chainable transactions that use hashes as inputs may be chained to this function to output signed txs instead.
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
@celery_app.task(bind=True)
def hashes_to_txs(self, tx_hashes):
"""Return a list of raw signed transactions from the local transaction queue corresponding to a list of transaction hashes.
@@ -215,6 +146,8 @@ def hashes_to_txs(self, tx_hashes):
queue = self.request.delivery_info['routing_key']
#otxs = ','.format("'{}'".format(tx_hash) for tx_hash in tx_hashes)
session = SessionBase.create_session()
q = session.query(Otx.signed_tx)
q = q.filter(Otx.tx_hash.in_(tx_hashes))
@@ -231,9 +164,157 @@ def hashes_to_txs(self, tx_hashes):
return txs
# TODO: A lock should be introduced to ensure that the send status change and the transaction send is atomic.
@celery_app.task(bind=True, base=CriticalWeb3Task)
def send(self, txs, chain_spec_dict):
# TODO: Move this and send to subfolder submodule
class ParityNodeHandler:
def __init__(self, chain_spec, queue):
self.chain_spec = chain_spec
self.chain_str = str(chain_spec)
self.queue = queue
def handle(self, exception, tx_hash_hex, tx_hex):
meth = self.handle_default
if isinstance(exception, (ValueError)):
# s_debug = celery.signature(
# 'cic_eth.admin.debug.out_tmp',
# [tx_hash_hex, '{}: {}'.format(tx_hash_hex, exception)],
# queue=queue,
# )
# s_debug.apply_async()
earg = exception.args[0]
if earg['code'] == -32010:
logg.debug('skipping lock for code {}'.format(earg['code']))
meth = self.handle_invalid_parameters
elif earg['code'] == -32602:
meth = self.handle_invalid_encoding
else:
meth = self.handle_invalid
elif isinstance(exception, (requests.exceptions.ConnectionError)):
meth = self.handle_connection
(t, e_fn, message) = meth(tx_hash_hex, tx_hex)
return (t, e_fn, '{} {}'.format(message, exception))
def handle_connection(self, tx_hash_hex, tx_hex):
s_set_sent = celery.signature(
'cic_eth.queue.tx.set_sent_status',
[
tx_hash_hex,
True,
],
queue=self.queue,
)
t = s_set_sent.apply_async()
return (t, TemporaryTxError, 'Sendfail {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_invalid_encoding(self, tx_hash_hex, tx_hex):
tx_bytes = bytes.fromhex(tx_hex[2:])
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
s_lock = celery.signature(
'cic_eth.admin.ctrl.lock_send',
[
tx_hash_hex,
self.chain_str,
tx['from'],
tx_hash_hex,
],
queue=self.queue,
)
s_set_reject = celery.signature(
'cic_eth.queue.tx.set_rejected',
[],
queue=self.queue,
)
nonce_txs = get_nonce_tx(tx['nonce'], tx['from'], self.chain_spec.chain_id())
attempts = len(nonce_txs)
if attempts < MAX_NONCE_ATTEMPTS:
logg.debug('nonce {} address {} retries {} < {}'.format(tx['nonce'], tx['from'], attempts, MAX_NONCE_ATTEMPTS))
s_resend = celery.signature(
'cic_eth.eth.tx.resend_with_higher_gas',
[
self.chain_str,
None,
1.01,
],
queue=self.queue,
)
s_unlock = celery.signature(
'cic_eth.admin.ctrl.unlock_send',
[
self.chain_str,
tx['from'],
],
queue=self.queue,
)
s_resend.link(s_unlock)
s_set_reject.link(s_resend)
s_lock.link(s_set_reject)
t = s_lock.apply_async()
return (t, PermanentTxError, 'Reject invalid encoding {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_invalid_parameters(self, tx_hash_hex, tx_hex):
s_sync = celery.signature(
'cic_eth.eth.tx.sync_tx',
[
tx_hash_hex,
self.chain_str,
],
queue=self.queue,
)
t = s_sync.apply_async()
return (t, PermanentTxError, 'Reject invalid parameters {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_invalid(self, tx_hash_hex, tx_hex):
tx_bytes = bytes.fromhex(tx_hex[2:])
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
s_lock = celery.signature(
'cic_eth.admin.ctrl.lock_send',
[
tx_hash_hex,
self.chain_str,
tx['from'],
tx_hash_hex,
],
queue=self.queue,
)
s_set_reject = celery.signature(
'cic_eth.queue.tx.set_rejected',
[],
queue=self.queue,
)
s_lock.link(s_set_reject)
t = s_lock.apply_async()
return (t, PermanentTxError, 'Reject invalid {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_default(self, tx_hash_hex, tx_hex):
tx_bytes = bytes.fromhex(tx_hex[2:])
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
s_lock = celery.signature(
'cic_eth.admin.ctrl.lock_send',
[
tx_hash_hex,
self.chain_str,
tx['from'],
tx_hash_hex,
],
queue=self.queue,
)
s_set_fubar = celery.signature(
'cic_eth.queue.tx.set_fubar',
[],
queue=self.queue,
)
s_lock.link(s_set_fubar)
t = s_lock.apply_async()
return (t, PermanentTxError, 'Fubar {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
@celery_app.task(bind=True)
def send(self, txs, chain_str):
"""Send transactions to the network.
If more than one transaction is passed to the task, it will spawn a new send task with the remaining transaction(s) after the first in the list has been processed.
@@ -258,17 +339,25 @@ def send(self, txs, chain_spec_dict):
if len(txs) == 0:
raise ValueError('no transaction to send')
chain_spec = ChainSpec.from_dict(chain_spec_dict)
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_hex = txs[0]
logg.debug('send transaction {}'.format(tx_hex))
tx_hash_hex = add_0x(keccak256_hex_to_hex(tx_hex))
tx_hash = web3.Web3.keccak(hexstr=tx_hex)
tx_hash_hex = tx_hash.hex()
logg.debug('send transaction {} -> {}'.format(tx_hash_hex, tx_hex))
queue = self.request.delivery_info.get('routing_key')
queue = self.request.delivery_info.get('routing_key', None)
c = RpcClient(chain_spec)
r = None
try:
r = c.w3.eth.send_raw_transaction(tx_hex)
except Exception as e:
raiser = ParityNodeHandler(chain_spec, queue)
(t, e, m) = raiser.handle(e, tx_hash_hex, tx_hex)
raise e(m)
s_set_sent = celery.signature(
'cic_eth.queue.tx.set_sent_status',
[
@@ -277,11 +366,6 @@ def send(self, txs, chain_spec_dict):
],
queue=queue,
)
o = raw(tx_hex)
conn = RPCConnection.connect(chain_spec, 'default')
conn.do(o)
s_set_sent.apply_async()
tx_tail = txs[1:]
@@ -293,13 +377,11 @@ def send(self, txs, chain_spec_dict):
)
s.apply_async()
return tx_hash_hex
return r.hex()
# TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks.
# TODO: method is too long, factor out code for clarity
@celery_app.task(bind=True, throws=(NotFoundEthException,), base=CriticalWeb3AndSignerTask)
def refill_gas(self, recipient_address, chain_spec_dict):
@celery_app.task(bind=True, throws=(AlreadyFillingGasError))
def refill_gas(self, recipient_address, chain_str):
"""Executes a native token transaction to fund the recipient's gas expenditures.
:param recipient_address: Recipient in need of gas
@@ -310,53 +392,66 @@ def refill_gas(self, recipient_address, chain_spec_dict):
:returns: Transaction hash.
:rtype: str, 0x-hex
"""
# essentials
chain_spec = ChainSpec.from_dict(chain_spec_dict)
queue = self.request.delivery_info.get('routing_key')
chain_spec = ChainSpec.from_chain_str(chain_str)
# Determine value of gas tokens to send
# if an uncompleted gas refill for the same recipient already exists, we still need to spend the nonce
# however, we will perform a 0-value transaction instead
zero_amount = False
session = SessionBase.create_session()
status_filter = StatusBits.FINAL | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
q = session.query(Otx.tx_hash)
q = q.join(TxCache)
q = q.filter(Otx.status.op('&')(StatusBits.FINAL.value)==0)
q = q.filter(TxCache.from_value!=0)
q = q.filter(TxCache.from_value!='0x00')
q = q.filter(TxCache.recipient==recipient_address)
c = q.count()
session.close()
if c > 0:
logg.warning('already filling gas {}'.format(str(AlreadyFillingGasError(recipient_address))))
zero_amount = True
session.flush()
raise AlreadyFillingGasError(recipient_address)
# finally determine the value to send
refill_amount = 0
if not zero_amount:
refill_amount = self.safe_gas_refill_amount
queue = self.request.delivery_info['routing_key']
# determine sender
gas_provider = AccountRole.get_address('GAS_GIFTER', session=session)
session.flush()
c = RpcClient(chain_spec)
clogg = celery_app.log.get_default_logger()
logg.debug('refill gas from provider address {}'.format(c.gas_provider()))
default_nonce = c.w3.eth.getTransactionCount(c.gas_provider(), 'pending')
nonce_generator = NonceOracle(c.gas_provider(), default_nonce)
nonce = nonce_generator.next()
gas_price = c.gas_price()
gas_limit = c.default_gas_limit
refill_amount = c.refill_amount()
logg.debug('tx send gas price {} nonce {}'.format(gas_price, nonce))
# set up evm RPC connection
rpc = RPCConnection.connect(chain_spec, 'default')
# create and sign transaction
tx_send_gas = {
'from': c.gas_provider(),
'to': recipient_address,
'gas': gas_limit,
'gasPrice': gas_price,
'chainId': chain_spec.chain_id(),
'nonce': nonce,
'value': refill_amount,
'data': '',
}
tx_send_gas_signed = c.w3.eth.sign_transaction(tx_send_gas)
tx_hash = web3.Web3.keccak(hexstr=tx_send_gas_signed['raw'])
tx_hash_hex = tx_hash.hex()
# set up transaction builder
nonce_oracle = CustodialTaskNonceOracle(gas_provider, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc)
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
c = Gas(signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id())
# build and add transaction
logg.debug('tx send gas amount {} from provider {} to {}'.format(refill_amount, gas_provider, recipient_address))
(tx_hash_hex, tx_signed_raw_hex) = c.create(gas_provider, recipient_address, refill_amount, tx_format=TxFormat.RLP_SIGNED)
# TODO: route this through sign_and_register_tx instead
logg.debug('adding queue refill gas tx {}'.format(tx_hash_hex))
cache_task = 'cic_eth.eth.tx.cache_gas_data'
register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=cache_task, session=session)
queue_create(
nonce,
c.gas_provider(),
tx_hash_hex,
tx_send_gas_signed['raw'],
chain_str,
)
# add transaction to send queue
s_tx_cache = celery.signature(
'cic_eth.eth.tx.cache_gas_refill_data',
[
tx_hash_hex,
tx_send_gas,
],
queue=queue,
)
s_status = celery.signature(
'cic_eth.queue.tx.set_ready',
[
@@ -364,12 +459,11 @@ def refill_gas(self, recipient_address, chain_spec_dict):
],
queue=queue,
)
t = s_status.apply_async()
return tx_signed_raw_hex
celery.group(s_tx_cache, s_status)()
return tx_send_gas_signed['raw']
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
@celery_app.task(bind=True)
def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_factor=1.1):
"""Create a new transaction from an existing one with same nonce and higher gas price.
@@ -387,18 +481,19 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
"""
session = SessionBase.create_session()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==txold_hash_hex)
otx = q.first()
session.close()
if otx == None:
session.close()
raise NotLocalTxError(txold_hash_hex)
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
tx_signed_raw_bytes = bytes.fromhex(otx.signed_tx[2:])
tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id())
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
logg.debug('resend otx {} {}'.format(tx, otx.signed_tx))
queue = self.request.delivery_info['routing_key']
@@ -426,10 +521,8 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
session=session,
)
TxCache.clone(txold_hash_hex, tx_hash_hex, session=session)
session.close()
TxCache.clone(txold_hash_hex, tx_hash_hex)
s = create_check_gas_and_send_task(
[tx_signed_raw_hex],
@@ -444,63 +537,19 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
return tx_hash_hex
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def reserve_nonce(self, chained_input, signer_address=None):
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,))
def sync_tx(self, tx_hash_hex, chain_str):
self.log_banner()
queue = self.request.delivery_info['routing_key']
session = SessionBase.create_session()
address = None
if signer_address == None:
address = chained_input
logg.debug('non-explicit address for reserve nonce, using arg head {}'.format(chained_input))
else:
#if web3.Web3.isChecksumAddress(signer_address):
if is_checksum_address(signer_address):
address = signer_address
logg.debug('explicit address for reserve nonce {}'.format(signer_address))
else:
address = AccountRole.get_address(signer_address, session=session)
logg.debug('role for reserve nonce {} -> {}'.format(signer_address, address))
if not is_checksum_address(address):
raise ValueError('invalid result when resolving address for nonce {}'.format(address))
root_id = self.request.root_id
r = NonceReservation.next(address, root_id)
logg.debug('nonce {} reserved for address {} task {}'.format(r[1], address, r[0]))
session.commit()
session.close()
return chained_input
@celery_app.task(bind=True, throws=(NotFoundEthException,), base=CriticalWeb3Task)
def sync_tx(self, tx_hash_hex, chain_spec_dict):
"""Force update of network status of a simgle transaction
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param chain_str: Chain spec string representation
:type chain_str: str
"""
queue = self.request.delivery_info.get('routing_key')
chain_spec = ChainSpec.from_dict(chain_spec_dict)
conn = RPCConnection.connect(chain_spec, 'default')
o = transaction(tx_hash_hex)
tx = conn.do(o)
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
tx = c.w3.eth.getTransaction(tx_hash_hex)
rcpt = None
try:
o = receipt(tx_hash_hex)
rcpt = conn.do(o)
except NotFoundEthException as e:
rcpt = c.w3.eth.getTransactionReceipt(tx_hash_hex)
except web3.exceptions.TransactionNotFound as e:
pass
if rcpt != None:
@@ -530,54 +579,79 @@ def sync_tx(self, tx_hash_hex, chain_spec_dict):
s.apply_async()
#
#@celery_app.task(bind=True)
#def resume_tx(self, txpending_hash_hex, chain_str):
# """Queue a suspended tranaction for (re)sending
#
# :param txpending_hash_hex: Transaction hash
# :type txpending_hash_hex: str, 0x-hex
# :param chain_str: Chain spec, string representation
# :type chain_str: str
# :raises NotLocalTxError: Transaction does not exist in the local queue
# :returns: Transaction hash
# :rtype: str, 0x-hex
# """
#
# chain_spec = ChainSpec.from_chain_str(chain_str)
#
# session = SessionBase.create_session()
# q = session.query(Otx.signed_tx)
# q = q.filter(Otx.tx_hash==txpending_hash_hex)
# r = q.first()
# session.close()
# if r == None:
# raise NotLocalTxError(txpending_hash_hex)
#
# tx_signed_raw_hex = r[0]
# tx_signed_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
# tx = unpack_signed_raw_tx(tx_signed_bytes, chain_spec.chain_id())
#
# queue = self.request.delivery_info['routing_key']
#
# s = create_check_gas_and_send_task(
# [tx_signed_raw_hex],
# chain_str,
# tx['from'],
# tx['gasPrice'] * tx['gas'],
# [txpending_hash_hex],
# queue=queue,
# )
# s.apply_async()
# return txpending_hash_hex
@celery_app.task(bind=True)
def resume_tx(self, txpending_hash_hex, chain_str):
"""Queue a suspended tranaction for (re)sending
:param txpending_hash_hex: Transaction hash
:type txpending_hash_hex: str, 0x-hex
:param chain_str: Chain spec, string representation
:type chain_str: str
:raises NotLocalTxError: Transaction does not exist in the local queue
:returns: Transaction hash
:rtype: str, 0x-hex
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
session = SessionBase.create_session()
q = session.query(Otx.signed_tx)
q = q.filter(Otx.tx_hash==txpending_hash_hex)
r = q.first()
session.close()
if r == None:
raise NotLocalTxError(txpending_hash_hex)
tx_signed_raw_hex = r[0]
tx_signed_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack_signed_raw_tx(tx_signed_bytes, chain_spec.chain_id())
queue = self.request.delivery_info['routing_key']
s = create_check_gas_and_send_task(
[tx_signed_raw_hex],
chain_str,
tx['from'],
tx['gasPrice'] * tx['gas'],
[txpending_hash_hex],
queue=queue,
)
s.apply_async()
return txpending_hash_hex
# TODO: Move to cic_eth.eth.gas
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_gas_data(
@celery_app.task()
def otx_cache_parse_tx(
tx_hash_hex,
tx_signed_raw_hex,
chain_spec_dict,
chain_str,
):
"""Generates and commits transaction cache metadata for a gas refill transaction
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx_signed_raw_hex: Raw signed transaction
:type tx_signed_raw_hex: str, 0x-hex
:param chain_str: Chain spec string representation
:type chain_str: str
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
(txc, cache_id) = cache_gas_refill_data(tx_hash_hex, tx)
return txc
@celery_app.task()
def cache_gas_refill_data(
tx_hash_hex,
tx,
):
"""Helper function for otx_cache_parse_tx
@@ -588,16 +662,12 @@ def cache_gas_data(
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id())
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx['to'],
ZERO_ADDRESS,
ZERO_ADDRESS,
zero_address,
zero_address,
tx['value'],
tx['value'],
)

Some files were not shown because too many files have changed in this diff Show More