Compare commits
4 Commits
lash/sessi
...
lash/audit
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
13e0fd41cd
|
||
|
|
c9a69d8658 | ||
|
|
32ce706b2d
|
||
|
|
1a31876e2e
|
@@ -5,7 +5,6 @@ include:
|
||||
- local: 'apps/cic-ussd/.gitlab-ci.yml'
|
||||
- local: 'apps/cic-notify/.gitlab-ci.yml'
|
||||
- local: 'apps/cic-meta/.gitlab-ci.yml'
|
||||
- local: 'apps/cic-cache/.gitlab-ci.yml'
|
||||
|
||||
stages:
|
||||
- build
|
||||
|
||||
3
.gitmodules
vendored
3
.gitmodules
vendored
@@ -0,0 +1,3 @@
|
||||
[submodule "apps/cic-cache"]
|
||||
path = apps/cic-cache
|
||||
url = git@gitlab.com:grassrootseconomics/cic-cache.git
|
||||
|
||||
1
apps/cic-cache
Submodule
1
apps/cic-cache
Submodule
Submodule apps/cic-cache added at d2cb3a4555
@@ -1,2 +0,0 @@
|
||||
[bancor]
|
||||
dir =
|
||||
@@ -1,2 +0,0 @@
|
||||
[cic]
|
||||
registry_address =
|
||||
@@ -1,8 +0,0 @@
|
||||
[database]
|
||||
NAME=cic-eth
|
||||
USER=postgres
|
||||
PASSWORD=
|
||||
HOST=localhost
|
||||
PORT=5432
|
||||
ENGINE=postgresql
|
||||
DRIVER=psycopg2
|
||||
@@ -1,6 +0,0 @@
|
||||
[eth]
|
||||
provider = ws://localhost:8545
|
||||
#ttp_provider = http://localhost:8545
|
||||
#provider = http://localhost:8545
|
||||
gas_provider_address =
|
||||
#chain_id =
|
||||
@@ -1,2 +0,0 @@
|
||||
[bancor]
|
||||
dir =
|
||||
@@ -1,2 +0,0 @@
|
||||
[cic]
|
||||
registry_address =
|
||||
@@ -1,8 +0,0 @@
|
||||
[database]
|
||||
NAME=cic-cache-test
|
||||
USER=postgres
|
||||
PASSWORD=
|
||||
HOST=localhost
|
||||
PORT=5432
|
||||
ENGINE=sqlite
|
||||
DRIVER=pysqlite
|
||||
@@ -1,5 +0,0 @@
|
||||
[eth]
|
||||
#ws_provider = ws://localhost:8546
|
||||
#ttp_provider = http://localhost:8545
|
||||
provider = http://localhost:8545
|
||||
#chain_id =
|
||||
@@ -1,5 +0,0 @@
|
||||
[report]
|
||||
omit =
|
||||
.venv/*
|
||||
scripts/*
|
||||
cic_cache/db/postgres/*
|
||||
@@ -1,7 +0,0 @@
|
||||
set -a
|
||||
CICTEST_DATABASE_ENGINE=postgresql
|
||||
CICTEST_DATABASE_DRIVER=psycopg2
|
||||
CICTEST_DATABASE_HOST=localhost
|
||||
CICTEST_DATABASE_PORT=5432
|
||||
CICTEST_DATABASE_NAME=cic-eth-test
|
||||
set +a
|
||||
8
apps/cic-cache/.gitignore
vendored
8
apps/cic-cache/.gitignore
vendored
@@ -1,8 +0,0 @@
|
||||
.envrc
|
||||
.envrc_dev
|
||||
.venv
|
||||
__pycache__
|
||||
*.pyc
|
||||
_build
|
||||
doc/**/*.png
|
||||
doc/**/html
|
||||
@@ -1,22 +0,0 @@
|
||||
.cic_cache_variables:
|
||||
variables:
|
||||
APP_NAME: cic-cache
|
||||
DOCKERFILE_PATH: $APP_NAME/docker/Dockerfile
|
||||
|
||||
.cic_cache_changes_target:
|
||||
rules:
|
||||
- changes:
|
||||
- $CONTEXT/$APP_NAME/*
|
||||
|
||||
build-mr-cic-cache:
|
||||
extends:
|
||||
- .cic_cache_changes_target
|
||||
- .py_build_merge_request
|
||||
- .cic_cache_variables
|
||||
|
||||
build-push-cic-cache:
|
||||
extends:
|
||||
- .py_build_push
|
||||
- .cic_cache_variables
|
||||
|
||||
|
||||
@@ -1,13 +0,0 @@
|
||||
- 0.1.2
|
||||
* Revert to alembic migrations
|
||||
- 0.1.1
|
||||
* Add missing modules to setup
|
||||
- 0.1.0
|
||||
* Remove old APIs
|
||||
* Add bloom filter output APIs for all txs and per-account txs
|
||||
- 0.0.2
|
||||
* UWSGI server endpoint example
|
||||
* OpenAPI spec
|
||||
* stored procedures, test fixture for database schema
|
||||
- 0.0.1
|
||||
* Add json translators of transaction_list and balances stored procedure queries
|
||||
@@ -1 +0,0 @@
|
||||
from .cache import BloomCache
|
||||
@@ -1,73 +0,0 @@
|
||||
"""API for cic-cache celery tasks
|
||||
|
||||
.. moduleauthor:: Louis Holbrook <dev@holbrook.no>
|
||||
|
||||
"""
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# third-party imports
|
||||
import celery
|
||||
|
||||
|
||||
app = celery.current_app
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Api:
|
||||
"""Creates task chains to perform well-known CIC operations.
|
||||
|
||||
Each method that sends tasks returns details about the root task. The root task uuid can be provided in the callback, to enable to caller to correlate the result with individual calls. It can also be used to independently poll the completion of a task chain.
|
||||
|
||||
:param callback_param: Static value to pass to callback
|
||||
:type callback_param: str
|
||||
:param callback_task: Callback task that executes callback_param call. (Must be included by the celery worker)
|
||||
:type callback_task: string
|
||||
:param queue: Name of worker queue to submit tasks to
|
||||
:type queue: str
|
||||
"""
|
||||
def __init__(self, queue='cic-cache', callback_param=None, callback_task='cic_cache.callbacks.noop.noop', callback_queue=None):
|
||||
self.callback_param = callback_param
|
||||
self.callback_task = callback_task
|
||||
self.queue = queue
|
||||
logg.info('api using queue {}'.format(self.queue))
|
||||
self.callback_success = None
|
||||
self.callback_error = None
|
||||
if callback_queue == None:
|
||||
callback_queue=self.queue
|
||||
|
||||
if callback_param != None:
|
||||
self.callback_success = celery.signature(
|
||||
callback_task,
|
||||
[
|
||||
callback_param,
|
||||
0,
|
||||
],
|
||||
queue=callback_queue,
|
||||
)
|
||||
self.callback_error = celery.signature(
|
||||
callback_task,
|
||||
[
|
||||
callback_param,
|
||||
1,
|
||||
],
|
||||
queue=callback_queue,
|
||||
)
|
||||
|
||||
def list(self, offset, limit, address=None):
|
||||
s = celery.signature(
|
||||
'cic_cache.tasks.tx.tx_filter',
|
||||
[
|
||||
0,
|
||||
100,
|
||||
address,
|
||||
],
|
||||
queue=None
|
||||
)
|
||||
if self.callback_param != None:
|
||||
s.link(self.callback_success).on_error(self.callback_error)
|
||||
|
||||
t = s.apply_async()
|
||||
|
||||
return t
|
||||
@@ -1,89 +0,0 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# third-party imports
|
||||
import moolb
|
||||
|
||||
# local imports
|
||||
from cic_cache.db import list_transactions_mined
|
||||
from cic_cache.db import list_transactions_account_mined
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
class BloomCache:
|
||||
|
||||
def __init__(self, session):
|
||||
self.session = session
|
||||
|
||||
|
||||
@staticmethod
|
||||
def __get_filter_size(n):
|
||||
n = 8192 * 8
|
||||
logg.warning('filter size hardcoded to {}'.format(n))
|
||||
return n
|
||||
|
||||
|
||||
def load_transactions(self, offset, limit):
|
||||
"""Retrieves a list of transactions from cache and creates a bloom filter pointing to blocks and transactions.
|
||||
|
||||
Block and transaction numbers are serialized as 32-bit big-endian numbers. The input to the second bloom filter is the concatenation of the serialized block number and transaction index.
|
||||
|
||||
For example, if the block number is 13 and the transaction index is 42, the input are:
|
||||
|
||||
block filter: 0x0d000000
|
||||
block+tx filter: 0x0d0000002a0000000
|
||||
|
||||
:param offset: Offset in data set to return transactions from
|
||||
:type offset: int
|
||||
:param limit: Max number of transactions to retrieve
|
||||
:type limit: int
|
||||
:return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx
|
||||
:rtype: tuple
|
||||
"""
|
||||
rows = list_transactions_mined(self.session, offset, limit)
|
||||
|
||||
f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
|
||||
f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
|
||||
highest_block = -1
|
||||
lowest_block = -1
|
||||
for r in rows:
|
||||
if highest_block == -1:
|
||||
highest_block = r[0]
|
||||
lowest_block = r[0]
|
||||
block = r[0].to_bytes(4, byteorder='big')
|
||||
tx = r[1].to_bytes(4, byteorder='big')
|
||||
f_block.add(block)
|
||||
f_blocktx.add(block + tx)
|
||||
logg.debug('added block {} tx {} lo {} hi {}'.format(r[0], r[1], lowest_block, highest_block))
|
||||
return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),)
|
||||
|
||||
|
||||
def load_transactions_account(self, address, offset, limit):
|
||||
"""Same as load_transactions(...), but only retrieves transactions where the specified account address is sender or recipient.
|
||||
|
||||
:param address: Address to retrieve transactions for.
|
||||
:type address: str, 0x-hex
|
||||
:param offset: Offset in data set to return transactions from
|
||||
:type offset: int
|
||||
:param limit: Max number of transactions to retrieve
|
||||
:type limit: int
|
||||
:return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx
|
||||
:rtype: tuple
|
||||
"""
|
||||
rows = list_transactions_account_mined(self.session, address, offset, limit)
|
||||
|
||||
f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
|
||||
f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
|
||||
highest_block = -1;
|
||||
lowest_block = -1;
|
||||
for r in rows:
|
||||
if highest_block == -1:
|
||||
highest_block = r[0]
|
||||
lowest_block = r[0]
|
||||
block = r[0].to_bytes(4, byteorder='big')
|
||||
tx = r[1].to_bytes(4, byteorder='big')
|
||||
f_block.add(block)
|
||||
f_blocktx.add(block + tx)
|
||||
logg.debug('added block {} tx {} lo {} hi {}'.format(r[0], r[1], lowest_block, highest_block))
|
||||
return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),)
|
||||
@@ -1,35 +0,0 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# local imports
|
||||
from .list import list_transactions_mined
|
||||
from .list import list_transactions_account_mined
|
||||
from .list import add_transaction
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
def dsn_from_config(config):
|
||||
scheme = config.get('DATABASE_ENGINE')
|
||||
if config.get('DATABASE_DRIVER') != None:
|
||||
scheme += '+{}'.format(config.get('DATABASE_DRIVER'))
|
||||
|
||||
dsn = ''
|
||||
if config.get('DATABASE_ENGINE') == 'sqlite':
|
||||
dsn = '{}:///{}'.format(
|
||||
scheme,
|
||||
config.get('DATABASE_NAME'),
|
||||
)
|
||||
|
||||
else:
|
||||
dsn = '{}://{}:{}@{}:{}/{}'.format(
|
||||
scheme,
|
||||
config.get('DATABASE_USER'),
|
||||
config.get('DATABASE_PASSWORD'),
|
||||
config.get('DATABASE_HOST'),
|
||||
config.get('DATABASE_PORT'),
|
||||
config.get('DATABASE_NAME'),
|
||||
)
|
||||
logg.debug('parsed dsn from config: {}'.format(dsn))
|
||||
return dsn
|
||||
|
||||
@@ -1,79 +0,0 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import datetime
|
||||
|
||||
# third-party imports
|
||||
from cic_cache.db.models.base import SessionBase
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
def list_transactions_mined(
|
||||
session,
|
||||
offset,
|
||||
limit,
|
||||
):
|
||||
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
|
||||
|
||||
:param offset: Offset in data set to return transactions from
|
||||
:type offset: int
|
||||
:param limit: Max number of transactions to retrieve
|
||||
:type limit: int
|
||||
:result: Result set
|
||||
:rtype: SQLAlchemy.ResultProxy
|
||||
"""
|
||||
s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(limit, offset)
|
||||
r = session.execute(s)
|
||||
return r
|
||||
|
||||
|
||||
def list_transactions_account_mined(
|
||||
session,
|
||||
address,
|
||||
offset,
|
||||
limit,
|
||||
):
|
||||
"""Same as list_transactions_mined(...), but only retrieves transaction where the specified account address is sender or recipient.
|
||||
|
||||
:param address: Address to retrieve transactions for.
|
||||
:type address: str, 0x-hex
|
||||
:param offset: Offset in data set to return transactions from
|
||||
:type offset: int
|
||||
:param limit: Max number of transactions to retrieve
|
||||
:type limit: int
|
||||
:result: Result set
|
||||
:rtype: SQLAlchemy.ResultProxy
|
||||
"""
|
||||
s = "SELECT block_number, tx_index FROM tx WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(address, address, limit, offset)
|
||||
r = session.execute(s)
|
||||
return r
|
||||
|
||||
|
||||
def add_transaction(
|
||||
session, tx_hash,
|
||||
block_number,
|
||||
tx_index,
|
||||
sender,
|
||||
receiver,
|
||||
source_token,
|
||||
destination_token,
|
||||
from_value,
|
||||
to_value,
|
||||
success,
|
||||
timestamp,
|
||||
):
|
||||
date_block = datetime.datetime.fromtimestamp(timestamp)
|
||||
s = "INSERT INTO tx (tx_hash, block_number, tx_index, sender, recipient, source_token, destination_token, from_value, to_value, success, date_block) VALUES ('{}', {}, {}, '{}', '{}', '{}', '{}', {}, {}, {}, '{}')".format(
|
||||
tx_hash,
|
||||
block_number,
|
||||
tx_index,
|
||||
sender,
|
||||
receiver,
|
||||
source_token,
|
||||
destination_token,
|
||||
from_value,
|
||||
to_value,
|
||||
success,
|
||||
date_block,
|
||||
)
|
||||
session.execute(s)
|
||||
@@ -1 +0,0 @@
|
||||
Generic single-database configuration.
|
||||
@@ -1,86 +0,0 @@
|
||||
# A generic, single database configuration.
|
||||
|
||||
[alembic]
|
||||
# path to migration scripts
|
||||
script_location = .
|
||||
|
||||
# template used to generate migration files
|
||||
# file_template = %%(rev)s_%%(slug)s
|
||||
|
||||
# timezone to use when rendering the date
|
||||
# within the migration file as well as the filename.
|
||||
# string value is passed to dateutil.tz.gettz()
|
||||
# leave blank for localtime
|
||||
# timezone =
|
||||
|
||||
# max length of characters to apply to the
|
||||
# "slug" field
|
||||
# truncate_slug_length = 40
|
||||
|
||||
# set to 'true' to run the environment during
|
||||
# the 'revision' command, regardless of autogenerate
|
||||
# revision_environment = false
|
||||
|
||||
# set to 'true' to allow .pyc and .pyo files without
|
||||
# a source .py file to be detected as revisions in the
|
||||
# versions/ directory
|
||||
# sourceless = false
|
||||
|
||||
# version location specification; this defaults
|
||||
# to ./versions. When using multiple version
|
||||
# directories, initial revisions must be specified with --version-path
|
||||
# version_locations = %(here)s/bar %(here)s/bat ./versions
|
||||
|
||||
# the output encoding used when revision files
|
||||
# are written from script.py.mako
|
||||
# output_encoding = utf-8
|
||||
|
||||
#sqlalchemy.url = driver://user:pass@localhost/dbname
|
||||
sqlalchemy.url = postgresql+psycopg2://postgres@localhost:5432/cic-cache
|
||||
|
||||
|
||||
[post_write_hooks]
|
||||
# post_write_hooks defines scripts or Python functions that are run
|
||||
# on newly generated revision scripts. See the documentation for further
|
||||
# detail and examples
|
||||
|
||||
# format using "black" - use the console_scripts runner, against the "black" entrypoint
|
||||
# hooks=black
|
||||
# black.type=console_scripts
|
||||
# black.entrypoint=black
|
||||
# black.options=-l 79
|
||||
|
||||
# Logging configuration
|
||||
[loggers]
|
||||
keys = root,sqlalchemy,alembic
|
||||
|
||||
[handlers]
|
||||
keys = console
|
||||
|
||||
[formatters]
|
||||
keys = generic
|
||||
|
||||
[logger_root]
|
||||
level = WARN
|
||||
handlers = console
|
||||
qualname =
|
||||
|
||||
[logger_sqlalchemy]
|
||||
level = WARN
|
||||
handlers =
|
||||
qualname = sqlalchemy.engine
|
||||
|
||||
[logger_alembic]
|
||||
level = INFO
|
||||
handlers =
|
||||
qualname = alembic
|
||||
|
||||
[handler_console]
|
||||
class = StreamHandler
|
||||
args = (sys.stderr,)
|
||||
level = NOTSET
|
||||
formatter = generic
|
||||
|
||||
[formatter_generic]
|
||||
format = %(levelname)-5.5s [%(name)s] %(message)s
|
||||
datefmt = %H:%M:%S
|
||||
@@ -1,77 +0,0 @@
|
||||
from logging.config import fileConfig
|
||||
|
||||
from sqlalchemy import engine_from_config
|
||||
from sqlalchemy import pool
|
||||
|
||||
from alembic import context
|
||||
|
||||
# this is the Alembic Config object, which provides
|
||||
# access to the values within the .ini file in use.
|
||||
config = context.config
|
||||
|
||||
# Interpret the config file for Python logging.
|
||||
# This line sets up loggers basically.
|
||||
fileConfig(config.config_file_name)
|
||||
|
||||
# add your model's MetaData object here
|
||||
# for 'autogenerate' support
|
||||
# from myapp import mymodel
|
||||
# target_metadata = mymodel.Base.metadata
|
||||
target_metadata = None
|
||||
|
||||
# other values from the config, defined by the needs of env.py,
|
||||
# can be acquired:
|
||||
# my_important_option = config.get_main_option("my_important_option")
|
||||
# ... etc.
|
||||
|
||||
|
||||
def run_migrations_offline():
|
||||
"""Run migrations in 'offline' mode.
|
||||
|
||||
This configures the context with just a URL
|
||||
and not an Engine, though an Engine is acceptable
|
||||
here as well. By skipping the Engine creation
|
||||
we don't even need a DBAPI to be available.
|
||||
|
||||
Calls to context.execute() here emit the given string to the
|
||||
script output.
|
||||
|
||||
"""
|
||||
url = config.get_main_option("sqlalchemy.url")
|
||||
context.configure(
|
||||
url=url,
|
||||
target_metadata=target_metadata,
|
||||
literal_binds=True,
|
||||
dialect_opts={"paramstyle": "named"},
|
||||
)
|
||||
|
||||
with context.begin_transaction():
|
||||
context.run_migrations()
|
||||
|
||||
|
||||
def run_migrations_online():
|
||||
"""Run migrations in 'online' mode.
|
||||
|
||||
In this scenario we need to create an Engine
|
||||
and associate a connection with the context.
|
||||
|
||||
"""
|
||||
connectable = engine_from_config(
|
||||
config.get_section(config.config_ini_section),
|
||||
prefix="sqlalchemy.",
|
||||
poolclass=pool.NullPool,
|
||||
)
|
||||
|
||||
with connectable.connect() as connection:
|
||||
context.configure(
|
||||
connection=connection, target_metadata=target_metadata
|
||||
)
|
||||
|
||||
with context.begin_transaction():
|
||||
context.run_migrations()
|
||||
|
||||
|
||||
if context.is_offline_mode():
|
||||
run_migrations_offline()
|
||||
else:
|
||||
run_migrations_online()
|
||||
@@ -1,24 +0,0 @@
|
||||
"""${message}
|
||||
|
||||
Revision ID: ${up_revision}
|
||||
Revises: ${down_revision | comma,n}
|
||||
Create Date: ${create_date}
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
${imports if imports else ""}
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = ${repr(up_revision)}
|
||||
down_revision = ${repr(down_revision)}
|
||||
branch_labels = ${repr(branch_labels)}
|
||||
depends_on = ${repr(depends_on)}
|
||||
|
||||
|
||||
def upgrade():
|
||||
${upgrades if upgrades else "pass"}
|
||||
|
||||
|
||||
def downgrade():
|
||||
${downgrades if downgrades else "pass"}
|
||||
@@ -1,52 +0,0 @@
|
||||
"""Base tables
|
||||
|
||||
Revision ID: 63b629f14a85
|
||||
Revises:
|
||||
Create Date: 2020-12-04 08:16:00.412189
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '63b629f14a85'
|
||||
down_revision = None
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_table(
|
||||
'tx',
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('date_registered', sa.DateTime, nullable=False, server_default=sa.func.current_timestamp()),
|
||||
sa.Column('block_number', sa.Integer, nullable=False),
|
||||
sa.Column('tx_index', sa.Integer, nullable=False),
|
||||
sa.Column('tx_hash', sa.String(66), nullable=False),
|
||||
sa.Column('sender', sa.String(42), nullable=False),
|
||||
sa.Column('recipient', sa.String(42), nullable=False),
|
||||
sa.Column('source_token', sa.String(42), nullable=False),
|
||||
sa.Column('destination_token', sa.String(42), nullable=False),
|
||||
sa.Column('success', sa.Boolean, nullable=False),
|
||||
sa.Column('from_value', sa.BIGINT(), nullable=False),
|
||||
sa.Column('to_value', sa.BIGINT(), nullable=False),
|
||||
sa.Column('date_block', sa.DateTime, nullable=False),
|
||||
)
|
||||
op.create_table(
|
||||
'tx_sync',
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('tx', sa.String(66), nullable=False),
|
||||
)
|
||||
|
||||
op.execute("INSERT INTO tx_sync (tx) VALUES('0x0000000000000000000000000000000000000000000000000000000000000000');")
|
||||
|
||||
op.create_index('sender_token_idx', 'tx', ['sender', 'source_token'])
|
||||
op.create_index('recipient_token_idx', 'tx', ['recipient', 'destination_token'])
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_index('recipient_token_idx')
|
||||
op.drop_index('sender_token_idx')
|
||||
op.drop_table('tx_sync')
|
||||
op.drop_table('tx')
|
||||
@@ -1,102 +0,0 @@
|
||||
# stanard imports
|
||||
import logging
|
||||
|
||||
# third-party imports
|
||||
from sqlalchemy import Column, Integer
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
Model = declarative_base(name='Model')
|
||||
|
||||
|
||||
class SessionBase(Model):
|
||||
"""The base object for all SQLAlchemy enabled models. All other models must extend this.
|
||||
"""
|
||||
__abstract__ = True
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
|
||||
engine = None
|
||||
"""Database connection engine of the running aplication"""
|
||||
sessionmaker = None
|
||||
"""Factory object responsible for creating sessions from the connection pool"""
|
||||
transactional = True
|
||||
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
|
||||
poolable = True
|
||||
"""Whether the database backend supports connection pools. Should be explicitly set by initialization code"""
|
||||
procedural = True
|
||||
"""Whether the database backend supports stored procedures"""
|
||||
localsessions = {}
|
||||
"""Contains dictionary of sessions initiated by db model components"""
|
||||
|
||||
|
||||
@staticmethod
|
||||
def create_session():
|
||||
"""Creates a new database session.
|
||||
"""
|
||||
return SessionBase.sessionmaker()
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _set_engine(engine):
|
||||
"""Sets the database engine static property
|
||||
"""
|
||||
SessionBase.engine = engine
|
||||
SessionBase.sessionmaker = sessionmaker(bind=SessionBase.engine)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def connect(dsn, debug=False):
|
||||
"""Create new database connection engine and connect to database backend.
|
||||
|
||||
:param dsn: DSN string defining connection.
|
||||
:type dsn: str
|
||||
"""
|
||||
e = None
|
||||
if SessionBase.poolable:
|
||||
e = create_engine(
|
||||
dsn,
|
||||
max_overflow=50,
|
||||
pool_pre_ping=True,
|
||||
pool_size=20,
|
||||
pool_recycle=10,
|
||||
echo=debug,
|
||||
)
|
||||
else:
|
||||
e = create_engine(
|
||||
dsn,
|
||||
echo=debug,
|
||||
)
|
||||
|
||||
SessionBase._set_engine(e)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def disconnect():
|
||||
"""Disconnect from database and free resources.
|
||||
"""
|
||||
SessionBase.engine.dispose()
|
||||
SessionBase.engine = None
|
||||
|
||||
|
||||
@staticmethod
|
||||
def bind_session(session=None):
|
||||
localsession = session
|
||||
if localsession == None:
|
||||
localsession = SessionBase.create_session()
|
||||
localsession_key = str(id(localsession))
|
||||
logg.debug('creating new session {}'.format(localsession_key))
|
||||
SessionBase.localsessions[localsession_key] = localsession
|
||||
return localsession
|
||||
|
||||
|
||||
@staticmethod
|
||||
def release_session(session=None):
|
||||
session_key = str(id(session))
|
||||
if SessionBase.localsessions.get(session_key) != None:
|
||||
logg.debug('destroying session {}'.format(session_key))
|
||||
session.commit()
|
||||
session.close()
|
||||
@@ -1,141 +0,0 @@
|
||||
# standard imports
|
||||
import os
|
||||
import re
|
||||
import logging
|
||||
import argparse
|
||||
import json
|
||||
import base64
|
||||
|
||||
# third-party imports
|
||||
import confini
|
||||
|
||||
# local imports
|
||||
from cic_cache import BloomCache
|
||||
from cic_cache.db import dsn_from_config
|
||||
from cic_cache.db.models.base import SessionBase
|
||||
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
logg = logging.getLogger()
|
||||
|
||||
rootdir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
|
||||
dbdir = os.path.join(rootdir, 'cic_cache', 'db')
|
||||
migrationsdir = os.path.join(dbdir, 'migrations')
|
||||
|
||||
config_dir = os.path.join('/usr/local/etc/cic-cache')
|
||||
|
||||
argparser = argparse.ArgumentParser()
|
||||
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
|
||||
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
|
||||
argparser.add_argument('-v', action='store_true', help='be verbose')
|
||||
argparser.add_argument('-vv', action='store_true', help='be more verbose')
|
||||
args = argparser.parse_args()
|
||||
|
||||
if args.vv:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
elif args.v:
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
config = confini.Config(args.c, args.env_prefix)
|
||||
config.process()
|
||||
config.censor('PASSWORD', 'DATABASE')
|
||||
config.censor('PASSWORD', 'SSL')
|
||||
logg.debug('config:\n{}'.format(config))
|
||||
|
||||
dsn = dsn_from_config(config)
|
||||
SessionBase.connect(dsn, config.true('DATABASE_DEBUG'))
|
||||
|
||||
re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?'
|
||||
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)/?(\d+)?/?(\d+)/?'
|
||||
|
||||
DEFAULT_LIMIT = 100
|
||||
|
||||
|
||||
def process_transactions_account_bloom(session, env):
|
||||
r = re.match(re_transactions_account_bloom, env.get('PATH_INFO'))
|
||||
if not r:
|
||||
return None
|
||||
|
||||
address = r[1]
|
||||
if r[2] == None:
|
||||
address = '0x' + address
|
||||
offset = DEFAULT_LIMIT
|
||||
if r.lastindex > 2:
|
||||
offset = r[3]
|
||||
limit = 0
|
||||
if r.lastindex > 3:
|
||||
limit = r[4]
|
||||
|
||||
c = BloomCache(session)
|
||||
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
|
||||
|
||||
o = {
|
||||
'alg': 'sha256',
|
||||
'low': lowest_block,
|
||||
'high': highest_block,
|
||||
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
|
||||
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
|
||||
'filter_rounds': 3,
|
||||
}
|
||||
|
||||
j = json.dumps(o)
|
||||
|
||||
return ('application/json', j.encode('utf-8'),)
|
||||
|
||||
|
||||
def process_transactions_all_bloom(session, env):
|
||||
r = re.match(re_transactions_all_bloom, env.get('PATH_INFO'))
|
||||
if not r:
|
||||
return None
|
||||
|
||||
offset = DEFAULT_LIMIT
|
||||
if r.lastindex > 0:
|
||||
offset = r[1]
|
||||
limit = 0
|
||||
if r.lastindex > 1:
|
||||
limit = r[2]
|
||||
|
||||
c = BloomCache(session)
|
||||
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
|
||||
|
||||
o = {
|
||||
'alg': 'sha256',
|
||||
'low': lowest_block,
|
||||
'high': highest_block,
|
||||
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
|
||||
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
|
||||
'filter_rounds': 3,
|
||||
}
|
||||
|
||||
j = json.dumps(o)
|
||||
|
||||
return ('application/json', j.encode('utf-8'),)
|
||||
|
||||
|
||||
# uwsgi application
|
||||
def application(env, start_response):
|
||||
|
||||
headers = []
|
||||
content = b''
|
||||
|
||||
session = SessionBase.create_session()
|
||||
for handler in [
|
||||
process_transactions_all_bloom,
|
||||
process_transactions_account_bloom,
|
||||
]:
|
||||
r = handler(session, env)
|
||||
if r != None:
|
||||
(mime_type, content) = r
|
||||
break
|
||||
session.close()
|
||||
|
||||
headers.append(('Content-Length', str(len(content))),)
|
||||
headers.append(('Access-Control-Allow-Origin', '*',));
|
||||
|
||||
if len(content) == 0:
|
||||
headers.append(('Content-Type', 'text/plain, charset=UTF-8',))
|
||||
start_response('404 Looked everywhere, sorry', headers)
|
||||
else:
|
||||
headers.append(('Content-Type', mime_type,))
|
||||
start_response('200 OK', headers)
|
||||
|
||||
return [content]
|
||||
@@ -1,98 +0,0 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
|
||||
# third-party imports
|
||||
import celery
|
||||
import confini
|
||||
|
||||
# local imports
|
||||
from cic_cache.db import dsn_from_config
|
||||
from cic_cache.db.models.base import SessionBase
|
||||
from cic_cache.tasks.tx import *
|
||||
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
logg = logging.getLogger()
|
||||
|
||||
config_dir = os.path.join('/usr/local/etc/cic-cache')
|
||||
|
||||
|
||||
argparser = argparse.ArgumentParser()
|
||||
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
|
||||
argparser.add_argument('-q', type=str, default='cic-cache', help='queue name for worker tasks')
|
||||
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
|
||||
argparser.add_argument('-v', action='store_true', help='be verbose')
|
||||
argparser.add_argument('-vv', action='store_true', help='be more verbose')
|
||||
|
||||
args = argparser.parse_args()
|
||||
|
||||
if args.vv:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
elif args.v:
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
config = confini.Config(args.c, args.env_prefix)
|
||||
config.process()
|
||||
|
||||
# connect to database
|
||||
dsn = dsn_from_config(config)
|
||||
SessionBase.connect(dsn)
|
||||
|
||||
# verify database connection with minimal sanity query
|
||||
#session = SessionBase.create_session()
|
||||
#session.execute('select version_num from alembic_version')
|
||||
#session.close()
|
||||
|
||||
# set up celery
|
||||
current_app = celery.Celery(__name__)
|
||||
|
||||
broker = config.get('CELERY_BROKER_URL')
|
||||
if broker[:4] == 'file':
|
||||
bq = tempfile.mkdtemp()
|
||||
bp = tempfile.mkdtemp()
|
||||
current_app.conf.update({
|
||||
'broker_url': broker,
|
||||
'broker_transport_options': {
|
||||
'data_folder_in': bq,
|
||||
'data_folder_out': bq,
|
||||
'data_folder_processed': bp,
|
||||
},
|
||||
},
|
||||
)
|
||||
logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
|
||||
else:
|
||||
current_app.conf.update({
|
||||
'broker_url': broker,
|
||||
})
|
||||
|
||||
result = config.get('CELERY_RESULT_URL')
|
||||
if result[:4] == 'file':
|
||||
rq = tempfile.mkdtemp()
|
||||
current_app.conf.update({
|
||||
'result_backend': 'file://{}'.format(rq),
|
||||
})
|
||||
logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
|
||||
else:
|
||||
current_app.conf.update({
|
||||
'result_backend': result,
|
||||
})
|
||||
|
||||
|
||||
def main():
|
||||
argv = ['worker']
|
||||
if args.vv:
|
||||
argv.append('--loglevel=DEBUG')
|
||||
elif args.v:
|
||||
argv.append('--loglevel=INFO')
|
||||
argv.append('-Q')
|
||||
argv.append(args.q)
|
||||
argv.append('-n')
|
||||
argv.append(args.q)
|
||||
|
||||
current_app.worker_main(argv)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -1,339 +0,0 @@
|
||||
# standard imports
|
||||
import sys
|
||||
import os
|
||||
import argparse
|
||||
import logging
|
||||
import time
|
||||
import enum
|
||||
import re
|
||||
|
||||
# third-party imports
|
||||
import confini
|
||||
from cic_registry import CICRegistry
|
||||
from cic_registry.chain import (
|
||||
ChainRegistry,
|
||||
ChainSpec,
|
||||
)
|
||||
#from cic_registry.bancor import BancorRegistryClient
|
||||
from cic_registry.token import Token
|
||||
from cic_registry.error import (
|
||||
UnknownContractError,
|
||||
UnknownDeclarationError,
|
||||
)
|
||||
from cic_registry.declaration import to_token_declaration
|
||||
from web3.exceptions import BlockNotFound, TransactionNotFound
|
||||
from websockets.exceptions import ConnectionClosedError
|
||||
from requests.exceptions import ConnectionError
|
||||
import web3
|
||||
from web3 import HTTPProvider, WebsocketProvider
|
||||
|
||||
# local imports
|
||||
from cic_cache import db
|
||||
from cic_cache.db.models.base import SessionBase
|
||||
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
logg = logging.getLogger()
|
||||
logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL)
|
||||
logging.getLogger('urllib3').setLevel(logging.CRITICAL)
|
||||
logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL)
|
||||
logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL)
|
||||
logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL)
|
||||
|
||||
log_topics = {
|
||||
'transfer': '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef',
|
||||
'convert': '0x7154b38b5dd31bb3122436a96d4e09aba5b323ae1fd580025fab55074334c095',
|
||||
'accountregistry_add': '0a3b0a4f4c6e53dce3dbcad5614cb2ba3a0fa7326d03c5d64b4fa2d565492737',
|
||||
}
|
||||
|
||||
config_dir = os.path.join('/usr/local/etc/cic-cache')
|
||||
|
||||
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
|
||||
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
|
||||
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
|
||||
argparser.add_argument('--trust-address', default=[], type=str, dest='trust_address', action='append', help='Set address as trust')
|
||||
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
|
||||
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
|
||||
argparser.add_argument('-v', help='be verbose', action='store_true')
|
||||
argparser.add_argument('-vv', help='be more verbose', action='store_true')
|
||||
args = argparser.parse_args(sys.argv[1:])
|
||||
|
||||
config_dir = os.path.join(args.c)
|
||||
os.makedirs(config_dir, 0o777, True)
|
||||
|
||||
|
||||
if args.v == True:
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
elif args.vv == True:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
config = confini.Config(config_dir, args.env_prefix)
|
||||
config.process()
|
||||
args_override = {
|
||||
'ETH_ABI_DIR': getattr(args, 'abi_dir'),
|
||||
'CIC_TRUST_ADDRESS': ",".join(getattr(args, 'trust_address', [])),
|
||||
}
|
||||
config.dict_override(args_override, 'cli flag')
|
||||
config.censor('PASSWORD', 'DATABASE')
|
||||
config.censor('PASSWORD', 'SSL')
|
||||
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
|
||||
|
||||
# connect to database
|
||||
dsn = db.dsn_from_config(config)
|
||||
SessionBase.connect(dsn)
|
||||
|
||||
|
||||
re_websocket = re.compile('^wss?://')
|
||||
re_http = re.compile('^https?://')
|
||||
blockchain_provider = config.get('ETH_PROVIDER')
|
||||
if re.match(re_websocket, blockchain_provider) != None:
|
||||
blockchain_provider = WebsocketProvider(blockchain_provider)
|
||||
elif re.match(re_http, blockchain_provider) != None:
|
||||
blockchain_provider = HTTPProvider(blockchain_provider)
|
||||
else:
|
||||
raise ValueError('unknown provider url {}'.format(blockchain_provider))
|
||||
|
||||
def web3_constructor():
|
||||
w3 = web3.Web3(blockchain_provider)
|
||||
return (blockchain_provider, w3)
|
||||
|
||||
|
||||
class RunStateEnum(enum.IntEnum):
|
||||
INIT = 0
|
||||
RUN = 1
|
||||
TERMINATE = 9
|
||||
|
||||
|
||||
def rubberstamp(src):
|
||||
return True
|
||||
|
||||
|
||||
class Tracker:
|
||||
|
||||
def __init__(self, chain_spec, trusts=[]):
|
||||
self.block_height = 0
|
||||
self.tx_height = 0
|
||||
self.state = RunStateEnum.INIT
|
||||
self.declarator_cache = {}
|
||||
self.convert_enabled = False
|
||||
self.trusts = trusts
|
||||
self.chain_spec = chain_spec
|
||||
self.declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', 'Declarator')
|
||||
|
||||
|
||||
def __process_tx(self, w3, session, t, r, l, b):
|
||||
token_value = int(l.data, 16)
|
||||
token_sender = l.topics[1][-20:].hex()
|
||||
token_recipient = l.topics[2][-20:].hex()
|
||||
|
||||
#ts = ContractRegistry.get_address(t.address)
|
||||
ts = CICRegistry.get_address(self.chain_spec, t.address())
|
||||
logg.info('add token transfer {} value {} from {} to {}'.format(
|
||||
ts.symbol(),
|
||||
token_value,
|
||||
token_sender,
|
||||
token_recipient,
|
||||
)
|
||||
)
|
||||
|
||||
db.add_transaction(
|
||||
session,
|
||||
r.transactionHash.hex(),
|
||||
r.blockNumber,
|
||||
r.transactionIndex,
|
||||
w3.toChecksumAddress(token_sender),
|
||||
w3.toChecksumAddress(token_recipient),
|
||||
t.address(),
|
||||
t.address(),
|
||||
token_value,
|
||||
token_value,
|
||||
r.status == 1,
|
||||
b.timestamp,
|
||||
)
|
||||
session.flush()
|
||||
|
||||
|
||||
# TODO: simplify/ split up and/or comment, function is too long
|
||||
def __process_convert(self, w3, session, t, r, l, b):
|
||||
logg.warning('conversions are deactivated')
|
||||
return
|
||||
# token_source = l.topics[2][-20:].hex()
|
||||
# token_source = w3.toChecksumAddress(token_source)
|
||||
# token_destination = l.topics[3][-20:].hex()
|
||||
# token_destination = w3.toChecksumAddress(token_destination)
|
||||
# data_noox = l.data[2:]
|
||||
# d = data_noox[:64]
|
||||
# token_from_value = int(d, 16)
|
||||
# d = data_noox[64:128]
|
||||
# token_to_value = int(d, 16)
|
||||
# token_trader = '0x' + data_noox[192-40:]
|
||||
#
|
||||
# #ts = ContractRegistry.get_address(token_source)
|
||||
# ts = CICRegistry.get_address(CICRegistry.bancor_chain_spec, t.address())
|
||||
# #if ts == None:
|
||||
# # ts = ContractRegistry.reserves[token_source]
|
||||
# td = ContractRegistry.get_address(token_destination)
|
||||
# #if td == None:
|
||||
# # td = ContractRegistry.reserves[token_source]
|
||||
# logg.info('add token convert {} -> {} value {} -> {} trader {}'.format(
|
||||
# ts.symbol(),
|
||||
# td.symbol(),
|
||||
# token_from_value,
|
||||
# token_to_value,
|
||||
# token_trader,
|
||||
# )
|
||||
# )
|
||||
#
|
||||
# db.add_transaction(
|
||||
# session,
|
||||
# r.transactionHash.hex(),
|
||||
# r.blockNumber,
|
||||
# r.transactionIndex,
|
||||
# w3.toChecksumAddress(token_trader),
|
||||
# w3.toChecksumAddress(token_trader),
|
||||
# token_source,
|
||||
# token_destination,
|
||||
# r.status == 1,
|
||||
# b.timestamp,
|
||||
# )
|
||||
# session.flush()
|
||||
|
||||
|
||||
def check_token(self, address):
|
||||
t = None
|
||||
try:
|
||||
t = CICRegistry.get_address(CICRegistry.default_chain_spec, address)
|
||||
return t
|
||||
except UnknownContractError:
|
||||
logg.debug('contract {} not in registry'.format(address))
|
||||
|
||||
# If nothing was returned, we look up the token in the declarator
|
||||
for trust in self.trusts:
|
||||
logg.debug('look up declaration for contract {} with trust {}'.format(address, trust))
|
||||
fn = self.declarator.function('declaration')
|
||||
# TODO: cache trust in LRUcache
|
||||
declaration_array = fn(trust, address).call()
|
||||
try:
|
||||
declaration = to_token_declaration(trust, address, declaration_array, [rubberstamp])
|
||||
logg.debug('found declaration for token {} from trust address {}'.format(address, trust))
|
||||
except UnknownDeclarationError:
|
||||
continue
|
||||
|
||||
try:
|
||||
c = w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=address)
|
||||
t = CICRegistry.add_token(self.chain_spec, c)
|
||||
break
|
||||
except ValueError:
|
||||
logg.error('declaration for {} validates as token, but location is not ERC20 compatible'.format(address))
|
||||
|
||||
return t
|
||||
|
||||
|
||||
# TODO use input data instead of logs
|
||||
def process(self, w3, session, block):
|
||||
#self.refresh_registry(w3)
|
||||
tx_count = w3.eth.getBlockTransactionCount(block.hash)
|
||||
b = w3.eth.getBlock(block.hash)
|
||||
for i in range(self.tx_height, tx_count):
|
||||
tx = w3.eth.getTransactionByBlock(block.hash, i)
|
||||
if tx.to == None:
|
||||
logg.debug('block {} tx {} is contract creation tx, skipping'.format(block.number, i))
|
||||
continue
|
||||
if len(w3.eth.getCode(tx.to)) == 0:
|
||||
logg.debug('block {} tx {} not a contract tx, skipping'.format(block.number, i))
|
||||
continue
|
||||
|
||||
t = self.check_token(tx.to)
|
||||
if t != None and isinstance(t, Token):
|
||||
r = w3.eth.getTransactionReceipt(tx.hash)
|
||||
for l in r.logs:
|
||||
logg.debug('block {} tx {} {} token log {} {}'.format(block.number, i, tx.hash.hex(), l.logIndex, l.topics[0].hex()))
|
||||
if l.topics[0].hex() == log_topics['transfer']:
|
||||
self.__process_tx(w3, session, t, r, l, b)
|
||||
|
||||
# TODO: cache contracts in LRUcache
|
||||
elif self.convert_enabled and tx.to == CICRegistry.get_contract(CICRegistry.default_chain_spec, 'Converter').address:
|
||||
r = w3.eth.getTransactionReceipt(tx.hash)
|
||||
for l in r.logs:
|
||||
logg.info('block {} tx {} {} bancornetwork log {} {}'.format(block.number, i, tx.hash.hex(), l.logIndex, l.topics[0].hex()))
|
||||
if l.topics[0].hex() == log_topics['convert']:
|
||||
self.__process_convert(w3, session, t, r, l, b)
|
||||
|
||||
session.execute("UPDATE tx_sync SET tx = '{}'".format(tx.hash.hex()))
|
||||
session.commit()
|
||||
self.tx_height += 1
|
||||
|
||||
|
||||
def __get_next_retry(self, backoff=False):
|
||||
return 1
|
||||
|
||||
|
||||
def loop(self):
|
||||
logg.info('starting at block {} tx index {}'.format(self.block_height, self.tx_height))
|
||||
self.state = RunStateEnum.RUN
|
||||
while self.state == RunStateEnum.RUN:
|
||||
(provider, w3) = web3_constructor()
|
||||
session = SessionBase.create_session()
|
||||
try:
|
||||
block = w3.eth.getBlock(self.block_height)
|
||||
self.process(w3, session, block)
|
||||
self.block_height += 1
|
||||
self.tx_height = 0
|
||||
except BlockNotFound as e:
|
||||
logg.debug('no block {} yet, zZzZ...'.format(self.block_height))
|
||||
time.sleep(self.__get_next_retry())
|
||||
except ConnectionClosedError as e:
|
||||
logg.info('connection gone, retrying')
|
||||
time.sleep(self.__get_next_retry(True))
|
||||
except OSError as e:
|
||||
logg.error('cannot connect {}'.format(e))
|
||||
time.sleep(self.__get_next_retry(True))
|
||||
except Exception as e:
|
||||
session.close()
|
||||
raise(e)
|
||||
session.close()
|
||||
|
||||
|
||||
def load(self, w3):
|
||||
session = SessionBase.create_session()
|
||||
r = session.execute('SELECT tx FROM tx_sync').first()
|
||||
if r != None:
|
||||
if r[0] == '0x{0:0{1}X}'.format(0, 64):
|
||||
logg.debug('last tx was zero-address, starting from scratch')
|
||||
return
|
||||
t = w3.eth.getTransaction(r[0])
|
||||
|
||||
self.block_height = t.blockNumber
|
||||
self.tx_height = t.transactionIndex+1
|
||||
c = w3.eth.getBlockTransactionCount(t.blockHash.hex())
|
||||
logg.debug('last tx processed {} index {} (max index {})'.format(t.blockNumber, t.transactionIndex, c-1))
|
||||
if c == self.tx_height:
|
||||
self.block_height += 1
|
||||
self.tx_height = 0
|
||||
session.close()
|
||||
|
||||
(provider, w3) = web3_constructor()
|
||||
trust = config.get('CIC_TRUST_ADDRESS', "").split(",")
|
||||
chain_spec = args.i
|
||||
|
||||
try:
|
||||
w3.eth.chainId
|
||||
except Exception as e:
|
||||
logg.exception(e)
|
||||
sys.stderr.write('cannot connect to evm node\n')
|
||||
sys.exit(1)
|
||||
|
||||
def main():
|
||||
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
|
||||
|
||||
CICRegistry.init(w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec)
|
||||
CICRegistry.add_path(config.get('ETH_ABI_DIR'))
|
||||
chain_registry = ChainRegistry(chain_spec)
|
||||
CICRegistry.add_chain_registry(chain_registry)
|
||||
|
||||
t = Tracker(chain_spec, trust)
|
||||
t.load(w3)
|
||||
t.loop()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -1,38 +0,0 @@
|
||||
# third-party imports
|
||||
import celery
|
||||
|
||||
# local imports
|
||||
from cic_cache.cache import BloomCache
|
||||
from cic_cache.db.models.base import SessionBase
|
||||
|
||||
celery_app = celery.current_app
|
||||
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
def tx_filter(self, offset, limit, address=None, encoding='hex'):
|
||||
queue = self.request.delivery_info.get('routing_key')
|
||||
|
||||
session = SessionBase.create_session()
|
||||
|
||||
c = BloomCache(session)
|
||||
b = None
|
||||
if address == None:
|
||||
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
|
||||
else:
|
||||
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
|
||||
|
||||
session.close()
|
||||
|
||||
o = {
|
||||
'alg': 'sha256',
|
||||
'low': lowest_block,
|
||||
'high': highest_block,
|
||||
'block_filter': bloom_filter_block.hex(),
|
||||
'blocktx_filter': bloom_filter_tx.hex(),
|
||||
'filter_rounds': 3,
|
||||
}
|
||||
|
||||
return o
|
||||
|
||||
|
||||
|
||||
@@ -1,18 +0,0 @@
|
||||
import os
|
||||
import semver
|
||||
|
||||
version = (
|
||||
0,
|
||||
2,
|
||||
0,
|
||||
'alpha.1',
|
||||
)
|
||||
|
||||
version_object = semver.VersionInfo(
|
||||
major=version[0],
|
||||
minor=version[1],
|
||||
patch=version[2],
|
||||
prerelease=version[3],
|
||||
)
|
||||
|
||||
version_string = str(version_object)
|
||||
@@ -1,2 +0,0 @@
|
||||
[bancor]
|
||||
dir =
|
||||
@@ -1,3 +0,0 @@
|
||||
[celery]
|
||||
broker_url = redis:///
|
||||
result_url = redis:///
|
||||
@@ -1,4 +0,0 @@
|
||||
[cic]
|
||||
registry_address =
|
||||
chain_spec =
|
||||
trust_address =
|
||||
@@ -1,9 +0,0 @@
|
||||
[database]
|
||||
NAME=cic-eth
|
||||
USER=postgres
|
||||
PASSWORD=
|
||||
HOST=localhost
|
||||
PORT=5432
|
||||
ENGINE=postgresql
|
||||
DRIVER=psycopg2
|
||||
DEBUG=
|
||||
@@ -1,3 +0,0 @@
|
||||
[bancor]
|
||||
registry_address =
|
||||
dir = /usr/local/share/bancor
|
||||
@@ -1,3 +0,0 @@
|
||||
[celery]
|
||||
broker_url = redis://localhost:63379
|
||||
result_url = redis://localhost:63379
|
||||
@@ -1,9 +0,0 @@
|
||||
[database]
|
||||
NAME=cic_cache
|
||||
USER=grassroots
|
||||
PASSWORD=
|
||||
HOST=localhost
|
||||
PORT=63432
|
||||
ENGINE=postgresql
|
||||
DRIVER=psycopg2
|
||||
DEBUG=1
|
||||
@@ -1,3 +0,0 @@
|
||||
[eth]
|
||||
provider = ws://localhost:63546
|
||||
chain_id = 8996
|
||||
@@ -1,7 +0,0 @@
|
||||
[eth]
|
||||
provider = ws://localhost:8545
|
||||
#ttp_provider = http://localhost:8545
|
||||
#provider = http://localhost:8545
|
||||
gas_provider_address =
|
||||
#chain_id =
|
||||
abi_dir = /usr/local/share/cic/solidity/abi
|
||||
@@ -1,2 +0,0 @@
|
||||
[bancor]
|
||||
dir =
|
||||
@@ -1,2 +0,0 @@
|
||||
[cic]
|
||||
registry_address =
|
||||
@@ -1,9 +0,0 @@
|
||||
[database]
|
||||
NAME=cic-cache-test
|
||||
USER=postgres
|
||||
PASSWORD=
|
||||
HOST=localhost
|
||||
PORT=5432
|
||||
ENGINE=sqlite
|
||||
DRIVER=pysqlite
|
||||
DEBUG=
|
||||
@@ -1,5 +0,0 @@
|
||||
[eth]
|
||||
#ws_provider = ws://localhost:8546
|
||||
#ttp_provider = http://localhost:8545
|
||||
provider = http://localhost:8545
|
||||
#chain_id =
|
||||
@@ -1,5 +0,0 @@
|
||||
CREATE DATABASE "cic-cache";
|
||||
CREATE DATABASE "cic-eth";
|
||||
CREATE DATABASE "cic-notify";
|
||||
CREATE DATABASE "cic-meta";
|
||||
CREATE DATABASE "cic-signer";
|
||||
@@ -1,22 +0,0 @@
|
||||
CREATE TABLE tx (
|
||||
id SERIAL PRIMARY KEY,
|
||||
date_registered TIMESTAMP NOT NULL default CURRENT_TIMESTAMP,
|
||||
block_number INTEGER NOT NULL,
|
||||
tx_index INTEGER NOT NULL,
|
||||
tx_hash VARCHAR(66) NOT NULL,
|
||||
sender VARCHAR(42) NOT NULL,
|
||||
recipient VARCHAR(42) NOT NULL,
|
||||
source_token VARCHAR(42) NOT NULL,
|
||||
destination_token VARCHAR(42) NOT NULL,
|
||||
from_value BIGINT NOT NULL,
|
||||
to_value BIGINT NOT NULL,
|
||||
success BOOLEAN NOT NULL,
|
||||
date_block TIMESTAMP NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE tx_sync (
|
||||
id SERIAL PRIMARY KEY,
|
||||
tx VARCHAR(66) NOT NULL
|
||||
);
|
||||
|
||||
INSERT INTO tx_sync (tx) VALUES('0x0000000000000000000000000000000000000000000000000000000000000000');
|
||||
@@ -1,23 +0,0 @@
|
||||
CREATE TABLE tx (
|
||||
id SERIAL PRIMARY KEY,
|
||||
date_registered DATETIME NOT NULL default CURRENT_DATE,
|
||||
block_number INTEGER NOT NULL,
|
||||
tx_index INTEGER NOT NULL,
|
||||
tx_hash VARCHAR(66) NOT NULL,
|
||||
sender VARCHAR(42) NOT NULL,
|
||||
recipient VARCHAR(42) NOT NULL,
|
||||
source_token VARCHAR(42) NOT NULL,
|
||||
destination_token VARCHAR(42) NOT NULL,
|
||||
from_value INTEGER NOT NULL,
|
||||
to_value INTEGER NOT NULL,
|
||||
success BOOLEAN NOT NULL,
|
||||
date_block DATETIME NOT NULL,
|
||||
CHECK (success IN (0, 1))
|
||||
);
|
||||
|
||||
CREATE TABLE tx_sync (
|
||||
id SERIAL PRIMARY_KEY,
|
||||
tx VARCHAR(66) NOT NULL
|
||||
);
|
||||
|
||||
INSERT INTO tx_sync (tx) VALUES('0x0000000000000000000000000000000000000000000000000000000000000000');
|
||||
@@ -1,102 +0,0 @@
|
||||
openapi: "3.0.3"
|
||||
info:
|
||||
title: Grassroots Economics CIC Cache
|
||||
description: Cache of processed transaction data from Ethereum blockchain and worker queues
|
||||
termsOfService: bzz://grassrootseconomics.eth/terms
|
||||
contact:
|
||||
name: Grassroots Economics
|
||||
url: https://www.grassrootseconomics.org
|
||||
email: will@grassecon.org
|
||||
license:
|
||||
name: GPLv3
|
||||
version: 0.1.0
|
||||
|
||||
paths:
|
||||
/tx/{offset}/{limit}:
|
||||
description: Bloom filter for batch of latest transactions
|
||||
get:
|
||||
tags:
|
||||
- transactions
|
||||
description:
|
||||
Retrieve transactions
|
||||
operationId: tx.get
|
||||
responses:
|
||||
200:
|
||||
description: Transaction query successful.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/BlocksBloom"
|
||||
|
||||
|
||||
parameters:
|
||||
- name: offset
|
||||
in: path
|
||||
schema:
|
||||
type: integer
|
||||
format: int32
|
||||
- name: limit
|
||||
in: path
|
||||
schema:
|
||||
type: integer
|
||||
format: int32
|
||||
|
||||
|
||||
/tx/{address}/{offset}/{limit}:
|
||||
description: Bloom filter for batch of latest transactions by account
|
||||
get:
|
||||
tags:
|
||||
- transactions
|
||||
description:
|
||||
Retrieve transactions
|
||||
operationId: tx.get
|
||||
responses:
|
||||
200:
|
||||
description: Transaction query successful.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/BlocksBloom"
|
||||
|
||||
|
||||
parameters:
|
||||
- name: address
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
- name: offset
|
||||
in: path
|
||||
schema:
|
||||
type: integer
|
||||
format: int32
|
||||
- name: limit
|
||||
in: path
|
||||
schema:
|
||||
type: integer
|
||||
format: int32
|
||||
|
||||
components:
|
||||
schemas:
|
||||
BlocksBloom:
|
||||
type: object
|
||||
properties:
|
||||
low:
|
||||
type: int
|
||||
format: int32
|
||||
description: The lowest block number included in the filter
|
||||
block_filter:
|
||||
type: string
|
||||
format: byte
|
||||
description: Block number filter
|
||||
blocktx_filter:
|
||||
type: string
|
||||
format: byte
|
||||
description: Block and tx index filter
|
||||
alg:
|
||||
type: string
|
||||
description: Hashing algorithm (currently only using sha256)
|
||||
filter_rounds:
|
||||
type: int
|
||||
format: int32
|
||||
description: Number of hash rounds used to create the filter
|
||||
@@ -1,54 +0,0 @@
|
||||
FROM python:3.8.6-slim-buster
|
||||
|
||||
#COPY --from=0 /usr/local/share/cic/solidity/ /usr/local/share/cic/solidity/
|
||||
|
||||
WORKDIR /usr/src/cic-cache
|
||||
|
||||
ARG pip_extra_index_url_flag='--index https://pypi.org/simple --extra-index-url https://pip.grassrootseconomics.net:8433'
|
||||
ARG root_requirement_file='requirements.txt'
|
||||
|
||||
#RUN apk update && \
|
||||
# apk add gcc musl-dev gnupg libpq
|
||||
#RUN apk add postgresql-dev
|
||||
#RUN apk add linux-headers
|
||||
#RUN apk add libffi-dev
|
||||
RUN apt-get update && \
|
||||
apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps git
|
||||
|
||||
# Copy shared requirements from top of mono-repo
|
||||
RUN echo "copying root req file ${root_requirement_file}"
|
||||
COPY $root_requirement_file .
|
||||
RUN pip install -r $root_requirement_file $pip_extra_index_url_flag
|
||||
|
||||
COPY cic-cache/requirements.txt ./
|
||||
COPY cic-cache/setup.cfg \
|
||||
cic-cache/setup.py \
|
||||
./
|
||||
COPY cic-cache/cic_cache/ ./cic_cache/
|
||||
COPY cic-cache/scripts/ ./scripts/
|
||||
COPY cic-cache/test_requirements.txt ./
|
||||
RUN pip install $pip_extra_index_url_flag -r test_requirements.txt
|
||||
RUN pip install $pip_extra_index_url_flag .
|
||||
RUN pip install .[server]
|
||||
|
||||
COPY cic-cache/tests/ ./tests/
|
||||
#COPY db/ cic-cache/db
|
||||
#RUN apk add postgresql-client
|
||||
|
||||
# ini files in config directory defines the configurable parameters for the application
|
||||
# they can all be overridden by environment variables
|
||||
# to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package)
|
||||
COPY cic-cache/config/ /usr/local/etc/cic-cache/
|
||||
|
||||
# for db migrations
|
||||
RUN git clone https://github.com/vishnubob/wait-for-it.git /usr/local/bin/wait-for-it/
|
||||
COPY cic-cache/cic_cache/db/migrations/ /usr/local/share/cic-cache/alembic/
|
||||
|
||||
RUN git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \
|
||||
mkdir -p /usr/local/share/cic/solidity && \
|
||||
cp -R cic-contracts/abis /usr/local/share/cic/solidity/abi
|
||||
|
||||
# Tracker
|
||||
# ENTRYPOINT ["/usr/local/bin/cic-cache-tracker", "-vv"]
|
||||
# Server
|
||||
# ENTRYPOINT [ "/usr/local/bin/uwsgi", "--wsgi-file", "/usr/local/lib/python3.8/site-packages/cic_cache/runnable/server.py", "--http", ":80", "--pyargv", "-vv" ]
|
||||
3616
apps/cic-cache/examples/bloom_client/package-lock.json
generated
3616
apps/cic-cache/examples/bloom_client/package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@@ -1,40 +0,0 @@
|
||||
let xmlhttprequest = require('xhr2');
|
||||
let moolb = require('moolb');
|
||||
|
||||
let xhr = new xmlhttprequest();
|
||||
xhr.responseType = 'json';
|
||||
xhr.open('GET', 'http://localhost:5555/tx/0/100');
|
||||
xhr.addEventListener('load', (e) => {
|
||||
|
||||
d = xhr.response;
|
||||
|
||||
b_one = Buffer.from(d.block_filter, 'base64');
|
||||
b_two = Buffer.from(d.blocktx_filter, 'base64');
|
||||
|
||||
for (let i = 0; i < 8192; i++) {
|
||||
if (b_two[i] > 0) {
|
||||
console.debug('value on', i, b_two[i]);
|
||||
}
|
||||
}
|
||||
console.log(b_one, b_two);
|
||||
|
||||
let f_block = moolb.fromBytes(b_one, d.filter_rounds);
|
||||
let f_blocktx = moolb.fromBytes(b_two, d.filter_rounds);
|
||||
let a = new ArrayBuffer(8);
|
||||
let w = new DataView(a);
|
||||
for (let i = 410000; i < 430000; i++) {
|
||||
w.setInt32(0, i);
|
||||
let r = new Uint8Array(a.slice(0, 4));
|
||||
if (f_block.check(r)) {
|
||||
for (let j = 0; j < 200; j++) {
|
||||
w = new DataView(a);
|
||||
w.setInt32(4, j);
|
||||
r = new Uint8Array(a);
|
||||
if (f_blocktx.check(r)) {
|
||||
console.log('true', i, j);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
let r = xhr.send();
|
||||
@@ -1,10 +0,0 @@
|
||||
alembic==1.4.2
|
||||
confini~=0.3.6b2
|
||||
uwsgi==2.0.19.1
|
||||
moolb~=0.1.0
|
||||
cic-registry~=0.5.3a4
|
||||
SQLAlchemy==1.3.20
|
||||
semver==2.13.0
|
||||
psycopg2==2.8.6
|
||||
celery==4.4.7
|
||||
redis==3.5.3
|
||||
@@ -1,56 +0,0 @@
|
||||
#!/usr/bin/python
|
||||
import os
|
||||
import argparse
|
||||
import logging
|
||||
|
||||
import alembic
|
||||
from alembic.config import Config as AlembicConfig
|
||||
import confini
|
||||
|
||||
from cic_cache.db import dsn_from_config
|
||||
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
logg = logging.getLogger()
|
||||
|
||||
# BUG: the dbdir doesn't work after script install
|
||||
rootdir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
|
||||
dbdir = os.path.join(rootdir, 'cic_cache', 'db')
|
||||
migrationsdir = os.path.join(dbdir, 'migrations')
|
||||
|
||||
config_dir = os.path.join('/usr/local/etc/cic-cache')
|
||||
|
||||
argparser = argparse.ArgumentParser()
|
||||
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
|
||||
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
|
||||
argparser.add_argument('--migrations-dir', dest='migrations_dir', default=migrationsdir, type=str, help='path to alembic migrations directory')
|
||||
argparser.add_argument('-v', action='store_true', help='be verbose')
|
||||
argparser.add_argument('-vv', action='store_true', help='be more verbose')
|
||||
args = argparser.parse_args()
|
||||
|
||||
if args.vv:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
elif args.v:
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
config = confini.Config(args.c, args.env_prefix)
|
||||
config.process()
|
||||
config.censor('PASSWORD', 'DATABASE')
|
||||
config.censor('PASSWORD', 'SSL')
|
||||
logg.debug('config:\n{}'.format(config))
|
||||
|
||||
migrations_dir = os.path.join(args.migrations_dir, config.get('DATABASE_ENGINE'))
|
||||
if not os.path.isdir(migrations_dir):
|
||||
logg.debug('migrations dir for engine {} not found, reverting to default'.format(config.get('DATABASE_ENGINE')))
|
||||
migrations_dir = os.path.join(args.migrations_dir, 'default')
|
||||
|
||||
# connect to database
|
||||
dsn = dsn_from_config(config)
|
||||
|
||||
|
||||
logg.info('using migrations dir {}'.format(migrations_dir))
|
||||
logg.info('using db {}'.format(dsn))
|
||||
ac = AlembicConfig(os.path.join(migrations_dir, 'alembic.ini'))
|
||||
ac.set_main_option('sqlalchemy.url', dsn)
|
||||
ac.set_main_option('script_location', migrations_dir)
|
||||
|
||||
alembic.command.upgrade(ac, 'head')
|
||||
@@ -1,60 +0,0 @@
|
||||
from setuptools import setup
|
||||
|
||||
import configparser
|
||||
import os
|
||||
import time
|
||||
|
||||
from cic_cache.version import (
|
||||
version_object,
|
||||
version_string
|
||||
)
|
||||
|
||||
class PleaseCommitFirstError(Exception):
|
||||
pass
|
||||
|
||||
def git_hash():
|
||||
import subprocess
|
||||
git_diff = subprocess.run(['git', 'diff'], capture_output=True)
|
||||
if len(git_diff.stdout) > 0:
|
||||
raise PleaseCommitFirstError()
|
||||
git_hash = subprocess.run(['git', 'rev-parse', 'HEAD'], capture_output=True)
|
||||
git_hash_brief = git_hash.stdout.decode('utf-8')[:8]
|
||||
return git_hash_brief
|
||||
|
||||
version_string = str(version_object)
|
||||
|
||||
try:
|
||||
version_git = git_hash()
|
||||
version_string += '+build.{}'.format(version_git)
|
||||
except FileNotFoundError:
|
||||
time_string_pair = str(time.time()).split('.')
|
||||
version_string += '+build.{}{:<09d}'.format(
|
||||
time_string_pair[0],
|
||||
int(time_string_pair[1]),
|
||||
)
|
||||
print('final version string will be {}'.format(version_string))
|
||||
|
||||
requirements = []
|
||||
f = open('requirements.txt', 'r')
|
||||
while True:
|
||||
l = f.readline()
|
||||
if l == '':
|
||||
break
|
||||
requirements.append(l.rstrip())
|
||||
f.close()
|
||||
|
||||
test_requirements = []
|
||||
f = open('test_requirements.txt', 'r')
|
||||
while True:
|
||||
l = f.readline()
|
||||
if l == '':
|
||||
break
|
||||
test_requirements.append(l.rstrip())
|
||||
f.close()
|
||||
|
||||
|
||||
setup(
|
||||
version=version_string,
|
||||
install_requires=requirements,
|
||||
tests_require=test_requirements,
|
||||
)
|
||||
@@ -1,6 +0,0 @@
|
||||
pytest==6.0.1
|
||||
pytest-cov==2.10.1
|
||||
pytest-mock==3.3.1
|
||||
pysqlite3==0.4.3
|
||||
sqlparse==0.4.1
|
||||
pytest-celery==0.0.0a1
|
||||
@@ -1,86 +0,0 @@
|
||||
# standard imports
|
||||
import os
|
||||
import sys
|
||||
import datetime
|
||||
|
||||
# third-party imports
|
||||
import pytest
|
||||
|
||||
# local imports
|
||||
from cic_cache import db
|
||||
|
||||
script_dir = os.path.dirname(os.path.realpath(__file__))
|
||||
root_dir = os.path.dirname(script_dir)
|
||||
sys.path.insert(0, root_dir)
|
||||
|
||||
# fixtures
|
||||
from tests.fixtures_config import *
|
||||
from tests.fixtures_database import *
|
||||
from tests.fixtures_celery import *
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def balances_dict_fields():
|
||||
return {
|
||||
'out_pending': 0,
|
||||
'out_synced': 1,
|
||||
'out_confirmed': 2,
|
||||
'in_pending': 3,
|
||||
'in_synced': 4,
|
||||
'in_confirmed': 5,
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def txs(
|
||||
init_database,
|
||||
list_defaults,
|
||||
list_actors,
|
||||
list_tokens,
|
||||
):
|
||||
|
||||
session = init_database
|
||||
|
||||
tx_number = 13
|
||||
tx_hash_first = '0x' + os.urandom(32).hex()
|
||||
val = 15000
|
||||
nonce = 1
|
||||
dt = datetime.datetime.utcnow()
|
||||
db.add_transaction(
|
||||
session,
|
||||
tx_hash_first,
|
||||
list_defaults['block'],
|
||||
tx_number,
|
||||
list_actors['alice'],
|
||||
list_actors['bob'],
|
||||
list_tokens['foo'],
|
||||
list_tokens['foo'],
|
||||
1024,
|
||||
2048,
|
||||
True,
|
||||
dt.timestamp(),
|
||||
)
|
||||
|
||||
|
||||
tx_number = 42
|
||||
tx_hash_second = '0x' + os.urandom(32).hex()
|
||||
tx_signed_second = '0x' + os.urandom(128).hex()
|
||||
nonce = 1
|
||||
dt -= datetime.timedelta(hours=1)
|
||||
db.add_transaction(
|
||||
session,
|
||||
tx_hash_second,
|
||||
list_defaults['block']-1,
|
||||
tx_number,
|
||||
list_actors['diane'],
|
||||
list_actors['alice'],
|
||||
list_tokens['foo'],
|
||||
list_tokens['foo'],
|
||||
1024,
|
||||
2048,
|
||||
False,
|
||||
dt.timestamp(),
|
||||
)
|
||||
|
||||
session.commit()
|
||||
|
||||
@@ -1,48 +0,0 @@
|
||||
# third-party imports
|
||||
import pytest
|
||||
import tempfile
|
||||
import logging
|
||||
import shutil
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# celery fixtures
|
||||
@pytest.fixture(scope='session')
|
||||
def celery_includes():
|
||||
return [
|
||||
'cic_cache.tasks.tx',
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def celery_config():
|
||||
bq = tempfile.mkdtemp()
|
||||
bp = tempfile.mkdtemp()
|
||||
rq = tempfile.mkdtemp()
|
||||
logg.debug('celery broker queue {} processed {}'.format(bq, bp))
|
||||
logg.debug('celery backend store {}'.format(rq))
|
||||
yield {
|
||||
'broker_url': 'filesystem://',
|
||||
'broker_transport_options': {
|
||||
'data_folder_in': bq,
|
||||
'data_folder_out': bq,
|
||||
'data_folder_processed': bp,
|
||||
},
|
||||
'result_backend': 'file://{}'.format(rq),
|
||||
}
|
||||
logg.debug('cleaning up celery filesystem backend files {} {} {}'.format(bq, bp, rq))
|
||||
shutil.rmtree(bq)
|
||||
shutil.rmtree(bp)
|
||||
shutil.rmtree(rq)
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def celery_worker_parameters():
|
||||
return {
|
||||
# 'queues': ('cic-cache'),
|
||||
}
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def celery_enable_logging():
|
||||
return True
|
||||
@@ -1,20 +0,0 @@
|
||||
# standard imports
|
||||
import os
|
||||
import logging
|
||||
|
||||
# third-party imports
|
||||
import pytest
|
||||
import confini
|
||||
|
||||
script_dir = os.path.dirname(os.path.realpath(__file__))
|
||||
root_dir = os.path.dirname(script_dir)
|
||||
logg = logging.getLogger(__file__)
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def load_config():
|
||||
config_dir = os.path.join(root_dir, '.config/test')
|
||||
conf = confini.Config(config_dir, 'CICTEST')
|
||||
conf.process()
|
||||
logg.debug('config {}'.format(conf))
|
||||
return conf
|
||||
@@ -1,118 +0,0 @@
|
||||
# standard imports
|
||||
import os
|
||||
import logging
|
||||
import re
|
||||
|
||||
# third-party imports
|
||||
import pytest
|
||||
import sqlparse
|
||||
|
||||
# local imports
|
||||
from cic_cache.db.models.base import SessionBase
|
||||
from cic_cache.db import dsn_from_config
|
||||
|
||||
logg = logging.getLogger(__file__)
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def database_engine(
|
||||
load_config,
|
||||
):
|
||||
if load_config.get('DATABASE_ENGINE') == 'sqlite':
|
||||
SessionBase.transactional = False
|
||||
SessionBase.poolable = False
|
||||
try:
|
||||
os.unlink(load_config.get('DATABASE_NAME'))
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
dsn = dsn_from_config(load_config)
|
||||
SessionBase.connect(dsn)
|
||||
return dsn
|
||||
|
||||
|
||||
# TODO: use alembic instead to migrate db, here we have to keep separate schema than migration script in script/migrate.py
|
||||
@pytest.fixture(scope='function')
|
||||
def init_database(
|
||||
load_config,
|
||||
database_engine,
|
||||
):
|
||||
|
||||
rootdir = os.path.dirname(os.path.dirname(__file__))
|
||||
schemadir = os.path.join(rootdir, 'db', load_config.get('DATABASE_DRIVER'))
|
||||
|
||||
if load_config.get('DATABASE_ENGINE') == 'sqlite':
|
||||
rconn = SessionBase.engine.raw_connection()
|
||||
f = open(os.path.join(schemadir, 'db.sql'))
|
||||
s = f.read()
|
||||
f.close()
|
||||
rconn.executescript(s)
|
||||
|
||||
else:
|
||||
rconn = SessionBase.engine.raw_connection()
|
||||
rcursor = rconn.cursor()
|
||||
|
||||
#rcursor.execute('DROP FUNCTION IF EXISTS public.transaction_list')
|
||||
#rcursor.execute('DROP FUNCTION IF EXISTS public.balances')
|
||||
|
||||
f = open(os.path.join(schemadir, 'db.sql'))
|
||||
s = f.read()
|
||||
f.close()
|
||||
r = re.compile(r'^[A-Z]', re.MULTILINE)
|
||||
for l in sqlparse.parse(s):
|
||||
strl = str(l)
|
||||
# we need to check for empty query lines, as sqlparse doesn't do that on its own (and psycopg complains when it gets them)
|
||||
if not re.search(r, strl):
|
||||
logg.warning('skipping parsed query line {}'.format(strl))
|
||||
continue
|
||||
rcursor.execute(strl)
|
||||
rconn.commit()
|
||||
|
||||
rcursor.execute('SET search_path TO public')
|
||||
|
||||
# this doesn't work when run separately, no idea why
|
||||
# functions have been manually added to original schema from cic-eth
|
||||
# f = open(os.path.join(schemadir, 'proc_transaction_list.sql'))
|
||||
# s = f.read()
|
||||
# f.close()
|
||||
# rcursor.execute(s)
|
||||
#
|
||||
# f = open(os.path.join(schemadir, 'proc_balances.sql'))
|
||||
# s = f.read()
|
||||
# f.close()
|
||||
# rcursor.execute(s)
|
||||
|
||||
rcursor.close()
|
||||
|
||||
session = SessionBase.create_session()
|
||||
yield session
|
||||
session.commit()
|
||||
session.close()
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def list_tokens(
|
||||
):
|
||||
return {
|
||||
'foo': '0x' + os.urandom(20).hex(),
|
||||
'bar': '0x' + os.urandom(20).hex(),
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def list_actors(
|
||||
):
|
||||
return {
|
||||
'alice': '0x' + os.urandom(20).hex(),
|
||||
'bob': '0x' + os.urandom(20).hex(),
|
||||
'charlie': '0x' + os.urandom(20).hex(),
|
||||
'diane': '0x' + os.urandom(20).hex(),
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def list_defaults(
|
||||
):
|
||||
|
||||
return {
|
||||
'block': 420000,
|
||||
}
|
||||
@@ -1,35 +0,0 @@
|
||||
# standard imports
|
||||
import os
|
||||
import datetime
|
||||
import logging
|
||||
import json
|
||||
|
||||
# third-party imports
|
||||
import pytest
|
||||
|
||||
# local imports
|
||||
from cic_cache import BloomCache
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
def test_cache(
|
||||
init_database,
|
||||
list_defaults,
|
||||
list_actors,
|
||||
list_tokens,
|
||||
txs,
|
||||
):
|
||||
|
||||
session = init_database
|
||||
|
||||
c = BloomCache(session)
|
||||
b = c.load_transactions(0, 100)
|
||||
|
||||
assert b[0] == list_defaults['block'] - 1
|
||||
|
||||
c = BloomCache(session)
|
||||
c.load_transactions_account(list_actors['alice'],0, 100)
|
||||
|
||||
assert b[0] == list_defaults['block'] - 1
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# third-party imports
|
||||
import celery
|
||||
|
||||
# local imports
|
||||
from cic_cache.api import Api
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
def test_task(
|
||||
init_database,
|
||||
list_defaults,
|
||||
list_actors,
|
||||
list_tokens,
|
||||
txs,
|
||||
celery_session_worker,
|
||||
):
|
||||
|
||||
api = Api(queue=None)
|
||||
t = api.list(0, 100)
|
||||
r = t.get()
|
||||
logg.debug('r {}'.format(r))
|
||||
|
||||
assert r['low'] == list_defaults['block'] - 1
|
||||
@@ -8,7 +8,6 @@ from cic_registry import zero_address
|
||||
|
||||
# local imports
|
||||
from cic_eth.db.enum import LockEnum
|
||||
from cic_eth.db.models.base import SessionBase
|
||||
from cic_eth.db.models.lock import Lock
|
||||
from cic_eth.error import LockedError
|
||||
|
||||
@@ -117,10 +116,9 @@ def unlock_queue(chained_input, chain_str, address=zero_address):
|
||||
|
||||
@celery_app.task()
|
||||
def check_lock(chained_input, chain_str, lock_flags, address=None):
|
||||
session = SessionBase.create_session()
|
||||
r = Lock.check(chain_str, lock_flags, address=zero_address, session=session)
|
||||
r = Lock.check(chain_str, lock_flags, address=zero_address)
|
||||
if address != None:
|
||||
r |= Lock.check(chain_str, lock_flags, address=address, session=session)
|
||||
r |= Lock.check(chain_str, lock_flags, address=address)
|
||||
if r > 0:
|
||||
logg.debug('lock check {} has match {} for {}'.format(lock_flags, r, address))
|
||||
raise LockedError(r)
|
||||
|
||||
@@ -457,7 +457,6 @@ class AdminApi:
|
||||
tx_unpacked = unpack_signed_raw_tx(bytes.fromhex(tx['signed_tx'][2:]), chain_spec.chain_id())
|
||||
tx['gas_price'] = tx_unpacked['gasPrice']
|
||||
tx['gas_limit'] = tx_unpacked['gas']
|
||||
tx['data'] = tx_unpacked['data']
|
||||
|
||||
s = celery.signature(
|
||||
'cic_eth.queue.tx.get_state_log',
|
||||
|
||||
@@ -6,13 +6,10 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# external imports
|
||||
# third-party imports
|
||||
import celery
|
||||
#from cic_registry.chain import ChainSpec
|
||||
from cic_registry.chain import ChainSpec
|
||||
from cic_registry import CICRegistry
|
||||
from chainlib.chain import ChainSpec
|
||||
|
||||
# local imports
|
||||
from cic_eth.eth.factory import TxFactory
|
||||
from cic_eth.db.enum import LockEnum
|
||||
|
||||
|
||||
@@ -18,11 +18,7 @@ logg = celery_app.log.get_default_logger()
|
||||
def redis(self, result, destination, status_code):
|
||||
(host, port, db, channel) = destination.split(':')
|
||||
r = redis_interface.Redis(host=host, port=port, db=db)
|
||||
data = {
|
||||
'root_id': self.request.root_id,
|
||||
'status': status_code,
|
||||
'result': result,
|
||||
}
|
||||
s = json.dumps(result)
|
||||
logg.debug('redis callback on host {} port {} db {} channel {}'.format(host, port, db, channel))
|
||||
r.publish(channel, json.dumps(data))
|
||||
r.publish(channel, s)
|
||||
r.close()
|
||||
|
||||
@@ -20,10 +20,5 @@ def tcp(self, result, destination, status_code):
|
||||
(host, port) = destination.split(':')
|
||||
logg.debug('tcp callback to {} {}'.format(host, port))
|
||||
s.connect((host, int(port)))
|
||||
data = {
|
||||
'root_id': self.request.root_id,
|
||||
'status': status_code,
|
||||
'result': result,
|
||||
}
|
||||
s.send(json.dumps(data).encode('utf-8'))
|
||||
s.send(json.dumps(result).encode('utf-8'))
|
||||
s.close()
|
||||
|
||||
@@ -1,28 +0,0 @@
|
||||
"""Add chain syncer
|
||||
|
||||
Revision ID: ec40ac0974c1
|
||||
Revises: 6ac7a1dadc46
|
||||
Create Date: 2021-02-23 06:10:19.246304
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from chainsyncer.db.migrations.sqlalchemy import (
|
||||
chainsyncer_upgrade,
|
||||
chainsyncer_downgrade,
|
||||
)
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'ec40ac0974c1'
|
||||
down_revision = '6ac7a1dadc46'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
chainsyncer_upgrade(0, 0, 1)
|
||||
|
||||
|
||||
def downgrade():
|
||||
chainsyncer_downgrade(0, 0, 1)
|
||||
@@ -1,28 +0,0 @@
|
||||
"""Add chain syncer
|
||||
|
||||
Revision ID: ec40ac0974c1
|
||||
Revises: 6ac7a1dadc46
|
||||
Create Date: 2021-02-23 06:10:19.246304
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from chainsyncer.db.migrations.sqlalchemy import (
|
||||
chainsyncer_upgrade,
|
||||
chainsyncer_downgrade,
|
||||
)
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'ec40ac0974c1'
|
||||
down_revision = '6ac7a1dadc46'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
chainsyncer_upgrade(0, 0, 1)
|
||||
|
||||
|
||||
def downgrade():
|
||||
chainsyncer_downgrade(0, 0, 1)
|
||||
@@ -55,9 +55,11 @@ class Lock(SessionBase):
|
||||
:returns: New flag state of entry
|
||||
:rtype: number
|
||||
"""
|
||||
session = SessionBase.bind_session(session)
|
||||
localsession = session
|
||||
if localsession == None:
|
||||
localsession = SessionBase.create_session()
|
||||
|
||||
q = session.query(Lock)
|
||||
q = localsession.query(Lock)
|
||||
#q = q.join(TxCache, isouter=True)
|
||||
q = q.filter(Lock.address==address)
|
||||
q = q.filter(Lock.blockchain==chain_str)
|
||||
@@ -69,8 +71,7 @@ class Lock(SessionBase):
|
||||
lock.address = address
|
||||
lock.blockchain = chain_str
|
||||
if tx_hash != None:
|
||||
session.flush()
|
||||
q = session.query(Otx)
|
||||
q = localsession.query(Otx)
|
||||
q = q.filter(Otx.tx_hash==tx_hash)
|
||||
otx = q.first()
|
||||
if otx != None:
|
||||
@@ -79,11 +80,12 @@ class Lock(SessionBase):
|
||||
lock.flags |= flags
|
||||
r = lock.flags
|
||||
|
||||
session.add(lock)
|
||||
session.commit()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
localsession.add(lock)
|
||||
localsession.commit()
|
||||
|
||||
if session == None:
|
||||
localsession.close()
|
||||
|
||||
return r
|
||||
|
||||
|
||||
@@ -108,9 +110,11 @@ class Lock(SessionBase):
|
||||
:returns: New flag state of entry
|
||||
:rtype: number
|
||||
"""
|
||||
session = SessionBase.bind_session(session)
|
||||
localsession = session
|
||||
if localsession == None:
|
||||
localsession = SessionBase.create_session()
|
||||
|
||||
q = session.query(Lock)
|
||||
q = localsession.query(Lock)
|
||||
#q = q.join(TxCache, isouter=True)
|
||||
q = q.filter(Lock.address==address)
|
||||
q = q.filter(Lock.blockchain==chain_str)
|
||||
@@ -120,13 +124,14 @@ class Lock(SessionBase):
|
||||
if lock != None:
|
||||
lock.flags &= ~flags
|
||||
if lock.flags == 0:
|
||||
session.delete(lock)
|
||||
localsession.delete(lock)
|
||||
else:
|
||||
session.add(lock)
|
||||
localsession.add(lock)
|
||||
r = lock.flags
|
||||
session.commit()
|
||||
localsession.commit()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
if session == None:
|
||||
localsession.close()
|
||||
|
||||
return r
|
||||
|
||||
@@ -151,20 +156,22 @@ class Lock(SessionBase):
|
||||
:rtype: number
|
||||
"""
|
||||
|
||||
session = SessionBase.bind_session(session)
|
||||
localsession = session
|
||||
if localsession == None:
|
||||
localsession = SessionBase.create_session()
|
||||
|
||||
q = session.query(Lock)
|
||||
q = localsession.query(Lock)
|
||||
#q = q.join(TxCache, isouter=True)
|
||||
q = q.filter(Lock.address==address)
|
||||
q = q.filter(Lock.blockchain==chain_str)
|
||||
q = q.filter(Lock.flags.op('&')(flags)==flags)
|
||||
lock = q.first()
|
||||
if session == None:
|
||||
localsession.close()
|
||||
|
||||
r = 0
|
||||
if lock != None:
|
||||
r = lock.flags & flags
|
||||
|
||||
SessionBase.release_session(session)
|
||||
return r
|
||||
|
||||
|
||||
|
||||
@@ -21,9 +21,12 @@ class Nonce(SessionBase):
|
||||
|
||||
@staticmethod
|
||||
def get(address, session=None):
|
||||
session = SessionBase.bind_session(session)
|
||||
localsession = session
|
||||
if localsession == None:
|
||||
localsession = SessionBase.create_session()
|
||||
|
||||
q = session.query(Nonce)
|
||||
|
||||
q = localsession.query(Nonce)
|
||||
q = q.filter(Nonce.address_hex==address)
|
||||
nonce = q.first()
|
||||
|
||||
@@ -31,29 +34,28 @@ class Nonce(SessionBase):
|
||||
if nonce != None:
|
||||
nonce_value = nonce.nonce;
|
||||
|
||||
SessionBase.release_session(session)
|
||||
if session == None:
|
||||
localsession.close()
|
||||
|
||||
return nonce_value
|
||||
|
||||
|
||||
@staticmethod
|
||||
def __get(session, address):
|
||||
r = session.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address))
|
||||
def __get(conn, address):
|
||||
r = conn.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address))
|
||||
nonce = r.fetchone()
|
||||
session.flush()
|
||||
if nonce == None:
|
||||
return None
|
||||
return nonce[0]
|
||||
|
||||
|
||||
@staticmethod
|
||||
def __set(session, address, nonce):
|
||||
session.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address))
|
||||
session.flush()
|
||||
def __set(conn, address, nonce):
|
||||
conn.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address))
|
||||
|
||||
|
||||
@staticmethod
|
||||
def next(address, initial_if_not_exists=0, session=None):
|
||||
def next(address, initial_if_not_exists=0):
|
||||
"""Generate next nonce for the given address.
|
||||
|
||||
If there is no previous nonce record for the address, the nonce may be initialized to a specified value, or 0 if no value has been given.
|
||||
@@ -65,32 +67,20 @@ class Nonce(SessionBase):
|
||||
:returns: Nonce
|
||||
:rtype: number
|
||||
"""
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
session.begin_nested()
|
||||
#conn = Nonce.engine.connect()
|
||||
conn = Nonce.engine.connect()
|
||||
if Nonce.transactional:
|
||||
#session.execute('BEGIN')
|
||||
session.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
|
||||
session.flush()
|
||||
nonce = Nonce.__get(session, address)
|
||||
conn.execute('BEGIN')
|
||||
conn.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
|
||||
nonce = Nonce.__get(conn, address)
|
||||
logg.debug('get nonce {} for address {}'.format(nonce, address))
|
||||
if nonce == None:
|
||||
nonce = initial_if_not_exists
|
||||
session.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
|
||||
session.flush()
|
||||
conn.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
|
||||
logg.debug('setting default nonce to {} for address {}'.format(nonce, address))
|
||||
Nonce.__set(session, address, nonce+1)
|
||||
#if Nonce.transactional:
|
||||
#session.execute('COMMIT')
|
||||
#session.execute('UNLOCK TABLE nonce')
|
||||
#conn.close()
|
||||
session.commit()
|
||||
session.commit()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
Nonce.__set(conn, address, nonce+1)
|
||||
if Nonce.transactional:
|
||||
conn.execute('COMMIT')
|
||||
conn.close()
|
||||
return nonce
|
||||
|
||||
|
||||
|
||||
@@ -79,13 +79,6 @@ class Otx(SessionBase):
|
||||
return r
|
||||
|
||||
|
||||
def __status_not_set(self, status):
|
||||
r = not(self.status & status)
|
||||
if r:
|
||||
logg.warning('status bit {} not set on {}'.format(status.name, self.tx_hash))
|
||||
return r
|
||||
|
||||
|
||||
def set_block(self, block, session=None):
|
||||
"""Set block number transaction was mined in.
|
||||
|
||||
@@ -327,32 +320,6 @@ class Otx(SessionBase):
|
||||
SessionBase.release_session(session)
|
||||
|
||||
|
||||
def dequeue(self, session=None):
|
||||
"""Marks that a process to execute send attempt is underway
|
||||
|
||||
Only manipulates object, does not transaction or commit to backend.
|
||||
|
||||
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
|
||||
"""
|
||||
if self.__status_not_set(StatusBits.QUEUED):
|
||||
return
|
||||
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if self.status & StatusBits.IN_NETWORK:
|
||||
raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__reset_status(StatusBits.QUEUED, session)
|
||||
|
||||
if self.tracing:
|
||||
self.__state_log(session=session)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
|
||||
|
||||
def minefail(self, block, session=None):
|
||||
"""Marks that transaction was mined but code execution did not succeed.
|
||||
|
||||
@@ -406,6 +373,18 @@ class Otx(SessionBase):
|
||||
else:
|
||||
self.__set_status(StatusEnum.OBSOLETED, session)
|
||||
|
||||
|
||||
# if confirmed:
|
||||
# if self.status != StatusEnum.OBSOLETED:
|
||||
# logg.warning('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
|
||||
# #raise TxStateChangeError('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
|
||||
# self.__set_status(StatusEnum.CANCELLED, session)
|
||||
# elif self.status != StatusEnum.OBSOLETED:
|
||||
# if self.status > StatusEnum.SENT:
|
||||
# logg.warning('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
|
||||
# #raise TxStateChangeError('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
|
||||
# self.__set_status(StatusEnum.OBSOLETED, session)
|
||||
|
||||
if self.tracing:
|
||||
self.__state_log(session=session)
|
||||
|
||||
|
||||
@@ -24,10 +24,9 @@ class AccountRole(SessionBase):
|
||||
tag = Column(Text)
|
||||
address_hex = Column(String(42))
|
||||
|
||||
|
||||
# TODO:
|
||||
|
||||
@staticmethod
|
||||
def get_address(tag, session):
|
||||
def get_address(tag):
|
||||
"""Get Ethereum address matching the given tag
|
||||
|
||||
:param tag: Tag
|
||||
@@ -35,26 +34,14 @@ class AccountRole(SessionBase):
|
||||
:returns: Ethereum address, or zero-address if tag does not exist
|
||||
:rtype: str, 0x-hex
|
||||
"""
|
||||
if session == None:
|
||||
raise ValueError('nested bind session calls will not succeed as the first call to release_session in the stack will leave the db object detached further down the stack. We will need additional reference count.')
|
||||
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
role = AccountRole.__get_role(tag, session)
|
||||
|
||||
r = zero_address
|
||||
if role != None:
|
||||
r = role.address_hex
|
||||
|
||||
session.flush()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
return r
|
||||
role = AccountRole.get_role(tag)
|
||||
if role == None:
|
||||
return zero_address
|
||||
return role.address_hex
|
||||
|
||||
|
||||
@staticmethod
|
||||
def get_role(tag, session=None):
|
||||
def get_role(tag):
|
||||
"""Get AccountRole model object matching the given tag
|
||||
|
||||
:param tag: Tag
|
||||
@@ -62,27 +49,20 @@ class AccountRole(SessionBase):
|
||||
:returns: Role object, if found
|
||||
:rtype: cic_eth.db.models.role.AccountRole
|
||||
"""
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
role = AccountRole.__get_role(tag, session)
|
||||
|
||||
session.flush()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
session = AccountRole.create_session()
|
||||
role = AccountRole.__get_role(session, tag)
|
||||
session.close()
|
||||
#return role.address_hex
|
||||
return role
|
||||
|
||||
|
||||
@staticmethod
|
||||
def __get_role(tag, session):
|
||||
q = session.query(AccountRole)
|
||||
q = q.filter(AccountRole.tag==tag)
|
||||
r = q.first()
|
||||
return r
|
||||
def __get_role(session, tag):
|
||||
return session.query(AccountRole).filter(AccountRole.tag==tag).first()
|
||||
|
||||
|
||||
@staticmethod
|
||||
def set(tag, address_hex, session=None):
|
||||
def set(tag, address_hex):
|
||||
"""Persist a tag to Ethereum address association.
|
||||
|
||||
This will silently overwrite the existing value.
|
||||
@@ -94,18 +74,16 @@ class AccountRole(SessionBase):
|
||||
:returns: Role object
|
||||
:rtype: cic_eth.db.models.role.AccountRole
|
||||
"""
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
role = AccountRole.__get_role(tag, session)
|
||||
#session = AccountRole.create_session()
|
||||
#role = AccountRole.__get(session, tag)
|
||||
role = AccountRole.get_role(tag) #session, tag)
|
||||
if role == None:
|
||||
role = AccountRole(tag)
|
||||
role.address_hex = address_hex
|
||||
|
||||
session.flush()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
return role
|
||||
#session.add(role)
|
||||
#session.commit()
|
||||
#session.close()
|
||||
return role #address_hex
|
||||
|
||||
|
||||
@staticmethod
|
||||
@@ -117,17 +95,20 @@ class AccountRole(SessionBase):
|
||||
:returns: Role tag, or None if no match
|
||||
:rtype: str or None
|
||||
"""
|
||||
session = SessionBase.bind_session(session)
|
||||
localsession = session
|
||||
if localsession == None:
|
||||
localsession = SessionBase.create_session()
|
||||
|
||||
q = session.query(AccountRole)
|
||||
q = localsession.query(AccountRole)
|
||||
q = q.filter(AccountRole.address_hex==address)
|
||||
role = q.first()
|
||||
tag = None
|
||||
if role != None:
|
||||
tag = role.tag
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
if session == None:
|
||||
localsession.close()
|
||||
|
||||
return tag
|
||||
|
||||
|
||||
|
||||
@@ -85,27 +85,26 @@ class TxCache(SessionBase):
|
||||
:param tx_hash_new: tx hash to associate the copied entry with
|
||||
:type tx_hash_new: str, 0x-hex
|
||||
"""
|
||||
session = SessionBase.bind_session(session)
|
||||
localsession = SessionBase.bind_session(session)
|
||||
|
||||
q = session.query(TxCache)
|
||||
q = localsession.query(TxCache)
|
||||
q = q.join(Otx)
|
||||
q = q.filter(Otx.tx_hash==tx_hash_original)
|
||||
txc = q.first()
|
||||
|
||||
if txc == None:
|
||||
SessionBase.release_session(session)
|
||||
SessionBase.release_session(localsession)
|
||||
raise NotLocalTxError('original {}'.format(tx_hash_original))
|
||||
if txc.block_number != None:
|
||||
SessionBase.release_session(session)
|
||||
SessionBase.release_session(localsession)
|
||||
raise TxStateChangeError('cannot clone tx cache of confirmed tx {}'.format(tx_hash_original))
|
||||
|
||||
session.flush()
|
||||
q = session.query(Otx)
|
||||
q = localsession.query(Otx)
|
||||
q = q.filter(Otx.tx_hash==tx_hash_new)
|
||||
otx = q.first()
|
||||
|
||||
if otx == None:
|
||||
SessionBase.release_session(session)
|
||||
SessionBase.release_session(localsession)
|
||||
raise NotLocalTxError('new {}'.format(tx_hash_new))
|
||||
|
||||
txc_new = TxCache(
|
||||
@@ -116,21 +115,18 @@ class TxCache(SessionBase):
|
||||
txc.destination_token_address,
|
||||
int(txc.from_value),
|
||||
int(txc.to_value),
|
||||
session=session,
|
||||
)
|
||||
session.add(txc_new)
|
||||
session.commit()
|
||||
localsession.add(txc_new)
|
||||
localsession.commit()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
SessionBase.release_session(localsession)
|
||||
|
||||
|
||||
def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None, session=None):
|
||||
session = SessionBase.bind_session(session)
|
||||
q = session.query(Otx)
|
||||
q = q.filter(Otx.tx_hash==tx_hash)
|
||||
tx = q.first()
|
||||
localsession = SessionBase.bind_session(session)
|
||||
tx = localsession.query(Otx).filter(Otx.tx_hash==tx_hash).first()
|
||||
if tx == None:
|
||||
SessionBase.release_session(session)
|
||||
SessionBase.release_session(localsession)
|
||||
raise FileNotFoundError('outgoing transaction record unknown {} (add a Tx first)'.format(tx_hash))
|
||||
self.otx_id = tx.id
|
||||
|
||||
@@ -147,5 +143,5 @@ class TxCache(SessionBase):
|
||||
self.date_updated = self.date_created
|
||||
self.date_checked = self.date_created
|
||||
|
||||
SessionBase.release_session(session)
|
||||
SessionBase.release_session(localsession)
|
||||
|
||||
|
||||
@@ -8,7 +8,6 @@ from cic_registry import CICRegistry
|
||||
from cic_registry.chain import ChainSpec
|
||||
from erc20_single_shot_faucet import Faucet
|
||||
from cic_registry import zero_address
|
||||
from hexathon import strip_0x
|
||||
|
||||
# local import
|
||||
from cic_eth.eth import RpcClient
|
||||
@@ -22,7 +21,6 @@ from cic_eth.db.models.role import AccountRole
|
||||
from cic_eth.db.models.tx import TxCache
|
||||
from cic_eth.eth.util import unpack_signed_raw_tx
|
||||
from cic_eth.error import RoleMissingError
|
||||
from cic_eth.task import CriticalSQLAlchemyTask
|
||||
|
||||
#logg = logging.getLogger(__name__)
|
||||
logg = logging.getLogger()
|
||||
@@ -36,7 +34,6 @@ class AccountTxFactory(TxFactory):
|
||||
self,
|
||||
address,
|
||||
chain_spec,
|
||||
session=None,
|
||||
):
|
||||
"""Register an Ethereum account address with the on-chain account registry
|
||||
|
||||
@@ -59,7 +56,7 @@ class AccountTxFactory(TxFactory):
|
||||
'gas': gas,
|
||||
'gasPrice': self.gas_price,
|
||||
'chainId': chain_spec.chain_id(),
|
||||
'nonce': self.next_nonce(session=session),
|
||||
'nonce': self.next_nonce(),
|
||||
'value': 0,
|
||||
})
|
||||
return tx_add
|
||||
@@ -69,7 +66,6 @@ class AccountTxFactory(TxFactory):
|
||||
self,
|
||||
address,
|
||||
chain_spec,
|
||||
session=None,
|
||||
):
|
||||
"""Trigger the on-chain faucet to disburse tokens to the provided Ethereum account
|
||||
|
||||
@@ -90,7 +86,7 @@ class AccountTxFactory(TxFactory):
|
||||
'gas': gas,
|
||||
'gasPrice': self.gas_price,
|
||||
'chainId': chain_spec.chain_id(),
|
||||
'nonce': self.next_nonce(session=session),
|
||||
'nonce': self.next_nonce(),
|
||||
'value': 0,
|
||||
})
|
||||
return tx_add
|
||||
@@ -105,12 +101,11 @@ def unpack_register(data):
|
||||
:returns: Parsed parameters
|
||||
:rtype: dict
|
||||
"""
|
||||
data = strip_0x(data)
|
||||
f = data[:8]
|
||||
f = data[2:10]
|
||||
if f != '0a3b0a4f':
|
||||
raise ValueError('Invalid account index register data ({})'.format(f))
|
||||
|
||||
d = data[8:]
|
||||
d = data[10:]
|
||||
return {
|
||||
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
|
||||
}
|
||||
@@ -125,19 +120,17 @@ def unpack_gift(data):
|
||||
:returns: Parsed parameters
|
||||
:rtype: dict
|
||||
"""
|
||||
data = strip_0x(data)
|
||||
f = data[:8]
|
||||
f = data[2:10]
|
||||
if f != '63e4bff4':
|
||||
raise ValueError('Invalid gift data ({})'.format(f))
|
||||
raise ValueError('Invalid account index register data ({})'.format(f))
|
||||
|
||||
d = data[8:]
|
||||
d = data[10:]
|
||||
return {
|
||||
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
|
||||
}
|
||||
|
||||
|
||||
# TODO: Separate out nonce initialization task
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def create(password, chain_str):
|
||||
"""Creates and stores a new ethereum account in the keystore.
|
||||
|
||||
@@ -156,13 +149,9 @@ def create(password, chain_str):
|
||||
logg.debug('created account {}'.format(a))
|
||||
|
||||
# Initialize nonce provider record for account
|
||||
# TODO: this can safely be set to zero, since we are randomly creating account
|
||||
n = c.w3.eth.getTransactionCount(a, 'pending')
|
||||
session = SessionBase.create_session()
|
||||
q = session.query(Nonce)
|
||||
q = q.filter(Nonce.address_hex==a)
|
||||
o = q.first()
|
||||
session.flush()
|
||||
o = session.query(Nonce).filter(Nonce.address_hex==a).first()
|
||||
if o == None:
|
||||
o = Nonce()
|
||||
o.address_hex = a
|
||||
@@ -173,7 +162,7 @@ def create(password, chain_str):
|
||||
return a
|
||||
|
||||
|
||||
@celery_app.task(bind=True, throws=(RoleMissingError,), base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task(bind=True, throws=(RoleMissingError,))
|
||||
def register(self, account_address, chain_str, writer_address=None):
|
||||
"""Creates a transaction to add the given address to the accounts index.
|
||||
|
||||
@@ -189,24 +178,21 @@ def register(self, account_address, chain_str, writer_address=None):
|
||||
"""
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
|
||||
session = SessionBase.create_session()
|
||||
if writer_address == None:
|
||||
writer_address = AccountRole.get_address('ACCOUNTS_INDEX_WRITER', session=session)
|
||||
writer_address = AccountRole.get_address('ACCOUNTS_INDEX_WRITER')
|
||||
|
||||
if writer_address == zero_address:
|
||||
session.close()
|
||||
raise RoleMissingError(account_address)
|
||||
|
||||
|
||||
logg.debug('adding account address {} to index; writer {}'.format(account_address, writer_address))
|
||||
queue = self.request.delivery_info['routing_key']
|
||||
|
||||
c = RpcClient(chain_spec, holder_address=writer_address)
|
||||
txf = AccountTxFactory(writer_address, c)
|
||||
|
||||
tx_add = txf.add(account_address, chain_spec, session=session)
|
||||
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data', session=session)
|
||||
session.close()
|
||||
tx_add = txf.add(account_address, chain_spec)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data')
|
||||
|
||||
gas_budget = tx_add['gas'] * tx_add['gasPrice']
|
||||
|
||||
@@ -223,7 +209,7 @@ def register(self, account_address, chain_str, writer_address=None):
|
||||
return account_address
|
||||
|
||||
|
||||
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task(bind=True)
|
||||
def gift(self, account_address, chain_str):
|
||||
"""Creates a transaction to invoke the faucet contract for the given address.
|
||||
|
||||
@@ -242,14 +228,12 @@ def gift(self, account_address, chain_str):
|
||||
c = RpcClient(chain_spec, holder_address=account_address)
|
||||
txf = AccountTxFactory(account_address, c)
|
||||
|
||||
session = SessionBase.create_session()
|
||||
tx_add = txf.gift(account_address, chain_spec, session=session)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_gift_data', session=session)
|
||||
session.close()
|
||||
tx_add = txf.gift(account_address, chain_spec)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_gift_data')
|
||||
|
||||
gas_budget = tx_add['gas'] * tx_add['gasPrice']
|
||||
|
||||
logg.debug('gift user tx {}'.format(tx_hash_hex))
|
||||
logg.debug('register user tx {}'.format(tx_hash_hex))
|
||||
s = create_check_gas_and_send_task(
|
||||
[tx_signed_raw_hex],
|
||||
chain_str,
|
||||
@@ -340,7 +324,7 @@ def cache_gift_data(
|
||||
return (tx_hash_hex, cache_id)
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def cache_account_data(
|
||||
tx_hash_hex,
|
||||
tx_signed_raw_hex,
|
||||
|
||||
@@ -32,10 +32,10 @@ class TxFactory:
|
||||
logg.debug('txfactory instance address {} gas price'.format(self.address, self.gas_price))
|
||||
|
||||
|
||||
def next_nonce(self, session=None):
|
||||
def next_nonce(self):
|
||||
"""Returns the current cached nonce value, and increments it for next transaction.
|
||||
|
||||
:returns: Nonce
|
||||
:rtype: number
|
||||
"""
|
||||
return self.nonce_oracle.next(session=session)
|
||||
return self.nonce_oracle.next()
|
||||
|
||||
@@ -52,10 +52,7 @@ class GasOracle():
|
||||
:returns: Etheerum account address
|
||||
:rtype: str, 0x-hex
|
||||
"""
|
||||
session = SessionBase.create_session()
|
||||
a = AccountRole.get_address('GAS_GIFTER', session)
|
||||
session.close()
|
||||
return a
|
||||
return AccountRole.get_address('GAS_GIFTER')
|
||||
|
||||
|
||||
def gas_price(self, category='safe'):
|
||||
|
||||
@@ -14,10 +14,10 @@ class NonceOracle():
|
||||
self.default_nonce = default_nonce
|
||||
|
||||
|
||||
def next(self, session=None):
|
||||
def next(self):
|
||||
"""Get next unique nonce.
|
||||
|
||||
:returns: Nonce
|
||||
:rtype: number
|
||||
"""
|
||||
return Nonce.next(self.address, self.default_nonce, session=session)
|
||||
return Nonce.next(self.address, self.default_nonce)
|
||||
|
||||
@@ -33,7 +33,7 @@ def sign_tx(tx, chain_str):
|
||||
return (tx_hash_hex, tx_transfer_signed['raw'],)
|
||||
|
||||
|
||||
def sign_and_register_tx(tx, chain_str, queue, cache_task=None, session=None):
|
||||
def sign_and_register_tx(tx, chain_str, queue, cache_task=None):
|
||||
"""Signs the provided transaction, and adds it to the transaction queue cache (with status PENDING).
|
||||
|
||||
:param tx: Standard ethereum transaction data
|
||||
@@ -44,7 +44,6 @@ def sign_and_register_tx(tx, chain_str, queue, cache_task=None, session=None):
|
||||
:type queue: str
|
||||
:param cache_task: Cache task to call with signed transaction. If None, no task will be called.
|
||||
:type cache_task: str
|
||||
:raises: sqlalchemy.exc.DatabaseError
|
||||
:returns: Tuple; Transaction hash, signed raw transaction data
|
||||
:rtype: tuple
|
||||
"""
|
||||
@@ -52,13 +51,25 @@ def sign_and_register_tx(tx, chain_str, queue, cache_task=None, session=None):
|
||||
|
||||
logg.debug('adding queue tx {}'.format(tx_hash_hex))
|
||||
|
||||
# s = celery.signature(
|
||||
# 'cic_eth.queue.tx.create',
|
||||
# [
|
||||
# tx['nonce'],
|
||||
# tx['from'],
|
||||
# tx_hash_hex,
|
||||
# tx_signed_raw_hex,
|
||||
# chain_str,
|
||||
# ],
|
||||
# queue=queue,
|
||||
# )
|
||||
|
||||
# TODO: consider returning this as a signature that consequtive tasks can be linked to
|
||||
queue_create(
|
||||
tx['nonce'],
|
||||
tx['from'],
|
||||
tx_hash_hex,
|
||||
tx_signed_raw_hex,
|
||||
chain_str,
|
||||
session=session,
|
||||
)
|
||||
|
||||
if cache_task != None:
|
||||
|
||||
@@ -8,8 +8,6 @@ import web3
|
||||
from cic_registry import CICRegistry
|
||||
from cic_registry import zero_address
|
||||
from cic_registry.chain import ChainSpec
|
||||
from hexathon import strip_0x
|
||||
from chainlib.status import Status as TxStatus
|
||||
|
||||
# platform imports
|
||||
from cic_eth.db.models.tx import TxCache
|
||||
@@ -21,10 +19,6 @@ from cic_eth.eth.task import create_check_gas_and_send_task
|
||||
from cic_eth.eth.factory import TxFactory
|
||||
from cic_eth.eth.util import unpack_signed_raw_tx
|
||||
from cic_eth.ext.address import translate_address
|
||||
from cic_eth.task import (
|
||||
CriticalSQLAlchemyTask,
|
||||
CriticalWeb3Task,
|
||||
)
|
||||
|
||||
celery_app = celery.current_app
|
||||
logg = logging.getLogger()
|
||||
@@ -46,7 +40,6 @@ class TokenTxFactory(TxFactory):
|
||||
spender_address,
|
||||
amount,
|
||||
chain_spec,
|
||||
session=None,
|
||||
):
|
||||
"""Create an ERC20 "approve" transaction
|
||||
|
||||
@@ -74,7 +67,7 @@ class TokenTxFactory(TxFactory):
|
||||
'gas': source_token_gas,
|
||||
'gasPrice': self.gas_price,
|
||||
'chainId': chain_spec.chain_id(),
|
||||
'nonce': self.next_nonce(session=session),
|
||||
'nonce': self.next_nonce(),
|
||||
})
|
||||
return tx_approve
|
||||
|
||||
@@ -85,7 +78,6 @@ class TokenTxFactory(TxFactory):
|
||||
receiver_address,
|
||||
value,
|
||||
chain_spec,
|
||||
session=None,
|
||||
):
|
||||
"""Create an ERC20 "transfer" transaction
|
||||
|
||||
@@ -114,7 +106,7 @@ class TokenTxFactory(TxFactory):
|
||||
'gas': source_token_gas,
|
||||
'gasPrice': self.gas_price,
|
||||
'chainId': chain_spec.chain_id(),
|
||||
'nonce': self.next_nonce(session=session),
|
||||
'nonce': self.next_nonce(),
|
||||
})
|
||||
return tx_transfer
|
||||
|
||||
@@ -128,12 +120,11 @@ def unpack_transfer(data):
|
||||
:returns: Parsed parameters
|
||||
:rtype: dict
|
||||
"""
|
||||
data = strip_0x(data)
|
||||
f = data[:8]
|
||||
f = data[2:10]
|
||||
if f != contract_function_signatures['transfer']:
|
||||
raise ValueError('Invalid transfer data ({})'.format(f))
|
||||
|
||||
d = data[8:]
|
||||
d = data[10:]
|
||||
return {
|
||||
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
|
||||
'amount': int(d[64:], 16)
|
||||
@@ -149,12 +140,11 @@ def unpack_transferfrom(data):
|
||||
:returns: Parsed parameters
|
||||
:rtype: dict
|
||||
"""
|
||||
data = strip_0x(data)
|
||||
f = data[:8]
|
||||
f = data[2:10]
|
||||
if f != contract_function_signatures['transferfrom']:
|
||||
raise ValueError('Invalid transferFrom data ({})'.format(f))
|
||||
|
||||
d = data[8:]
|
||||
d = data[10:]
|
||||
return {
|
||||
'from': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
|
||||
'to': web3.Web3.toChecksumAddress('0x' + d[128-40:128]),
|
||||
@@ -171,19 +161,18 @@ def unpack_approve(data):
|
||||
:returns: Parsed parameters
|
||||
:rtype: dict
|
||||
"""
|
||||
data = strip_0x(data)
|
||||
f = data[:8]
|
||||
f = data[2:10]
|
||||
if f != contract_function_signatures['approve']:
|
||||
raise ValueError('Invalid approval data ({})'.format(f))
|
||||
|
||||
d = data[8:]
|
||||
d = data[10:]
|
||||
return {
|
||||
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
|
||||
'amount': int(d[64:], 16)
|
||||
}
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalWeb3Task)
|
||||
@celery_app.task()
|
||||
def balance(tokens, holder_address, chain_str):
|
||||
"""Return token balances for a list of tokens for given address
|
||||
|
||||
@@ -210,7 +199,7 @@ def balance(tokens, holder_address, chain_str):
|
||||
return tokens
|
||||
|
||||
|
||||
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task(bind=True)
|
||||
def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
|
||||
"""Transfer ERC20 tokens between addresses
|
||||
|
||||
@@ -246,11 +235,9 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
|
||||
c = RpcClient(chain_spec, holder_address=holder_address)
|
||||
|
||||
txf = TokenTxFactory(holder_address, c)
|
||||
|
||||
session = SessionBase.create_session()
|
||||
tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec, session=session)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_transfer', session=session)
|
||||
session.close()
|
||||
|
||||
tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_transfer')
|
||||
|
||||
gas_budget = tx_transfer['gas'] * tx_transfer['gasPrice']
|
||||
|
||||
@@ -266,7 +253,7 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
|
||||
return tx_hash_hex
|
||||
|
||||
|
||||
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task(bind=True)
|
||||
def approve(self, tokens, holder_address, spender_address, value, chain_str):
|
||||
"""Approve ERC20 transfer on behalf of holder address
|
||||
|
||||
@@ -303,10 +290,8 @@ def approve(self, tokens, holder_address, spender_address, value, chain_str):
|
||||
|
||||
txf = TokenTxFactory(holder_address, c)
|
||||
|
||||
session = SessionBase.create_session()
|
||||
tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec, session=session)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_approve', session=session)
|
||||
session.close()
|
||||
tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_approve')
|
||||
|
||||
gas_budget = tx_transfer['gas'] * tx_transfer['gasPrice']
|
||||
|
||||
@@ -322,7 +307,7 @@ def approve(self, tokens, holder_address, spender_address, value, chain_str):
|
||||
return tx_hash_hex
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalWeb3Task)
|
||||
@celery_app.task()
|
||||
def resolve_tokens_by_symbol(token_symbols, chain_str):
|
||||
"""Returns contract addresses of an array of ERC20 token symbols
|
||||
|
||||
@@ -345,7 +330,7 @@ def resolve_tokens_by_symbol(token_symbols, chain_str):
|
||||
return tokens
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def otx_cache_transfer(
|
||||
tx_hash_hex,
|
||||
tx_signed_raw_hex,
|
||||
@@ -369,7 +354,7 @@ def otx_cache_transfer(
|
||||
return txc
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def cache_transfer_data(
|
||||
tx_hash_hex,
|
||||
tx,
|
||||
@@ -405,7 +390,7 @@ def cache_transfer_data(
|
||||
return (tx_hash_hex, cache_id)
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def otx_cache_approve(
|
||||
tx_hash_hex,
|
||||
tx_signed_raw_hex,
|
||||
@@ -429,7 +414,7 @@ def otx_cache_approve(
|
||||
return txc
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def cache_approve_data(
|
||||
tx_hash_hex,
|
||||
tx,
|
||||
@@ -465,7 +450,6 @@ def cache_approve_data(
|
||||
return (tx_hash_hex, cache_id)
|
||||
|
||||
|
||||
# TODO: Move to dedicated metadata package
|
||||
class ExtendedTx:
|
||||
|
||||
_default_decimals = 6
|
||||
@@ -486,8 +470,6 @@ class ExtendedTx:
|
||||
self.destination_token_symbol = ''
|
||||
self.source_token_decimals = ExtendedTx._default_decimals
|
||||
self.destination_token_decimals = ExtendedTx._default_decimals
|
||||
self.status = TxStatus.PENDING.name
|
||||
self.status_code = TxStatus.PENDING.value
|
||||
|
||||
|
||||
def set_actors(self, sender, recipient, trusted_declarator_addresses=None):
|
||||
@@ -515,18 +497,10 @@ class ExtendedTx:
|
||||
self.destination_token_value = destination_value
|
||||
|
||||
|
||||
def set_status(self, n):
|
||||
if n:
|
||||
self.status = TxStatus.ERROR.name
|
||||
else:
|
||||
self.status = TxStatus.SUCCESS.name
|
||||
self.status_code = n
|
||||
|
||||
|
||||
def to_dict(self):
|
||||
o = {}
|
||||
for attr in dir(self):
|
||||
if attr[0] == '_' or attr in ['set_actors', 'set_tokens', 'set_status', 'to_dict']:
|
||||
if attr[0] == '_' or attr in ['set_actors', 'set_tokens', 'to_dict']:
|
||||
continue
|
||||
o[attr] = getattr(self, attr)
|
||||
return o
|
||||
|
||||
@@ -32,10 +32,6 @@ from cic_eth.eth.nonce import NonceOracle
|
||||
from cic_eth.error import AlreadyFillingGasError
|
||||
from cic_eth.eth.util import tx_hex_string
|
||||
from cic_eth.admin.ctrl import lock_send
|
||||
from cic_eth.task import (
|
||||
CriticalSQLAlchemyTask,
|
||||
CriticalWeb3Task,
|
||||
)
|
||||
|
||||
celery_app = celery.current_app
|
||||
logg = logging.getLogger()
|
||||
@@ -44,7 +40,7 @@ MAX_NONCE_ATTEMPTS = 3
|
||||
|
||||
|
||||
# TODO this function is too long
|
||||
@celery_app.task(bind=True, throws=(OutOfGasError), base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task(bind=True, throws=(OutOfGasError))
|
||||
def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=None):
|
||||
"""Check the gas level of the sender address of a transaction.
|
||||
|
||||
@@ -135,7 +131,7 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
|
||||
|
||||
|
||||
# TODO: chain chainable transactions that use hashes as inputs may be chained to this function to output signed txs instead.
|
||||
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task(bind=True)
|
||||
def hashes_to_txs(self, tx_hashes):
|
||||
"""Return a list of raw signed transactions from the local transaction queue corresponding to a list of transaction hashes.
|
||||
|
||||
@@ -317,8 +313,7 @@ class ParityNodeHandler:
|
||||
return (t, PermanentTxError, 'Fubar {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
|
||||
|
||||
|
||||
# TODO: A lock should be introduced to ensure that the send status change and the transaction send is atomic.
|
||||
@celery_app.task(bind=True, base=CriticalWeb3Task)
|
||||
@celery_app.task(bind=True)
|
||||
def send(self, txs, chain_str):
|
||||
"""Send transactions to the network.
|
||||
|
||||
@@ -356,6 +351,13 @@ def send(self, txs, chain_str):
|
||||
|
||||
c = RpcClient(chain_spec)
|
||||
r = None
|
||||
try:
|
||||
r = c.w3.eth.send_raw_transaction(tx_hex)
|
||||
except Exception as e:
|
||||
raiser = ParityNodeHandler(chain_spec, queue)
|
||||
(t, e, m) = raiser.handle(e, tx_hash_hex, tx_hex)
|
||||
raise e(m)
|
||||
|
||||
s_set_sent = celery.signature(
|
||||
'cic_eth.queue.tx.set_sent_status',
|
||||
[
|
||||
@@ -364,14 +366,6 @@ def send(self, txs, chain_str):
|
||||
],
|
||||
queue=queue,
|
||||
)
|
||||
try:
|
||||
r = c.w3.eth.send_raw_transaction(tx_hex)
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
raise(e)
|
||||
except Exception as e:
|
||||
raiser = ParityNodeHandler(chain_spec, queue)
|
||||
(t, e, m) = raiser.handle(e, tx_hash_hex, tx_hex)
|
||||
raise e(m)
|
||||
s_set_sent.apply_async()
|
||||
|
||||
tx_tail = txs[1:]
|
||||
@@ -386,9 +380,7 @@ def send(self, txs, chain_str):
|
||||
return r.hex()
|
||||
|
||||
|
||||
# TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks.
|
||||
# TODO: method is too long, factor out code for clarity
|
||||
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task)
|
||||
@celery_app.task(bind=True, throws=(AlreadyFillingGasError))
|
||||
def refill_gas(self, recipient_address, chain_str):
|
||||
"""Executes a native token transaction to fund the recipient's gas expenditures.
|
||||
|
||||
@@ -407,11 +399,11 @@ def refill_gas(self, recipient_address, chain_str):
|
||||
q = session.query(Otx.tx_hash)
|
||||
q = q.join(TxCache)
|
||||
q = q.filter(Otx.status.op('&')(StatusBits.FINAL.value)==0)
|
||||
q = q.filter(TxCache.from_value!=0)
|
||||
q = q.filter(TxCache.from_value!='0x00')
|
||||
q = q.filter(TxCache.recipient==recipient_address)
|
||||
c = q.count()
|
||||
session.close()
|
||||
if c > 0:
|
||||
session.close()
|
||||
raise AlreadyFillingGasError(recipient_address)
|
||||
|
||||
queue = self.request.delivery_info['routing_key']
|
||||
@@ -421,7 +413,7 @@ def refill_gas(self, recipient_address, chain_str):
|
||||
logg.debug('refill gas from provider address {}'.format(c.gas_provider()))
|
||||
default_nonce = c.w3.eth.getTransactionCount(c.gas_provider(), 'pending')
|
||||
nonce_generator = NonceOracle(c.gas_provider(), default_nonce)
|
||||
nonce = nonce_generator.next(session=session)
|
||||
nonce = nonce_generator.next()
|
||||
gas_price = c.gas_price()
|
||||
gas_limit = c.default_gas_limit
|
||||
refill_amount = c.refill_amount()
|
||||
@@ -450,9 +442,7 @@ def refill_gas(self, recipient_address, chain_str):
|
||||
tx_hash_hex,
|
||||
tx_send_gas_signed['raw'],
|
||||
chain_str,
|
||||
session=session,
|
||||
)
|
||||
session.close()
|
||||
|
||||
s_tx_cache = celery.signature(
|
||||
'cic_eth.eth.tx.cache_gas_refill_data',
|
||||
@@ -495,8 +485,8 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
|
||||
q = session.query(Otx)
|
||||
q = q.filter(Otx.tx_hash==txold_hash_hex)
|
||||
otx = q.first()
|
||||
session.close()
|
||||
if otx == None:
|
||||
session.close()
|
||||
raise NotLocalTxError(txold_hash_hex)
|
||||
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
@@ -531,10 +521,8 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
|
||||
tx_hash_hex,
|
||||
tx_signed_raw_hex,
|
||||
chain_str,
|
||||
session=session,
|
||||
)
|
||||
TxCache.clone(txold_hash_hex, tx_hash_hex, session=session)
|
||||
session.close()
|
||||
TxCache.clone(txold_hash_hex, tx_hash_hex)
|
||||
|
||||
s = create_check_gas_and_send_task(
|
||||
[tx_signed_raw_hex],
|
||||
@@ -549,15 +537,8 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
|
||||
return tx_hash_hex
|
||||
|
||||
|
||||
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task)
|
||||
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,))
|
||||
def sync_tx(self, tx_hash_hex, chain_str):
|
||||
"""Force update of network status of a simgle transaction
|
||||
|
||||
:param tx_hash_hex: Transaction hash
|
||||
:type tx_hash_hex: str, 0x-hex
|
||||
:param chain_str: Chain spec string representation
|
||||
:type chain_str: str
|
||||
"""
|
||||
|
||||
queue = self.request.delivery_info['routing_key']
|
||||
|
||||
@@ -640,7 +621,7 @@ def resume_tx(self, txpending_hash_hex, chain_str):
|
||||
return txpending_hash_hex
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def otx_cache_parse_tx(
|
||||
tx_hash_hex,
|
||||
tx_signed_raw_hex,
|
||||
@@ -667,7 +648,7 @@ def otx_cache_parse_tx(
|
||||
return txc
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def cache_gas_refill_data(
|
||||
tx_hash_hex,
|
||||
tx,
|
||||
|
||||
@@ -14,7 +14,6 @@ from cic_eth.db.enum import (
|
||||
StatusBits,
|
||||
dead,
|
||||
)
|
||||
from cic_eth.task import CriticalSQLAlchemyTask
|
||||
|
||||
celery_app = celery.current_app
|
||||
|
||||
@@ -36,7 +35,7 @@ def __balance_outgoing_compatible(token_address, holder_address, chain_str):
|
||||
return delta
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def balance_outgoing(tokens, holder_address, chain_str):
|
||||
"""Retrieve accumulated value of unprocessed transactions sent from the given address.
|
||||
|
||||
@@ -74,7 +73,7 @@ def __balance_incoming_compatible(token_address, receiver_address, chain_str):
|
||||
return delta
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def balance_incoming(tokens, receipient_address, chain_str):
|
||||
"""Retrieve accumulated value of unprocessed transactions to be received by the given address.
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@ from cic_registry.chain import ChainSpec
|
||||
from cic_eth.eth.rpc import RpcClient
|
||||
from cic_eth.db.models.otx import Otx
|
||||
from cic_eth.error import NotLocalTxError
|
||||
from cic_eth.task import CriticalSQLAlchemyAndWeb3Task
|
||||
|
||||
celery_app = celery.current_app
|
||||
|
||||
@@ -18,7 +17,7 @@ logg = logging.getLogger()
|
||||
|
||||
|
||||
# TODO: This method does not belong in the _queue_ module, it operates across queue and network
|
||||
@celery_app.task(base=CriticalSQLAlchemyAndWeb3Task)
|
||||
@celery_app.task()
|
||||
def tx_times(tx_hash, chain_str):
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
c = RpcClient(chain_spec)
|
||||
|
||||
@@ -26,7 +26,6 @@ from cic_eth.db.enum import (
|
||||
is_alive,
|
||||
dead,
|
||||
)
|
||||
from cic_eth.task import CriticalSQLAlchemyTask
|
||||
from cic_eth.eth.util import unpack_signed_raw_tx # TODO: should not be in same sub-path as package that imports queue.tx
|
||||
from cic_eth.error import NotLocalTxError
|
||||
from cic_eth.error import LockedError
|
||||
@@ -87,7 +86,7 @@ def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predec
|
||||
|
||||
|
||||
# TODO: Replace set_* with single task for set status
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def set_sent_status(tx_hash, fail=False):
|
||||
"""Used to set the status after a send attempt
|
||||
|
||||
@@ -119,7 +118,7 @@ def set_sent_status(tx_hash, fail=False):
|
||||
return tx_hash
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def set_final_status(tx_hash, block=None, fail=False):
|
||||
"""Used to set the status of an incoming transaction result.
|
||||
|
||||
@@ -175,7 +174,7 @@ def set_final_status(tx_hash, block=None, fail=False):
|
||||
return tx_hash
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def set_cancel(tx_hash, manual=False):
|
||||
"""Used to set the status when a transaction is cancelled.
|
||||
|
||||
@@ -207,7 +206,7 @@ def set_cancel(tx_hash, manual=False):
|
||||
return tx_hash
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def set_rejected(tx_hash):
|
||||
"""Used to set the status when the node rejects sending a transaction to network
|
||||
|
||||
@@ -233,7 +232,7 @@ def set_rejected(tx_hash):
|
||||
return tx_hash
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def set_fubar(tx_hash):
|
||||
"""Used to set the status when an unexpected error occurs.
|
||||
|
||||
@@ -259,7 +258,7 @@ def set_fubar(tx_hash):
|
||||
return tx_hash
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def set_manual(tx_hash):
|
||||
"""Used to set the status when queue is manually changed
|
||||
|
||||
@@ -285,7 +284,7 @@ def set_manual(tx_hash):
|
||||
return tx_hash
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def set_ready(tx_hash):
|
||||
"""Used to mark a transaction as ready to be sent to network
|
||||
|
||||
@@ -311,25 +310,7 @@ def set_ready(tx_hash):
|
||||
return tx_hash
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def set_dequeue(tx_hash):
|
||||
session = SessionBase.create_session()
|
||||
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
|
||||
if o == None:
|
||||
session.close()
|
||||
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
|
||||
|
||||
session.flush()
|
||||
|
||||
o.dequeue(session=session)
|
||||
session.commit()
|
||||
session.close()
|
||||
|
||||
return tx_hash
|
||||
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def set_waitforgas(tx_hash):
|
||||
"""Used to set the status when a transaction must be deferred due to gas refill
|
||||
|
||||
@@ -355,7 +336,7 @@ def set_waitforgas(tx_hash):
|
||||
return tx_hash
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def get_state_log(tx_hash):
|
||||
|
||||
logs = []
|
||||
@@ -374,7 +355,7 @@ def get_state_log(tx_hash):
|
||||
return logs
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def get_tx_cache(tx_hash):
|
||||
"""Returns an aggregate dictionary of outgoing transaction data and metadata
|
||||
|
||||
@@ -423,7 +404,7 @@ def get_tx_cache(tx_hash):
|
||||
return tx
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def get_lock(address=None):
|
||||
"""Retrieve all active locks
|
||||
|
||||
@@ -461,7 +442,7 @@ def get_lock(address=None):
|
||||
return locks
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def get_tx(tx_hash):
|
||||
"""Retrieve a transaction queue record by transaction hash
|
||||
|
||||
@@ -472,9 +453,7 @@ def get_tx(tx_hash):
|
||||
:rtype: dict
|
||||
"""
|
||||
session = SessionBase.create_session()
|
||||
q = session.query(Otx)
|
||||
q = q.filter(Otx.tx_hash==tx_hash)
|
||||
tx = q.first()
|
||||
tx = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
|
||||
if tx == None:
|
||||
session.close()
|
||||
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
|
||||
@@ -490,7 +469,7 @@ def get_tx(tx_hash):
|
||||
return o
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def get_nonce_tx(nonce, sender, chain_id):
|
||||
"""Retrieve all transactions for address with specified nonce
|
||||
|
||||
@@ -673,7 +652,7 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
|
||||
return txs
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task()
|
||||
def get_account_tx(address, as_sender=True, as_recipient=True, counterpart=None):
|
||||
"""Returns all local queue transactions for a given Ethereum address
|
||||
|
||||
|
||||
@@ -1,86 +0,0 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import copy
|
||||
|
||||
# external imports
|
||||
from cic_registry import CICRegistry
|
||||
from eth_token_index import TokenUniqueSymbolIndex
|
||||
from eth_accounts_index import AccountRegistry
|
||||
from chainlib.chain import ChainSpec
|
||||
from cic_registry.chain import ChainRegistry
|
||||
from cic_registry.helper.declarator import DeclaratorOracleAdapter
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TokenOracle:
|
||||
|
||||
def __init__(self, conn, chain_spec, registry):
|
||||
self.tokens = []
|
||||
self.chain_spec = chain_spec
|
||||
self.registry = registry
|
||||
|
||||
token_registry_contract = CICRegistry.get_contract(chain_spec, 'TokenRegistry', 'Registry')
|
||||
self.getter = TokenUniqueSymbolIndex(conn, token_registry_contract.address())
|
||||
|
||||
|
||||
def get_tokens(self):
|
||||
token_count = self.getter.count()
|
||||
if token_count == len(self.tokens):
|
||||
return self.tokens
|
||||
|
||||
for i in range(len(self.tokens), token_count):
|
||||
token_address = self.getter.get_index(i)
|
||||
t = self.registry.get_address(self.chain_spec, token_address)
|
||||
token_symbol = t.symbol()
|
||||
self.tokens.append(t)
|
||||
|
||||
logg.debug('adding token idx {} symbol {} address {}'.format(i, token_symbol, token_address))
|
||||
|
||||
return copy.copy(self.tokens)
|
||||
|
||||
|
||||
class AccountsOracle:
|
||||
|
||||
def __init__(self, conn, chain_spec, registry):
|
||||
self.accounts = []
|
||||
self.chain_spec = chain_spec
|
||||
self.registry = registry
|
||||
|
||||
accounts_registry_contract = CICRegistry.get_contract(chain_spec, 'AccountRegistry', 'Registry')
|
||||
self.getter = AccountRegistry(conn, accounts_registry_contract.address())
|
||||
|
||||
|
||||
def get_accounts(self):
|
||||
accounts_count = self.getter.count()
|
||||
if accounts_count == len(self.accounts):
|
||||
return self.accounts
|
||||
|
||||
for i in range(len(self.accounts), accounts_count):
|
||||
account = self.getter.get_index(i)
|
||||
self.accounts.append(account)
|
||||
logg.debug('adding account {}'.format(account))
|
||||
|
||||
return copy.copy(self.accounts)
|
||||
|
||||
|
||||
def init_registry(config, w3):
|
||||
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
|
||||
CICRegistry.init(w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec)
|
||||
CICRegistry.add_path(config.get('ETH_ABI_DIR'))
|
||||
|
||||
chain_registry = ChainRegistry(chain_spec)
|
||||
CICRegistry.add_chain_registry(chain_registry, True)
|
||||
|
||||
declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', interface='Declarator')
|
||||
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
|
||||
if trusted_addresses_src == None:
|
||||
raise ValueError('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
|
||||
trusted_addresses = trusted_addresses_src.split(',')
|
||||
for address in trusted_addresses:
|
||||
logg.info('using trusted address {}'.format(address))
|
||||
|
||||
oracle = DeclaratorOracleAdapter(declarator.contract, trusted_addresses)
|
||||
chain_registry.add_oracle(oracle, 'naive_erc20_oracle')
|
||||
|
||||
return CICRegistry
|
||||
@@ -76,9 +76,8 @@ def main():
|
||||
t = api.create_account(register=register)
|
||||
|
||||
ps.get_message()
|
||||
o = ps.get_message(timeout=args.timeout)
|
||||
m = json.loads(o['data'])
|
||||
print(m['result'])
|
||||
m = ps.get_message(timeout=args.timeout)
|
||||
print(json.loads(m['data']))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
@@ -21,21 +21,14 @@ import cic_eth
|
||||
from cic_eth.eth import RpcClient
|
||||
from cic_eth.db import SessionBase
|
||||
from cic_eth.db.enum import StatusEnum
|
||||
from cic_eth.db.enum import StatusBits
|
||||
from cic_eth.db.enum import LockEnum
|
||||
from cic_eth.db import dsn_from_config
|
||||
from cic_eth.queue.tx import (
|
||||
get_upcoming_tx,
|
||||
set_dequeue,
|
||||
)
|
||||
from cic_eth.queue.tx import get_upcoming_tx
|
||||
from cic_eth.admin.ctrl import lock_send
|
||||
from cic_eth.sync.error import LoopDone
|
||||
from cic_eth.eth.tx import send as task_tx_send
|
||||
from cic_eth.error import (
|
||||
PermanentTxError,
|
||||
TemporaryTxError,
|
||||
NotLocalTxError,
|
||||
)
|
||||
from cic_eth.error import PermanentTxError
|
||||
from cic_eth.error import TemporaryTxError
|
||||
from cic_eth.eth.util import unpack_signed_raw_tx_hex
|
||||
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
@@ -98,8 +91,6 @@ run = True
|
||||
|
||||
class DispatchSyncer:
|
||||
|
||||
yield_delay = 0.005
|
||||
|
||||
def __init__(self, chain_spec):
|
||||
self.chain_spec = chain_spec
|
||||
self.chain_id = chain_spec.chain_id()
|
||||
@@ -116,11 +107,6 @@ class DispatchSyncer:
|
||||
for k in txs.keys():
|
||||
tx_raw = txs[k]
|
||||
tx = unpack_signed_raw_tx_hex(tx_raw, self.chain_spec.chain_id())
|
||||
|
||||
try:
|
||||
set_dequeue(tx['hash'])
|
||||
except NotLocalTxError as e:
|
||||
logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash']))
|
||||
|
||||
s_check = celery.signature(
|
||||
'cic_eth.admin.ctrl.check_lock',
|
||||
@@ -141,22 +127,18 @@ class DispatchSyncer:
|
||||
)
|
||||
s_check.link(s_send)
|
||||
t = s_check.apply_async()
|
||||
logg.info('processed {}'.format(k))
|
||||
|
||||
|
||||
def loop(self, w3, interval):
|
||||
while run:
|
||||
txs = {}
|
||||
typ = StatusBits.QUEUED
|
||||
typ = StatusEnum.READYSEND
|
||||
utxs = get_upcoming_tx(typ, chain_id=self.chain_id)
|
||||
for k in utxs.keys():
|
||||
txs[k] = utxs[k]
|
||||
self.process(w3, txs)
|
||||
|
||||
if len(utxs) > 0:
|
||||
time.sleep(self.yield_delay)
|
||||
else:
|
||||
time.sleep(interval)
|
||||
time.sleep(interval)
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
@@ -5,46 +5,37 @@ import logging
|
||||
import web3
|
||||
import celery
|
||||
from cic_registry.error import UnknownContractError
|
||||
from chainlib.status import Status as TxStatus
|
||||
from chainlib.eth.address import to_checksum
|
||||
from chainlib.eth.constant import ZERO_ADDRESS
|
||||
from hexathon import strip_0x
|
||||
|
||||
# local imports
|
||||
from .base import SyncFilter
|
||||
from cic_eth.eth.token import (
|
||||
unpack_transfer,
|
||||
unpack_transferfrom,
|
||||
)
|
||||
from cic_eth.eth.account import unpack_gift
|
||||
from cic_eth.eth.token import unpack_transfer
|
||||
from cic_eth.eth.token import unpack_transferfrom
|
||||
from cic_eth.eth.token import ExtendedTx
|
||||
from .base import SyncFilter
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
logg = logging.getLogger()
|
||||
|
||||
transfer_method_signature = 'a9059cbb' # keccak256(transfer(address,uint256))
|
||||
transferfrom_method_signature = '23b872dd' # keccak256(transferFrom(address,address,uint256))
|
||||
giveto_method_signature = '63e4bff4' # keccak256(giveTo(address))
|
||||
transfer_method_signature = '0xa9059cbb' # keccak256(transfer(address,uint256))
|
||||
transferfrom_method_signature = '0x23b872dd' # keccak256(transferFrom(address,address,uint256))
|
||||
giveto_method_signature = '0x63e4bff4' # keccak256(giveTo(address))
|
||||
|
||||
|
||||
class CallbackFilter(SyncFilter):
|
||||
|
||||
trusted_addresses = []
|
||||
|
||||
def __init__(self, chain_spec, method, queue):
|
||||
def __init__(self, method, queue):
|
||||
self.queue = queue
|
||||
self.method = method
|
||||
self.chain_spec = chain_spec
|
||||
|
||||
|
||||
def call_back(self, transfer_type, result):
|
||||
logg.debug('result {}'.format(result))
|
||||
s = celery.signature(
|
||||
self.method,
|
||||
[
|
||||
result,
|
||||
transfer_type,
|
||||
int(result['status_code'] == 0),
|
||||
int(rcpt.status == 0),
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
@@ -59,85 +50,58 @@ class CallbackFilter(SyncFilter):
|
||||
# )
|
||||
# s_translate.link(s)
|
||||
# s_translate.apply_async()
|
||||
t = s.apply_async()
|
||||
return s
|
||||
s.apply_async()
|
||||
|
||||
|
||||
def parse_data(self, tx):
|
||||
transfer_type = None
|
||||
def parse_data(self, tx, rcpt):
|
||||
transfer_type = 'transfer'
|
||||
transfer_data = None
|
||||
logg.debug('have payload {}'.format(tx.payload))
|
||||
method_signature = tx.payload[:8]
|
||||
method_signature = tx.input[:10]
|
||||
|
||||
logg.debug('tx status {}'.format(tx.status))
|
||||
if method_signature == transfer_method_signature:
|
||||
transfer_data = unpack_transfer(tx.payload)
|
||||
transfer_data = unpack_transfer(tx.input)
|
||||
transfer_data['from'] = tx['from']
|
||||
transfer_data['token_address'] = tx['to']
|
||||
|
||||
elif method_signature == transferfrom_method_signature:
|
||||
transfer_type = 'transferfrom'
|
||||
transfer_data = unpack_transferfrom(tx.payload)
|
||||
transfer_data = unpack_transferfrom(tx.input)
|
||||
transfer_data['token_address'] = tx['to']
|
||||
|
||||
# TODO: do not rely on logs here
|
||||
elif method_signature == giveto_method_signature:
|
||||
transfer_type = 'tokengift'
|
||||
transfer_data = unpack_gift(tx.payload)
|
||||
transfer_data['from'] = tx.inputs[0]
|
||||
transfer_data['value'] = 0
|
||||
transfer_data['token_address'] = ZERO_ADDRESS
|
||||
for l in tx.logs:
|
||||
topics = l['topics']
|
||||
logg.debug('topixx {}'.format(topics))
|
||||
if strip_0x(topics[0]) == '45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa':
|
||||
transfer_data['value'] = web3.Web3.toInt(hexstr=strip_0x(l['data']))
|
||||
#token_address_bytes = topics[2][32-20:]
|
||||
token_address = strip_0x(topics[2])[64-40:]
|
||||
transfer_data['token_address'] = to_checksum(token_address)
|
||||
|
||||
logg.debug('resolved method {}'.format(transfer_type))
|
||||
|
||||
if transfer_data != None:
|
||||
transfer_data['status'] = tx.status
|
||||
transfer_data = unpack_gift(tx.input)
|
||||
for l in rcpt.logs:
|
||||
if l.topics[0].hex() == '0x45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa':
|
||||
transfer_data['value'] = web3.Web3.toInt(hexstr=l.data)
|
||||
token_address_bytes = l.topics[2][32-20:]
|
||||
transfer_data['token_address'] = web3.Web3.toChecksumAddress(token_address_bytes.hex())
|
||||
transfer_data['from'] = rcpt.to
|
||||
|
||||
return (transfer_type, transfer_data)
|
||||
|
||||
|
||||
def filter(self, conn, block, tx, db_session=None):
|
||||
chain_str = str(self.chain_spec)
|
||||
def filter(self, w3, tx, rcpt, chain_spec, session=None):
|
||||
logg.debug('applying callback filter "{}:{}"'.format(self.queue, self.method))
|
||||
chain_str = str(chain_spec)
|
||||
|
||||
transfer_data = self.parse_data(tx, rcpt)
|
||||
|
||||
transfer_data = None
|
||||
transfer_type = None
|
||||
try:
|
||||
(transfer_type, transfer_data) = self.parse_data(tx)
|
||||
except TypeError:
|
||||
logg.debug('invalid method data length for tx {}'.format(tx.hash))
|
||||
if len(tx.input) < 10:
|
||||
logg.debug('callbacks filter data length not sufficient for method signature in tx {}, skipping'.format(tx['hash']))
|
||||
return
|
||||
|
||||
if len(tx.payload) < 8:
|
||||
logg.debug('callbacks filter data length not sufficient for method signature in tx {}, skipping'.format(tx.hash))
|
||||
return
|
||||
|
||||
logg.debug('checking callbacks filter input {}'.format(tx.payload[:8]))
|
||||
logg.debug('checking callbacks filter input {}'.format(tx.input[:10]))
|
||||
|
||||
if transfer_data != None:
|
||||
logg.debug('wtfoo {}'.format(transfer_data))
|
||||
token_symbol = None
|
||||
result = None
|
||||
try:
|
||||
tokentx = ExtendedTx(tx.hash, self.chain_spec)
|
||||
tokentx = ExtendedTx(self.chain_spec)
|
||||
tokentx.set_actors(transfer_data['from'], transfer_data['to'], self.trusted_addresses)
|
||||
tokentx.set_tokens(transfer_data['token_address'], transfer_data['value'])
|
||||
if transfer_data['status'] == 0:
|
||||
tokentx.set_status(1)
|
||||
else:
|
||||
tokentx.set_status(0)
|
||||
t = self.call_back(transfer_type, tokentx.to_dict())
|
||||
logg.info('callback success task id {} tx {}'.format(t, tx.hash))
|
||||
self.call_back(tokentx.to_dict())
|
||||
except UnknownContractError:
|
||||
logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(tc.queue, tc.method, transfer_data['to'], tx.hash))
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return 'cic-eth callbacks'
|
||||
logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(tc.queue, tc.method, transfer_data['to'], tx.hash.hex()))
|
||||
|
||||
@@ -3,30 +3,29 @@ import logging
|
||||
|
||||
# third-party imports
|
||||
from cic_registry.chain import ChainSpec
|
||||
from hexathon import add_0x
|
||||
|
||||
# local imports
|
||||
from cic_eth.db.enum import StatusBits
|
||||
from cic_eth.db.models.base import SessionBase
|
||||
from cic_eth.db.models.tx import TxCache
|
||||
from cic_eth.db.models.otx import Otx
|
||||
from cic_eth.db import Otx
|
||||
from cic_eth.queue.tx import get_paused_txs
|
||||
from cic_eth.eth.task import create_check_gas_and_send_task
|
||||
from .base import SyncFilter
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
class GasFilter(SyncFilter):
|
||||
|
||||
def __init__(self, chain_spec, queue=None):
|
||||
def __init__(self, gas_provider, queue=None):
|
||||
self.queue = queue
|
||||
self.chain_spec = chain_spec
|
||||
self.gas_provider = gas_provider
|
||||
|
||||
|
||||
def filter(self, conn, block, tx, session): #rcpt, chain_str, session=None):
|
||||
tx_hash_hex = add_0x(tx.hash)
|
||||
if tx.value > 0:
|
||||
def filter(self, w3, tx, rcpt, chain_str, session=None):
|
||||
logg.debug('applying gas filter')
|
||||
tx_hash_hex = tx.hash.hex()
|
||||
if tx['value'] > 0:
|
||||
logg.debug('gas refill tx {}'.format(tx_hash_hex))
|
||||
session = SessionBase.bind_session(session)
|
||||
q = session.query(TxCache.recipient)
|
||||
@@ -35,26 +34,23 @@ class GasFilter(SyncFilter):
|
||||
r = q.first()
|
||||
|
||||
if r == None:
|
||||
logg.debug('unsolicited gas refill tx {}'.format(tx_hash_hex))
|
||||
logg.warning('unsolicited gas refill tx {}'.format(tx_hash_hex))
|
||||
SessionBase.release_session(session)
|
||||
return
|
||||
|
||||
txs = get_paused_txs(StatusBits.GAS_ISSUES, r[0], self.chain_spec.chain_id(), session=session)
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
txs = get_paused_txs(StatusEnum.WAITFORGAS, r[0], chain_spec.chain_id(), session=session)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
logg.info('resuming gas-in-waiting txs for {}'.format(r[0]))
|
||||
if len(txs) > 0:
|
||||
logg.info('resuming gas-in-waiting txs for {}: {}'.format(r[0], txs.keys()))
|
||||
s = create_check_gas_and_send_task(
|
||||
list(txs.values()),
|
||||
str(self.chain_spec),
|
||||
str(chain_str),
|
||||
r[0],
|
||||
0,
|
||||
tx_hashes_hex=list(txs.keys()),
|
||||
queue=self.queue,
|
||||
)
|
||||
s.apply_async()
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return 'eic-eth gasfilter'
|
||||
|
||||
@@ -4,48 +4,32 @@ import logging
|
||||
# third-party imports
|
||||
import celery
|
||||
from chainlib.eth.address import to_checksum
|
||||
from hexathon import (
|
||||
add_0x,
|
||||
strip_0x,
|
||||
)
|
||||
|
||||
# local imports
|
||||
from .base import SyncFilter
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
logg = logging.getLogger()
|
||||
|
||||
account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153fe9ddb8e84b665061acd' # keccak256(AccountAdded(address,uint256))
|
||||
|
||||
|
||||
class RegistrationFilter(SyncFilter):
|
||||
|
||||
def __init__(self, chain_spec, queue):
|
||||
self.chain_spec = chain_spec
|
||||
self.queue = queue
|
||||
|
||||
|
||||
def filter(self, conn, block, tx, db_session=None):
|
||||
def filter(self, w3, tx, rcpt, chain_spec, session=None):
|
||||
logg.debug('applying registration filter')
|
||||
registered_address = None
|
||||
logg.debug('register filter checking log {}'.format(tx.logs))
|
||||
for l in tx.logs:
|
||||
event_topic_hex = l['topics'][0]
|
||||
for l in rcpt['logs']:
|
||||
event_topic_hex = l['topics'][0].hex()
|
||||
if event_topic_hex == account_registry_add_log_hash:
|
||||
# TODO: use abi conversion method instead
|
||||
|
||||
address_hex = strip_0x(l['topics'][1])[64-40:]
|
||||
address = to_checksum(add_0x(address_hex))
|
||||
address_bytes = l.topics[1][32-20:]
|
||||
address = to_checksum(address_bytes.hex())
|
||||
logg.debug('request token gift to {}'.format(address))
|
||||
s = celery.signature(
|
||||
'cic_eth.eth.account.gift',
|
||||
[
|
||||
address,
|
||||
str(self.chain_spec),
|
||||
str(chain_spec),
|
||||
],
|
||||
queue=self.queue,
|
||||
queue=queue,
|
||||
)
|
||||
s.apply_async()
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return 'cic-eth account registration'
|
||||
|
||||
|
||||
@@ -3,47 +3,39 @@ import logging
|
||||
|
||||
# third-party imports
|
||||
import celery
|
||||
from hexathon import (
|
||||
add_0x,
|
||||
)
|
||||
|
||||
# local imports
|
||||
from cic_eth.db.models.otx import Otx
|
||||
from chainsyncer.db.models.base import SessionBase
|
||||
from chainlib.status import Status
|
||||
from cic_eth.db.models.base import SessionBase
|
||||
from .base import SyncFilter
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
class TxFilter(SyncFilter):
|
||||
|
||||
def __init__(self, chain_spec, queue):
|
||||
def __init__(self, queue):
|
||||
self.queue = queue
|
||||
self.chain_spec = chain_spec
|
||||
|
||||
|
||||
def filter(self, conn, block, tx, db_session=None):
|
||||
db_session = SessionBase.bind_session(db_session)
|
||||
tx_hash_hex = tx.hash
|
||||
otx = Otx.load(add_0x(tx_hash_hex), session=db_session)
|
||||
def filter(self, w3, tx, rcpt, chain_spec, session=None):
|
||||
session = SessionBase.bind_session(session)
|
||||
logg.debug('applying tx filter')
|
||||
tx_hash_hex = tx.hash.hex()
|
||||
otx = Otx.load(tx_hash_hex, session=session)
|
||||
SessionBase.release_session(session)
|
||||
if otx == None:
|
||||
logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex))
|
||||
return None
|
||||
logg.info('local tx match {}'.format(otx.tx_hash))
|
||||
SessionBase.release_session(db_session)
|
||||
logg.info('otx found {}'.format(otx.tx_hash))
|
||||
s = celery.signature(
|
||||
'cic_eth.queue.tx.set_final_status',
|
||||
[
|
||||
add_0x(tx_hash_hex),
|
||||
tx.block.number,
|
||||
tx.status == Status.ERROR,
|
||||
tx_hash_hex,
|
||||
rcpt.blockNumber,
|
||||
rcpt.status == 0,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
t = s.apply_async()
|
||||
return t
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return 'cic-eth erc20 transfer filter'
|
||||
|
||||
207
apps/cic-eth/cic_eth/runnable/daemons/manager.py
Normal file
207
apps/cic-eth/cic_eth/runnable/daemons/manager.py
Normal file
@@ -0,0 +1,207 @@
|
||||
# standard imports
|
||||
import os
|
||||
import sys
|
||||
import logging
|
||||
import time
|
||||
import argparse
|
||||
import sys
|
||||
import re
|
||||
|
||||
# third-party imports
|
||||
import confini
|
||||
import celery
|
||||
import rlp
|
||||
import web3
|
||||
from web3 import HTTPProvider, WebsocketProvider
|
||||
from cic_registry import CICRegistry
|
||||
from cic_registry.chain import ChainSpec
|
||||
from cic_registry import zero_address
|
||||
from cic_registry.chain import ChainRegistry
|
||||
from cic_registry.error import UnknownContractError
|
||||
from cic_bancor.bancor import BancorRegistryClient
|
||||
|
||||
# local imports
|
||||
import cic_eth
|
||||
from cic_eth.eth import RpcClient
|
||||
from cic_eth.db import SessionBase
|
||||
from cic_eth.db import Otx
|
||||
from cic_eth.db import TxConvertTransfer
|
||||
from cic_eth.db.models.tx import TxCache
|
||||
from cic_eth.db.enum import StatusEnum
|
||||
from cic_eth.db import dsn_from_config
|
||||
from cic_eth.queue.tx import get_paused_txs
|
||||
from cic_eth.sync import Syncer
|
||||
from cic_eth.sync.error import LoopDone
|
||||
from cic_eth.db.error import UnknownConvertError
|
||||
from cic_eth.eth.util import unpack_signed_raw_tx
|
||||
from cic_eth.eth.task import create_check_gas_and_send_task
|
||||
from cic_eth.sync.backend import SyncerBackend
|
||||
from cic_eth.eth.token import unpack_transfer
|
||||
from cic_eth.eth.token import unpack_transferfrom
|
||||
from cic_eth.eth.account import unpack_gift
|
||||
from cic_eth.runnable.daemons.filters import (
|
||||
CallbackFilter,
|
||||
GasFilter,
|
||||
TxFilter,
|
||||
RegistrationFilter,
|
||||
)
|
||||
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
logg = logging.getLogger()
|
||||
logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL)
|
||||
logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL)
|
||||
logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL)
|
||||
logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL)
|
||||
|
||||
|
||||
config_dir = os.path.join('/usr/local/etc/cic-eth')
|
||||
|
||||
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
|
||||
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
|
||||
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
|
||||
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
|
||||
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
|
||||
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
|
||||
argparser.add_argument('-v', help='be verbose', action='store_true')
|
||||
argparser.add_argument('-vv', help='be more verbose', action='store_true')
|
||||
argparser.add_argument('mode', type=str, help='sync mode: (head|history)', default='head')
|
||||
args = argparser.parse_args(sys.argv[1:])
|
||||
|
||||
if args.v == True:
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
elif args.vv == True:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
config_dir = os.path.join(args.c)
|
||||
os.makedirs(config_dir, 0o777, True)
|
||||
config = confini.Config(config_dir, args.env_prefix)
|
||||
config.process()
|
||||
# override args
|
||||
args_override = {
|
||||
'ETH_ABI_DIR': getattr(args, 'abi_dir'),
|
||||
'CIC_CHAIN_SPEC': getattr(args, 'i'),
|
||||
}
|
||||
config.dict_override(args_override, 'cli flag')
|
||||
config.censor('PASSWORD', 'DATABASE')
|
||||
config.censor('PASSWORD', 'SSL')
|
||||
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
|
||||
|
||||
app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
|
||||
|
||||
queue = args.q
|
||||
|
||||
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
|
||||
|
||||
|
||||
re_websocket = re.compile('^wss?://')
|
||||
re_http = re.compile('^https?://')
|
||||
blockchain_provider = config.get('ETH_PROVIDER')
|
||||
if re.match(re_websocket, blockchain_provider) != None:
|
||||
blockchain_provider = WebsocketProvider(blockchain_provider)
|
||||
elif re.match(re_http, blockchain_provider) != None:
|
||||
blockchain_provider = HTTPProvider(blockchain_provider)
|
||||
else:
|
||||
raise ValueError('unknown provider url {}'.format(blockchain_provider))
|
||||
|
||||
def web3_constructor():
|
||||
w3 = web3.Web3(blockchain_provider)
|
||||
return (blockchain_provider, w3)
|
||||
RpcClient.set_constructor(web3_constructor)
|
||||
|
||||
c = RpcClient(chain_spec)
|
||||
CICRegistry.init(c.w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec)
|
||||
CICRegistry.add_path(config.get('ETH_ABI_DIR'))
|
||||
chain_registry = ChainRegistry(chain_spec)
|
||||
CICRegistry.add_chain_registry(chain_registry, True)
|
||||
|
||||
declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', interface='Declarator')
|
||||
|
||||
|
||||
dsn = dsn_from_config(config)
|
||||
SessionBase.connect(dsn, pool_size=1, debug=config.true('DATABASE_DEBUG'))
|
||||
|
||||
|
||||
def main():
|
||||
global chain_spec, c, queue
|
||||
|
||||
if config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER') != None:
|
||||
CICRegistry.add_role(chain_spec, config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER'), 'AccountRegistry', True)
|
||||
|
||||
syncers = []
|
||||
block_offset = c.w3.eth.blockNumber
|
||||
chain = str(chain_spec)
|
||||
|
||||
if SyncerBackend.first(chain):
|
||||
from cic_eth.sync.history import HistorySyncer
|
||||
backend = SyncerBackend.initial(chain, block_offset)
|
||||
syncer = HistorySyncer(backend)
|
||||
syncers.append(syncer)
|
||||
|
||||
if args.mode == 'head':
|
||||
from cic_eth.sync.head import HeadSyncer
|
||||
block_sync = SyncerBackend.live(chain, block_offset+1)
|
||||
syncers.append(HeadSyncer(block_sync))
|
||||
elif args.mode == 'history':
|
||||
from cic_eth.sync.history import HistorySyncer
|
||||
backends = SyncerBackend.resume(chain, block_offset+1)
|
||||
for backend in backends:
|
||||
syncers.append(HistorySyncer(backend))
|
||||
if len(syncers) == 0:
|
||||
logg.info('found no unsynced history. terminating')
|
||||
sys.exit(0)
|
||||
else:
|
||||
sys.stderr.write("unknown mode '{}'\n".format(args.mode))
|
||||
sys.exit(1)
|
||||
|
||||
# bancor_registry_contract = CICRegistry.get_contract(chain_spec, 'BancorRegistry', interface='Registry')
|
||||
# bancor_chain_registry = CICRegistry.get_chain_registry(chain_spec)
|
||||
# bancor_registry = BancorRegistryClient(c.w3, bancor_chain_registry, config.get('ETH_ABI_DIR'))
|
||||
# bancor_registry.load()
|
||||
|
||||
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
|
||||
if trusted_addresses_src == None:
|
||||
logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
|
||||
sys.exit(1)
|
||||
trusted_addresses = trusted_addresses_src.split(',')
|
||||
for address in trusted_addresses:
|
||||
logg.info('using trusted address {}'.format(address))
|
||||
CallbackFilter.trusted_addresses = trusted_addresses
|
||||
|
||||
callback_filters = []
|
||||
for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','):
|
||||
task_split = cb.split(':')
|
||||
task_queue = queue
|
||||
if len(task_split) > 1:
|
||||
task_queue = task_split[0]
|
||||
callback_filter = CallbackFilter(task_split[1], task_queue)
|
||||
callback_filters.append(callback_filter)
|
||||
|
||||
tx_filter = TxFilter(queue)
|
||||
|
||||
registration_filter = RegistrationFilter()
|
||||
|
||||
gas_filter = GasFilter(c.gas_provider(), queue)
|
||||
|
||||
i = 0
|
||||
for syncer in syncers:
|
||||
logg.debug('running syncer index {}'.format(i))
|
||||
syncer.filter.append(gas_filter.filter)
|
||||
syncer.filter.append(registration_filter.filter)
|
||||
# TODO: the two following filter functions break the filter loop if return uuid. Pro: less code executed. Con: Possibly unintuitive flow break
|
||||
syncer.filter.append(tx_filter.filter)
|
||||
#syncer.filter.append(convert_filter)
|
||||
for cf in callback_filters:
|
||||
syncer.filter.append(cf.filter)
|
||||
|
||||
try:
|
||||
syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')))
|
||||
except LoopDone as e:
|
||||
sys.stderr.write("sync '{}' done at block {}\n".format(args.mode, e))
|
||||
|
||||
i += 1
|
||||
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -32,7 +32,6 @@ from cic_eth.admin import ctrl
|
||||
from cic_eth.eth.rpc import RpcClient
|
||||
from cic_eth.eth.rpc import GasOracle
|
||||
from cic_eth.queue import tx
|
||||
from cic_eth.queue import balance
|
||||
from cic_eth.callbacks import Callback
|
||||
from cic_eth.callbacks import http
|
||||
from cic_eth.callbacks import tcp
|
||||
@@ -50,7 +49,6 @@ argparser = argparse.ArgumentParser()
|
||||
argparser.add_argument('-p', '--provider', dest='p', type=str, help='web3 provider')
|
||||
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
|
||||
argparser.add_argument('-q', type=str, default='cic-eth', help='queue name for worker tasks')
|
||||
argparser.add_argument('-r', type=str, help='CIC registry address')
|
||||
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
|
||||
argparser.add_argument('--trace-queue-status', default=None, dest='trace_queue_status', action='store_true', help='set to perist all queue entry status changes to storage')
|
||||
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
|
||||
@@ -70,7 +68,6 @@ config.process()
|
||||
args_override = {
|
||||
'ETH_ABI_DIR': getattr(args, 'abi_dir'),
|
||||
'CIC_CHAIN_SPEC': getattr(args, 'i'),
|
||||
'CIC_REGISTRY_ADDRESS': getattr(args, 'r'),
|
||||
'ETH_PROVIDER': getattr(args, 'p'),
|
||||
'TASKS_TRACE_QUEUE_STATUS': getattr(args, 'trace_queue_status'),
|
||||
}
|
||||
@@ -231,7 +228,7 @@ def main():
|
||||
for address in trusted_addresses:
|
||||
logg.info('using trusted address {}'.format(address))
|
||||
oracle = DeclaratorOracleAdapter(declarator.contract, trusted_addresses)
|
||||
chain_registry.add_oracle(oracle, 'naive_erc20_oracle')
|
||||
chain_registry.add_oracle('naive_erc20_oracle', oracle)
|
||||
|
||||
|
||||
#chain_spec = CICRegistry.default_chain_spec
|
||||
|
||||
@@ -1,168 +0,0 @@
|
||||
# standard imports
|
||||
import os
|
||||
import sys
|
||||
import logging
|
||||
import time
|
||||
import argparse
|
||||
import sys
|
||||
import re
|
||||
|
||||
# third-party imports
|
||||
import confini
|
||||
import celery
|
||||
import rlp
|
||||
import web3
|
||||
from web3 import HTTPProvider, WebsocketProvider
|
||||
import cic_base.config
|
||||
import cic_base.log
|
||||
import cic_base.argparse
|
||||
import cic_base.rpc
|
||||
from cic_registry import CICRegistry
|
||||
from chainlib.chain import ChainSpec
|
||||
from cic_registry import zero_address
|
||||
from cic_registry.chain import ChainRegistry
|
||||
from cic_registry.error import UnknownContractError
|
||||
from chainlib.eth.connection import HTTPConnection
|
||||
from chainlib.eth.block import (
|
||||
block_latest,
|
||||
)
|
||||
from hexathon import (
|
||||
strip_0x,
|
||||
)
|
||||
from chainsyncer.backend import SyncerBackend
|
||||
from chainsyncer.driver import (
|
||||
HeadSyncer,
|
||||
HistorySyncer,
|
||||
)
|
||||
from chainsyncer.db.models.base import SessionBase
|
||||
|
||||
# local imports
|
||||
from cic_eth.registry import init_registry
|
||||
from cic_eth.eth import RpcClient
|
||||
from cic_eth.db import Otx
|
||||
from cic_eth.db import TxConvertTransfer
|
||||
from cic_eth.db.models.tx import TxCache
|
||||
from cic_eth.db.enum import StatusEnum
|
||||
from cic_eth.db import dsn_from_config
|
||||
from cic_eth.queue.tx import get_paused_txs
|
||||
#from cic_eth.sync import Syncer
|
||||
#from cic_eth.sync.error import LoopDone
|
||||
from cic_eth.db.error import UnknownConvertError
|
||||
from cic_eth.eth.util import unpack_signed_raw_tx
|
||||
from cic_eth.eth.task import create_check_gas_and_send_task
|
||||
from cic_eth.eth.token import unpack_transfer
|
||||
from cic_eth.eth.token import unpack_transferfrom
|
||||
from cic_eth.eth.account import unpack_gift
|
||||
from cic_eth.runnable.daemons.filters import (
|
||||
CallbackFilter,
|
||||
GasFilter,
|
||||
TxFilter,
|
||||
RegistrationFilter,
|
||||
)
|
||||
|
||||
script_dir = os.path.realpath(os.path.dirname(__file__))
|
||||
|
||||
logg = cic_base.log.create()
|
||||
argparser = cic_base.argparse.create(script_dir, cic_base.argparse.full_template)
|
||||
#argparser = cic_base.argparse.add(argparser, add_traffic_args, 'traffic')
|
||||
args = cic_base.argparse.parse(argparser, logg)
|
||||
config = cic_base.config.create(args.c, args, args.env_prefix)
|
||||
|
||||
config.add(args.y, '_KEYSTORE_FILE', True)
|
||||
|
||||
config.add(args.q, '_CELERY_QUEUE', True)
|
||||
|
||||
cic_base.config.log(config)
|
||||
|
||||
|
||||
dsn = dsn_from_config(config)
|
||||
SessionBase.connect(dsn, pool_size=1, debug=config.true('DATABASE_DEBUG'))
|
||||
|
||||
|
||||
def main():
|
||||
# parse chain spec object
|
||||
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
|
||||
|
||||
# connect to celery
|
||||
celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
|
||||
|
||||
# set up registry
|
||||
w3 = cic_base.rpc.create(config.get('ETH_PROVIDER')) # replace with HTTPConnection when registry has been so refactored
|
||||
registry = init_registry(config, w3)
|
||||
|
||||
# Connect to blockchain with chainlib
|
||||
conn = HTTPConnection(config.get('ETH_PROVIDER'))
|
||||
|
||||
o = block_latest()
|
||||
r = conn.do(o)
|
||||
block_offset = int(strip_0x(r), 16) + 1
|
||||
|
||||
logg.debug('starting at block {}'.format(block_offset))
|
||||
|
||||
syncers = []
|
||||
|
||||
#if SyncerBackend.first(chain_spec):
|
||||
# backend = SyncerBackend.initial(chain_spec, block_offset)
|
||||
syncer_backends = SyncerBackend.resume(chain_spec, block_offset)
|
||||
|
||||
if len(syncer_backends) == 0:
|
||||
logg.info('found no backends to resume')
|
||||
syncer_backends.append(SyncerBackend.initial(chain_spec, block_offset))
|
||||
else:
|
||||
for syncer_backend in syncer_backends:
|
||||
logg.info('resuming sync session {}'.format(syncer_backend))
|
||||
|
||||
syncer_backends.append(SyncerBackend.live(chain_spec, block_offset+1))
|
||||
|
||||
for syncer_backend in syncer_backends:
|
||||
try:
|
||||
syncers.append(HistorySyncer(syncer_backend))
|
||||
logg.info('Initializing HISTORY syncer on backend {}'.format(syncer_backend))
|
||||
except AttributeError:
|
||||
logg.info('Initializing HEAD syncer on backend {}'.format(syncer_backend))
|
||||
syncers.append(HeadSyncer(syncer_backend))
|
||||
|
||||
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
|
||||
if trusted_addresses_src == None:
|
||||
logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
|
||||
sys.exit(1)
|
||||
trusted_addresses = trusted_addresses_src.split(',')
|
||||
for address in trusted_addresses:
|
||||
logg.info('using trusted address {}'.format(address))
|
||||
CallbackFilter.trusted_addresses = trusted_addresses
|
||||
|
||||
callback_filters = []
|
||||
for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','):
|
||||
task_split = cb.split(':')
|
||||
task_queue = config.get('_CELERY_QUEUE')
|
||||
if len(task_split) > 1:
|
||||
task_queue = task_split[0]
|
||||
callback_filter = CallbackFilter(chain_spec, task_split[1], task_queue)
|
||||
callback_filters.append(callback_filter)
|
||||
|
||||
tx_filter = TxFilter(chain_spec, config.get('_CELERY_QUEUE'))
|
||||
|
||||
registration_filter = RegistrationFilter(chain_spec, config.get('_CELERY_QUEUE'))
|
||||
|
||||
gas_filter = GasFilter(chain_spec, config.get('_CELERY_QUEUE'))
|
||||
|
||||
i = 0
|
||||
for syncer in syncers:
|
||||
logg.debug('running syncer index {}'.format(i))
|
||||
syncer.add_filter(gas_filter)
|
||||
syncer.add_filter(registration_filter)
|
||||
# TODO: the two following filter functions break the filter loop if return uuid. Pro: less code executed. Con: Possibly unintuitive flow break
|
||||
syncer.add_filter(tx_filter)
|
||||
for cf in callback_filters:
|
||||
syncer.add_filter(cf)
|
||||
|
||||
r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), conn)
|
||||
sys.stderr.write("sync {} done at block {}\n".format(syncer, r))
|
||||
|
||||
i += 1
|
||||
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -18,7 +18,6 @@ import web3
|
||||
from cic_registry import CICRegistry
|
||||
from cic_registry.chain import ChainSpec
|
||||
from cic_registry.chain import ChainRegistry
|
||||
from hexathon import add_0x
|
||||
|
||||
# local imports
|
||||
from cic_eth.api import AdminApi
|
||||
@@ -37,8 +36,8 @@ default_abi_dir = '/usr/share/local/cic/solidity/abi'
|
||||
default_config_dir = os.path.join('/usr/local/etc/cic-eth')
|
||||
|
||||
argparser = argparse.ArgumentParser()
|
||||
argparser.add_argument('-p', '--provider', dest='p', type=str, help='Web3 provider url (http only)')
|
||||
argparser.add_argument('-r', '--registry-address', dest='r', type=str, help='CIC registry address')
|
||||
argparser.add_argument('-p', '--provider', dest='p', default='http://localhost:8545', type=str, help='Web3 provider url (http only)')
|
||||
argparser.add_argument('-r', '--registry-address', type=str, help='CIC registry address')
|
||||
argparser.add_argument('-f', '--format', dest='f', default='terminal', type=str, help='Output format')
|
||||
argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
|
||||
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
|
||||
@@ -62,16 +61,12 @@ config.process()
|
||||
args_override = {
|
||||
'ETH_PROVIDER': getattr(args, 'p'),
|
||||
'CIC_CHAIN_SPEC': getattr(args, 'i'),
|
||||
'CIC_REGISTRY_ADDRESS': getattr(args, 'r'),
|
||||
}
|
||||
# override args
|
||||
config.dict_override(args_override, 'cli args')
|
||||
config.censor('PASSWORD', 'DATABASE')
|
||||
config.censor('PASSWORD', 'SSL')
|
||||
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
|
||||
|
||||
config.add(add_0x(args.query), '_QUERY', True)
|
||||
|
||||
re_websocket = re.compile('^wss?://')
|
||||
re_http = re.compile('^https?://')
|
||||
blockchain_provider = config.get('ETH_PROVIDER')
|
||||
@@ -153,20 +148,21 @@ def render_lock(o, **kwargs):
|
||||
|
||||
# TODO: move each command to submodule
|
||||
def main():
|
||||
logg.debug('len {}'.format(len(args.query)))
|
||||
txs = []
|
||||
renderer = render_tx
|
||||
if len(config.get('_QUERY')) > 66:
|
||||
txs = [admin_api.tx(chain_spec, tx_raw=config.get('_QUERY'))]
|
||||
elif len(config.get('_QUERY')) > 42:
|
||||
txs = [admin_api.tx(chain_spec, tx_hash=config.get('_QUERY'))]
|
||||
elif len(config.get('_QUERY')) == 42:
|
||||
txs = admin_api.account(chain_spec, config.get('_QUERY'), include_recipient=False)
|
||||
if len(args.query) > 66:
|
||||
txs = [admin_api.tx(chain_spec, tx_raw=args.query)]
|
||||
elif len(args.query) > 42:
|
||||
txs = [admin_api.tx(chain_spec, tx_hash=args.query)]
|
||||
elif len(args.query) == 42:
|
||||
txs = admin_api.account(chain_spec, args.query, include_recipient=False)
|
||||
renderer = render_account
|
||||
elif len(config.get('_QUERY')) >= 4 and config.get('_QUERY')[:4] == 'lock':
|
||||
elif len(args.query) >= 4 and args.query[:4] == 'lock':
|
||||
txs = admin_api.get_lock()
|
||||
renderer = render_lock
|
||||
else:
|
||||
raise ValueError('cannot parse argument {}'.format(config.get('_QUERY')))
|
||||
raise ValueError('cannot parse argument {}'.format(args.query))
|
||||
|
||||
if len(txs) == 0:
|
||||
logg.info('no matches found')
|
||||
|
||||
@@ -21,7 +21,7 @@ class HistorySyncer(MinedSyncer):
|
||||
:param mx: Maximum number of blocks to return in one call
|
||||
:type mx: int
|
||||
"""
|
||||
def __init__(self, bc_cache, mx=500):
|
||||
def __init__(self, bc_cache, mx=20):
|
||||
super(HistorySyncer, self).__init__(bc_cache)
|
||||
self.max = mx
|
||||
|
||||
|
||||
@@ -23,8 +23,6 @@ class MinedSyncer(Syncer):
|
||||
:type bc_cache: Object implementing methods from cic_eth.sync.SyncerBackend
|
||||
"""
|
||||
|
||||
yield_delay = 0.005
|
||||
|
||||
def __init__(self, bc_cache):
|
||||
super(MinedSyncer, self).__init__(bc_cache)
|
||||
self.block_offset = 0
|
||||
@@ -100,10 +98,7 @@ class MinedSyncer(Syncer):
|
||||
logg.debug('got blocks {}'.format(e))
|
||||
for block in e:
|
||||
block_number = self.process(c.w3, block.hex())
|
||||
logg.debug('processed block {} {}'.format(block_number, block.hex()))
|
||||
logg.info('processed block {} {}'.format(block_number, block.hex()))
|
||||
self.bc_cache.disconnect()
|
||||
if len(e) > 0:
|
||||
time.sleep(self.yield_delay)
|
||||
else:
|
||||
time.sleep(interval)
|
||||
time.sleep(interval)
|
||||
logg.info("Syncer no longer set to run, gracefully exiting")
|
||||
|
||||
@@ -1,33 +0,0 @@
|
||||
# import
|
||||
import requests
|
||||
|
||||
# external imports
|
||||
import celery
|
||||
import sqlalchemy
|
||||
|
||||
|
||||
class CriticalTask(celery.Task):
|
||||
retry_jitter = True
|
||||
retry_backoff = True
|
||||
retry_backoff_max = 8
|
||||
|
||||
|
||||
class CriticalSQLAlchemyTask(CriticalTask):
|
||||
autoretry_for = (
|
||||
sqlalchemy.exc.DatabaseError,
|
||||
sqlalchemy.exc.TimeoutError,
|
||||
)
|
||||
|
||||
|
||||
class CriticalWeb3Task(CriticalTask):
|
||||
autoretry_for = (
|
||||
requests.exceptions.ConnectionError,
|
||||
)
|
||||
|
||||
|
||||
class CriticalSQLAlchemyAndWeb3Task(CriticalTask):
|
||||
autoretry_for = (
|
||||
sqlalchemy.exc.DatabaseError,
|
||||
sqlalchemy.exc.TimeoutError,
|
||||
requests.exceptions.ConnectionError,
|
||||
)
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user