Compare commits
41 Commits
bvander/ci
...
lash/fix-t
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
db36ba997d
|
||
|
|
ae1502a651 | ||
|
|
5001113267 | ||
| 451079d004 | |||
| ba8a0b1953 | |||
| bbc948757f | |||
| ca8c1b1f27 | |||
| 854753f120 | |||
| daadbc27e9 | |||
| f37fa1dbcf | |||
|
|
ac264314c0 | ||
|
|
84c1d11b48 | ||
|
|
6fe87652ce | ||
|
|
8f65be16b1 | ||
|
|
34a48a6c6c | ||
|
|
c68d9d8404 | ||
| dd8d4b01e2 | |||
| 4d3cc85573 | |||
| 10717df7d1 | |||
| 3f2e0f5b2e | |||
|
|
42ae8e5ed3 | ||
|
|
96b4ad4a72 | ||
| 1274958493 | |||
|
|
b38ff7629d | ||
|
|
fe499de1e4 | ||
| 2657ed58d3 | |||
| b26a14e8ca | |||
|
|
725ef54cf5 | ||
|
|
3ef39fb393 | ||
|
|
6c1382aac6 | ||
|
|
ab7b5fbeb9 | ||
|
|
c6bcda8832 | ||
|
|
be3c59a780 | ||
|
|
fcde3d0bb2 | ||
|
|
500f0c3a41 | ||
|
|
fa83c50ab5 | ||
|
|
f154136dd3 | ||
|
|
d493cebc7c | ||
|
|
d798f78d7f | ||
|
|
d872e78e39 | ||
|
|
14f29c4c32 |
@@ -5,6 +5,7 @@ include:
|
||||
- local: 'apps/cic-ussd/.gitlab-ci.yml'
|
||||
- local: 'apps/cic-notify/.gitlab-ci.yml'
|
||||
- local: 'apps/cic-meta/.gitlab-ci.yml'
|
||||
- local: 'apps/cic-cache/.gitlab-ci.yml'
|
||||
|
||||
stages:
|
||||
- build
|
||||
|
||||
3
.gitmodules
vendored
3
.gitmodules
vendored
@@ -1,3 +0,0 @@
|
||||
[submodule "apps/cic-cache"]
|
||||
path = apps/cic-cache
|
||||
url = git@gitlab.com:grassrootseconomics/cic-cache.git
|
||||
|
||||
Submodule apps/cic-cache deleted from 06c5f0fb0d
2
apps/cic-cache/.config/bancor.ini
Normal file
2
apps/cic-cache/.config/bancor.ini
Normal file
@@ -0,0 +1,2 @@
|
||||
[bancor]
|
||||
dir =
|
||||
2
apps/cic-cache/.config/cic.ini
Normal file
2
apps/cic-cache/.config/cic.ini
Normal file
@@ -0,0 +1,2 @@
|
||||
[cic]
|
||||
registry_address =
|
||||
8
apps/cic-cache/.config/database.ini
Normal file
8
apps/cic-cache/.config/database.ini
Normal file
@@ -0,0 +1,8 @@
|
||||
[database]
|
||||
NAME=cic-eth
|
||||
USER=postgres
|
||||
PASSWORD=
|
||||
HOST=localhost
|
||||
PORT=5432
|
||||
ENGINE=postgresql
|
||||
DRIVER=psycopg2
|
||||
6
apps/cic-cache/.config/eth.ini
Normal file
6
apps/cic-cache/.config/eth.ini
Normal file
@@ -0,0 +1,6 @@
|
||||
[eth]
|
||||
provider = ws://localhost:8545
|
||||
#ttp_provider = http://localhost:8545
|
||||
#provider = http://localhost:8545
|
||||
gas_provider_address =
|
||||
#chain_id =
|
||||
2
apps/cic-cache/.config/test/bancor.ini
Normal file
2
apps/cic-cache/.config/test/bancor.ini
Normal file
@@ -0,0 +1,2 @@
|
||||
[bancor]
|
||||
dir =
|
||||
2
apps/cic-cache/.config/test/cic.ini
Normal file
2
apps/cic-cache/.config/test/cic.ini
Normal file
@@ -0,0 +1,2 @@
|
||||
[cic]
|
||||
registry_address =
|
||||
8
apps/cic-cache/.config/test/database.ini
Normal file
8
apps/cic-cache/.config/test/database.ini
Normal file
@@ -0,0 +1,8 @@
|
||||
[database]
|
||||
NAME=cic-cache-test
|
||||
USER=postgres
|
||||
PASSWORD=
|
||||
HOST=localhost
|
||||
PORT=5432
|
||||
ENGINE=sqlite
|
||||
DRIVER=pysqlite
|
||||
5
apps/cic-cache/.config/test/eth.ini
Normal file
5
apps/cic-cache/.config/test/eth.ini
Normal file
@@ -0,0 +1,5 @@
|
||||
[eth]
|
||||
#ws_provider = ws://localhost:8546
|
||||
#ttp_provider = http://localhost:8545
|
||||
provider = http://localhost:8545
|
||||
#chain_id =
|
||||
5
apps/cic-cache/.coveragerc
Normal file
5
apps/cic-cache/.coveragerc
Normal file
@@ -0,0 +1,5 @@
|
||||
[report]
|
||||
omit =
|
||||
.venv/*
|
||||
scripts/*
|
||||
cic_cache/db/postgres/*
|
||||
7
apps/cic-cache/.envrc_example
Normal file
7
apps/cic-cache/.envrc_example
Normal file
@@ -0,0 +1,7 @@
|
||||
set -a
|
||||
CICTEST_DATABASE_ENGINE=postgresql
|
||||
CICTEST_DATABASE_DRIVER=psycopg2
|
||||
CICTEST_DATABASE_HOST=localhost
|
||||
CICTEST_DATABASE_PORT=5432
|
||||
CICTEST_DATABASE_NAME=cic-eth-test
|
||||
set +a
|
||||
8
apps/cic-cache/.gitignore
vendored
Normal file
8
apps/cic-cache/.gitignore
vendored
Normal file
@@ -0,0 +1,8 @@
|
||||
.envrc
|
||||
.envrc_dev
|
||||
.venv
|
||||
__pycache__
|
||||
*.pyc
|
||||
_build
|
||||
doc/**/*.png
|
||||
doc/**/html
|
||||
22
apps/cic-cache/.gitlab-ci.yml
Normal file
22
apps/cic-cache/.gitlab-ci.yml
Normal file
@@ -0,0 +1,22 @@
|
||||
.cic_cache_variables:
|
||||
variables:
|
||||
APP_NAME: cic-cache
|
||||
DOCKERFILE_PATH: $APP_NAME/docker/Dockerfile
|
||||
|
||||
.cic_cache_changes_target:
|
||||
rules:
|
||||
- changes:
|
||||
- $CONTEXT/$APP_NAME/*
|
||||
|
||||
build-mr-cic-cache:
|
||||
extends:
|
||||
- .cic_cache_changes_target
|
||||
- .py_build_merge_request
|
||||
- .cic_cache_variables
|
||||
|
||||
build-push-cic-cache:
|
||||
extends:
|
||||
- .py_build_push
|
||||
- .cic_cache_variables
|
||||
|
||||
|
||||
13
apps/cic-cache/CHANGELOG
Normal file
13
apps/cic-cache/CHANGELOG
Normal file
@@ -0,0 +1,13 @@
|
||||
- 0.1.2
|
||||
* Revert to alembic migrations
|
||||
- 0.1.1
|
||||
* Add missing modules to setup
|
||||
- 0.1.0
|
||||
* Remove old APIs
|
||||
* Add bloom filter output APIs for all txs and per-account txs
|
||||
- 0.0.2
|
||||
* UWSGI server endpoint example
|
||||
* OpenAPI spec
|
||||
* stored procedures, test fixture for database schema
|
||||
- 0.0.1
|
||||
* Add json translators of transaction_list and balances stored procedure queries
|
||||
0
apps/cic-cache/README.md
Normal file
0
apps/cic-cache/README.md
Normal file
1
apps/cic-cache/cic_cache/__init__.py
Normal file
1
apps/cic-cache/cic_cache/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .cache import BloomCache
|
||||
73
apps/cic-cache/cic_cache/api.py
Normal file
73
apps/cic-cache/cic_cache/api.py
Normal file
@@ -0,0 +1,73 @@
|
||||
"""API for cic-cache celery tasks
|
||||
|
||||
.. moduleauthor:: Louis Holbrook <dev@holbrook.no>
|
||||
|
||||
"""
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# third-party imports
|
||||
import celery
|
||||
|
||||
|
||||
app = celery.current_app
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Api:
|
||||
"""Creates task chains to perform well-known CIC operations.
|
||||
|
||||
Each method that sends tasks returns details about the root task. The root task uuid can be provided in the callback, to enable to caller to correlate the result with individual calls. It can also be used to independently poll the completion of a task chain.
|
||||
|
||||
:param callback_param: Static value to pass to callback
|
||||
:type callback_param: str
|
||||
:param callback_task: Callback task that executes callback_param call. (Must be included by the celery worker)
|
||||
:type callback_task: string
|
||||
:param queue: Name of worker queue to submit tasks to
|
||||
:type queue: str
|
||||
"""
|
||||
def __init__(self, queue='cic-cache', callback_param=None, callback_task='cic_cache.callbacks.noop.noop', callback_queue=None):
|
||||
self.callback_param = callback_param
|
||||
self.callback_task = callback_task
|
||||
self.queue = queue
|
||||
logg.info('api using queue {}'.format(self.queue))
|
||||
self.callback_success = None
|
||||
self.callback_error = None
|
||||
if callback_queue == None:
|
||||
callback_queue=self.queue
|
||||
|
||||
if callback_param != None:
|
||||
self.callback_success = celery.signature(
|
||||
callback_task,
|
||||
[
|
||||
callback_param,
|
||||
0,
|
||||
],
|
||||
queue=callback_queue,
|
||||
)
|
||||
self.callback_error = celery.signature(
|
||||
callback_task,
|
||||
[
|
||||
callback_param,
|
||||
1,
|
||||
],
|
||||
queue=callback_queue,
|
||||
)
|
||||
|
||||
def list(self, offset, limit, address=None):
|
||||
s = celery.signature(
|
||||
'cic_cache.tasks.tx.tx_filter',
|
||||
[
|
||||
0,
|
||||
100,
|
||||
address,
|
||||
],
|
||||
queue=None
|
||||
)
|
||||
if self.callback_param != None:
|
||||
s.link(self.callback_success).on_error(self.callback_error)
|
||||
|
||||
t = s.apply_async()
|
||||
|
||||
return t
|
||||
89
apps/cic-cache/cic_cache/cache.py
Normal file
89
apps/cic-cache/cic_cache/cache.py
Normal file
@@ -0,0 +1,89 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# third-party imports
|
||||
import moolb
|
||||
|
||||
# local imports
|
||||
from cic_cache.db import list_transactions_mined
|
||||
from cic_cache.db import list_transactions_account_mined
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
class BloomCache:
|
||||
|
||||
def __init__(self, session):
|
||||
self.session = session
|
||||
|
||||
|
||||
@staticmethod
|
||||
def __get_filter_size(n):
|
||||
n = 8192 * 8
|
||||
logg.warning('filter size hardcoded to {}'.format(n))
|
||||
return n
|
||||
|
||||
|
||||
def load_transactions(self, offset, limit):
|
||||
"""Retrieves a list of transactions from cache and creates a bloom filter pointing to blocks and transactions.
|
||||
|
||||
Block and transaction numbers are serialized as 32-bit big-endian numbers. The input to the second bloom filter is the concatenation of the serialized block number and transaction index.
|
||||
|
||||
For example, if the block number is 13 and the transaction index is 42, the input are:
|
||||
|
||||
block filter: 0x0d000000
|
||||
block+tx filter: 0x0d0000002a0000000
|
||||
|
||||
:param offset: Offset in data set to return transactions from
|
||||
:type offset: int
|
||||
:param limit: Max number of transactions to retrieve
|
||||
:type limit: int
|
||||
:return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx
|
||||
:rtype: tuple
|
||||
"""
|
||||
rows = list_transactions_mined(self.session, offset, limit)
|
||||
|
||||
f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
|
||||
f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
|
||||
highest_block = -1
|
||||
lowest_block = -1
|
||||
for r in rows:
|
||||
if highest_block == -1:
|
||||
highest_block = r[0]
|
||||
lowest_block = r[0]
|
||||
block = r[0].to_bytes(4, byteorder='big')
|
||||
tx = r[1].to_bytes(4, byteorder='big')
|
||||
f_block.add(block)
|
||||
f_blocktx.add(block + tx)
|
||||
logg.debug('added block {} tx {} lo {} hi {}'.format(r[0], r[1], lowest_block, highest_block))
|
||||
return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),)
|
||||
|
||||
|
||||
def load_transactions_account(self, address, offset, limit):
|
||||
"""Same as load_transactions(...), but only retrieves transactions where the specified account address is sender or recipient.
|
||||
|
||||
:param address: Address to retrieve transactions for.
|
||||
:type address: str, 0x-hex
|
||||
:param offset: Offset in data set to return transactions from
|
||||
:type offset: int
|
||||
:param limit: Max number of transactions to retrieve
|
||||
:type limit: int
|
||||
:return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx
|
||||
:rtype: tuple
|
||||
"""
|
||||
rows = list_transactions_account_mined(self.session, address, offset, limit)
|
||||
|
||||
f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
|
||||
f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
|
||||
highest_block = -1;
|
||||
lowest_block = -1;
|
||||
for r in rows:
|
||||
if highest_block == -1:
|
||||
highest_block = r[0]
|
||||
lowest_block = r[0]
|
||||
block = r[0].to_bytes(4, byteorder='big')
|
||||
tx = r[1].to_bytes(4, byteorder='big')
|
||||
f_block.add(block)
|
||||
f_blocktx.add(block + tx)
|
||||
logg.debug('added block {} tx {} lo {} hi {}'.format(r[0], r[1], lowest_block, highest_block))
|
||||
return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),)
|
||||
35
apps/cic-cache/cic_cache/db/__init__.py
Normal file
35
apps/cic-cache/cic_cache/db/__init__.py
Normal file
@@ -0,0 +1,35 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# local imports
|
||||
from .list import list_transactions_mined
|
||||
from .list import list_transactions_account_mined
|
||||
from .list import add_transaction
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
def dsn_from_config(config):
|
||||
scheme = config.get('DATABASE_ENGINE')
|
||||
if config.get('DATABASE_DRIVER') != None:
|
||||
scheme += '+{}'.format(config.get('DATABASE_DRIVER'))
|
||||
|
||||
dsn = ''
|
||||
if config.get('DATABASE_ENGINE') == 'sqlite':
|
||||
dsn = '{}:///{}'.format(
|
||||
scheme,
|
||||
config.get('DATABASE_NAME'),
|
||||
)
|
||||
|
||||
else:
|
||||
dsn = '{}://{}:{}@{}:{}/{}'.format(
|
||||
scheme,
|
||||
config.get('DATABASE_USER'),
|
||||
config.get('DATABASE_PASSWORD'),
|
||||
config.get('DATABASE_HOST'),
|
||||
config.get('DATABASE_PORT'),
|
||||
config.get('DATABASE_NAME'),
|
||||
)
|
||||
logg.debug('parsed dsn from config: {}'.format(dsn))
|
||||
return dsn
|
||||
|
||||
79
apps/cic-cache/cic_cache/db/list.py
Normal file
79
apps/cic-cache/cic_cache/db/list.py
Normal file
@@ -0,0 +1,79 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import datetime
|
||||
|
||||
# third-party imports
|
||||
from cic_cache.db.models.base import SessionBase
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
def list_transactions_mined(
|
||||
session,
|
||||
offset,
|
||||
limit,
|
||||
):
|
||||
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
|
||||
|
||||
:param offset: Offset in data set to return transactions from
|
||||
:type offset: int
|
||||
:param limit: Max number of transactions to retrieve
|
||||
:type limit: int
|
||||
:result: Result set
|
||||
:rtype: SQLAlchemy.ResultProxy
|
||||
"""
|
||||
s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(limit, offset)
|
||||
r = session.execute(s)
|
||||
return r
|
||||
|
||||
|
||||
def list_transactions_account_mined(
|
||||
session,
|
||||
address,
|
||||
offset,
|
||||
limit,
|
||||
):
|
||||
"""Same as list_transactions_mined(...), but only retrieves transaction where the specified account address is sender or recipient.
|
||||
|
||||
:param address: Address to retrieve transactions for.
|
||||
:type address: str, 0x-hex
|
||||
:param offset: Offset in data set to return transactions from
|
||||
:type offset: int
|
||||
:param limit: Max number of transactions to retrieve
|
||||
:type limit: int
|
||||
:result: Result set
|
||||
:rtype: SQLAlchemy.ResultProxy
|
||||
"""
|
||||
s = "SELECT block_number, tx_index FROM tx WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(address, address, limit, offset)
|
||||
r = session.execute(s)
|
||||
return r
|
||||
|
||||
|
||||
def add_transaction(
|
||||
session, tx_hash,
|
||||
block_number,
|
||||
tx_index,
|
||||
sender,
|
||||
receiver,
|
||||
source_token,
|
||||
destination_token,
|
||||
from_value,
|
||||
to_value,
|
||||
success,
|
||||
timestamp,
|
||||
):
|
||||
date_block = datetime.datetime.fromtimestamp(timestamp)
|
||||
s = "INSERT INTO tx (tx_hash, block_number, tx_index, sender, recipient, source_token, destination_token, from_value, to_value, success, date_block) VALUES ('{}', {}, {}, '{}', '{}', '{}', '{}', {}, {}, {}, '{}')".format(
|
||||
tx_hash,
|
||||
block_number,
|
||||
tx_index,
|
||||
sender,
|
||||
receiver,
|
||||
source_token,
|
||||
destination_token,
|
||||
from_value,
|
||||
to_value,
|
||||
success,
|
||||
date_block,
|
||||
)
|
||||
session.execute(s)
|
||||
1
apps/cic-cache/cic_cache/db/migrations/default/README
Normal file
1
apps/cic-cache/cic_cache/db/migrations/default/README
Normal file
@@ -0,0 +1 @@
|
||||
Generic single-database configuration.
|
||||
86
apps/cic-cache/cic_cache/db/migrations/default/alembic.ini
Normal file
86
apps/cic-cache/cic_cache/db/migrations/default/alembic.ini
Normal file
@@ -0,0 +1,86 @@
|
||||
# A generic, single database configuration.
|
||||
|
||||
[alembic]
|
||||
# path to migration scripts
|
||||
script_location = .
|
||||
|
||||
# template used to generate migration files
|
||||
# file_template = %%(rev)s_%%(slug)s
|
||||
|
||||
# timezone to use when rendering the date
|
||||
# within the migration file as well as the filename.
|
||||
# string value is passed to dateutil.tz.gettz()
|
||||
# leave blank for localtime
|
||||
# timezone =
|
||||
|
||||
# max length of characters to apply to the
|
||||
# "slug" field
|
||||
# truncate_slug_length = 40
|
||||
|
||||
# set to 'true' to run the environment during
|
||||
# the 'revision' command, regardless of autogenerate
|
||||
# revision_environment = false
|
||||
|
||||
# set to 'true' to allow .pyc and .pyo files without
|
||||
# a source .py file to be detected as revisions in the
|
||||
# versions/ directory
|
||||
# sourceless = false
|
||||
|
||||
# version location specification; this defaults
|
||||
# to ./versions. When using multiple version
|
||||
# directories, initial revisions must be specified with --version-path
|
||||
# version_locations = %(here)s/bar %(here)s/bat ./versions
|
||||
|
||||
# the output encoding used when revision files
|
||||
# are written from script.py.mako
|
||||
# output_encoding = utf-8
|
||||
|
||||
#sqlalchemy.url = driver://user:pass@localhost/dbname
|
||||
sqlalchemy.url = postgresql+psycopg2://postgres@localhost:5432/cic-cache
|
||||
|
||||
|
||||
[post_write_hooks]
|
||||
# post_write_hooks defines scripts or Python functions that are run
|
||||
# on newly generated revision scripts. See the documentation for further
|
||||
# detail and examples
|
||||
|
||||
# format using "black" - use the console_scripts runner, against the "black" entrypoint
|
||||
# hooks=black
|
||||
# black.type=console_scripts
|
||||
# black.entrypoint=black
|
||||
# black.options=-l 79
|
||||
|
||||
# Logging configuration
|
||||
[loggers]
|
||||
keys = root,sqlalchemy,alembic
|
||||
|
||||
[handlers]
|
||||
keys = console
|
||||
|
||||
[formatters]
|
||||
keys = generic
|
||||
|
||||
[logger_root]
|
||||
level = WARN
|
||||
handlers = console
|
||||
qualname =
|
||||
|
||||
[logger_sqlalchemy]
|
||||
level = WARN
|
||||
handlers =
|
||||
qualname = sqlalchemy.engine
|
||||
|
||||
[logger_alembic]
|
||||
level = INFO
|
||||
handlers =
|
||||
qualname = alembic
|
||||
|
||||
[handler_console]
|
||||
class = StreamHandler
|
||||
args = (sys.stderr,)
|
||||
level = NOTSET
|
||||
formatter = generic
|
||||
|
||||
[formatter_generic]
|
||||
format = %(levelname)-5.5s [%(name)s] %(message)s
|
||||
datefmt = %H:%M:%S
|
||||
77
apps/cic-cache/cic_cache/db/migrations/default/env.py
Normal file
77
apps/cic-cache/cic_cache/db/migrations/default/env.py
Normal file
@@ -0,0 +1,77 @@
|
||||
from logging.config import fileConfig
|
||||
|
||||
from sqlalchemy import engine_from_config
|
||||
from sqlalchemy import pool
|
||||
|
||||
from alembic import context
|
||||
|
||||
# this is the Alembic Config object, which provides
|
||||
# access to the values within the .ini file in use.
|
||||
config = context.config
|
||||
|
||||
# Interpret the config file for Python logging.
|
||||
# This line sets up loggers basically.
|
||||
fileConfig(config.config_file_name)
|
||||
|
||||
# add your model's MetaData object here
|
||||
# for 'autogenerate' support
|
||||
# from myapp import mymodel
|
||||
# target_metadata = mymodel.Base.metadata
|
||||
target_metadata = None
|
||||
|
||||
# other values from the config, defined by the needs of env.py,
|
||||
# can be acquired:
|
||||
# my_important_option = config.get_main_option("my_important_option")
|
||||
# ... etc.
|
||||
|
||||
|
||||
def run_migrations_offline():
|
||||
"""Run migrations in 'offline' mode.
|
||||
|
||||
This configures the context with just a URL
|
||||
and not an Engine, though an Engine is acceptable
|
||||
here as well. By skipping the Engine creation
|
||||
we don't even need a DBAPI to be available.
|
||||
|
||||
Calls to context.execute() here emit the given string to the
|
||||
script output.
|
||||
|
||||
"""
|
||||
url = config.get_main_option("sqlalchemy.url")
|
||||
context.configure(
|
||||
url=url,
|
||||
target_metadata=target_metadata,
|
||||
literal_binds=True,
|
||||
dialect_opts={"paramstyle": "named"},
|
||||
)
|
||||
|
||||
with context.begin_transaction():
|
||||
context.run_migrations()
|
||||
|
||||
|
||||
def run_migrations_online():
|
||||
"""Run migrations in 'online' mode.
|
||||
|
||||
In this scenario we need to create an Engine
|
||||
and associate a connection with the context.
|
||||
|
||||
"""
|
||||
connectable = engine_from_config(
|
||||
config.get_section(config.config_ini_section),
|
||||
prefix="sqlalchemy.",
|
||||
poolclass=pool.NullPool,
|
||||
)
|
||||
|
||||
with connectable.connect() as connection:
|
||||
context.configure(
|
||||
connection=connection, target_metadata=target_metadata
|
||||
)
|
||||
|
||||
with context.begin_transaction():
|
||||
context.run_migrations()
|
||||
|
||||
|
||||
if context.is_offline_mode():
|
||||
run_migrations_offline()
|
||||
else:
|
||||
run_migrations_online()
|
||||
@@ -0,0 +1,24 @@
|
||||
"""${message}
|
||||
|
||||
Revision ID: ${up_revision}
|
||||
Revises: ${down_revision | comma,n}
|
||||
Create Date: ${create_date}
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
${imports if imports else ""}
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = ${repr(up_revision)}
|
||||
down_revision = ${repr(down_revision)}
|
||||
branch_labels = ${repr(branch_labels)}
|
||||
depends_on = ${repr(depends_on)}
|
||||
|
||||
|
||||
def upgrade():
|
||||
${upgrades if upgrades else "pass"}
|
||||
|
||||
|
||||
def downgrade():
|
||||
${downgrades if downgrades else "pass"}
|
||||
@@ -0,0 +1,52 @@
|
||||
"""Base tables
|
||||
|
||||
Revision ID: 63b629f14a85
|
||||
Revises:
|
||||
Create Date: 2020-12-04 08:16:00.412189
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '63b629f14a85'
|
||||
down_revision = None
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_table(
|
||||
'tx',
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('date_registered', sa.DateTime, nullable=False, server_default=sa.func.current_timestamp()),
|
||||
sa.Column('block_number', sa.Integer, nullable=False),
|
||||
sa.Column('tx_index', sa.Integer, nullable=False),
|
||||
sa.Column('tx_hash', sa.String(66), nullable=False),
|
||||
sa.Column('sender', sa.String(42), nullable=False),
|
||||
sa.Column('recipient', sa.String(42), nullable=False),
|
||||
sa.Column('source_token', sa.String(42), nullable=False),
|
||||
sa.Column('destination_token', sa.String(42), nullable=False),
|
||||
sa.Column('success', sa.Boolean, nullable=False),
|
||||
sa.Column('from_value', sa.BIGINT(), nullable=False),
|
||||
sa.Column('to_value', sa.BIGINT(), nullable=False),
|
||||
sa.Column('date_block', sa.DateTime, nullable=False),
|
||||
)
|
||||
op.create_table(
|
||||
'tx_sync',
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('tx', sa.String(66), nullable=False),
|
||||
)
|
||||
|
||||
op.execute("INSERT INTO tx_sync (tx) VALUES('0x0000000000000000000000000000000000000000000000000000000000000000');")
|
||||
|
||||
op.create_index('sender_token_idx', 'tx', ['sender', 'source_token'])
|
||||
op.create_index('recipient_token_idx', 'tx', ['recipient', 'destination_token'])
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_index('recipient_token_idx')
|
||||
op.drop_index('sender_token_idx')
|
||||
op.drop_table('tx_sync')
|
||||
op.drop_table('tx')
|
||||
0
apps/cic-cache/cic_cache/db/models/__init__.py
Normal file
0
apps/cic-cache/cic_cache/db/models/__init__.py
Normal file
102
apps/cic-cache/cic_cache/db/models/base.py
Normal file
102
apps/cic-cache/cic_cache/db/models/base.py
Normal file
@@ -0,0 +1,102 @@
|
||||
# stanard imports
|
||||
import logging
|
||||
|
||||
# third-party imports
|
||||
from sqlalchemy import Column, Integer
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
Model = declarative_base(name='Model')
|
||||
|
||||
|
||||
class SessionBase(Model):
|
||||
"""The base object for all SQLAlchemy enabled models. All other models must extend this.
|
||||
"""
|
||||
__abstract__ = True
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
|
||||
engine = None
|
||||
"""Database connection engine of the running aplication"""
|
||||
sessionmaker = None
|
||||
"""Factory object responsible for creating sessions from the connection pool"""
|
||||
transactional = True
|
||||
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
|
||||
poolable = True
|
||||
"""Whether the database backend supports connection pools. Should be explicitly set by initialization code"""
|
||||
procedural = True
|
||||
"""Whether the database backend supports stored procedures"""
|
||||
localsessions = {}
|
||||
"""Contains dictionary of sessions initiated by db model components"""
|
||||
|
||||
|
||||
@staticmethod
|
||||
def create_session():
|
||||
"""Creates a new database session.
|
||||
"""
|
||||
return SessionBase.sessionmaker()
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _set_engine(engine):
|
||||
"""Sets the database engine static property
|
||||
"""
|
||||
SessionBase.engine = engine
|
||||
SessionBase.sessionmaker = sessionmaker(bind=SessionBase.engine)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def connect(dsn, debug=False):
|
||||
"""Create new database connection engine and connect to database backend.
|
||||
|
||||
:param dsn: DSN string defining connection.
|
||||
:type dsn: str
|
||||
"""
|
||||
e = None
|
||||
if SessionBase.poolable:
|
||||
e = create_engine(
|
||||
dsn,
|
||||
max_overflow=50,
|
||||
pool_pre_ping=True,
|
||||
pool_size=20,
|
||||
pool_recycle=10,
|
||||
echo=debug,
|
||||
)
|
||||
else:
|
||||
e = create_engine(
|
||||
dsn,
|
||||
echo=debug,
|
||||
)
|
||||
|
||||
SessionBase._set_engine(e)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def disconnect():
|
||||
"""Disconnect from database and free resources.
|
||||
"""
|
||||
SessionBase.engine.dispose()
|
||||
SessionBase.engine = None
|
||||
|
||||
|
||||
@staticmethod
|
||||
def bind_session(session=None):
|
||||
localsession = session
|
||||
if localsession == None:
|
||||
localsession = SessionBase.create_session()
|
||||
localsession_key = str(id(localsession))
|
||||
logg.debug('creating new session {}'.format(localsession_key))
|
||||
SessionBase.localsessions[localsession_key] = localsession
|
||||
return localsession
|
||||
|
||||
|
||||
@staticmethod
|
||||
def release_session(session=None):
|
||||
session_key = str(id(session))
|
||||
if SessionBase.localsessions.get(session_key) != None:
|
||||
logg.debug('destroying session {}'.format(session_key))
|
||||
session.commit()
|
||||
session.close()
|
||||
141
apps/cic-cache/cic_cache/runnable/server.py
Normal file
141
apps/cic-cache/cic_cache/runnable/server.py
Normal file
@@ -0,0 +1,141 @@
|
||||
# standard imports
|
||||
import os
|
||||
import re
|
||||
import logging
|
||||
import argparse
|
||||
import json
|
||||
import base64
|
||||
|
||||
# third-party imports
|
||||
import confini
|
||||
|
||||
# local imports
|
||||
from cic_cache import BloomCache
|
||||
from cic_cache.db import dsn_from_config
|
||||
from cic_cache.db.models.base import SessionBase
|
||||
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
logg = logging.getLogger()
|
||||
|
||||
rootdir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
|
||||
dbdir = os.path.join(rootdir, 'cic_cache', 'db')
|
||||
migrationsdir = os.path.join(dbdir, 'migrations')
|
||||
|
||||
config_dir = os.path.join('/usr/local/etc/cic-cache')
|
||||
|
||||
argparser = argparse.ArgumentParser()
|
||||
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
|
||||
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
|
||||
argparser.add_argument('-v', action='store_true', help='be verbose')
|
||||
argparser.add_argument('-vv', action='store_true', help='be more verbose')
|
||||
args = argparser.parse_args()
|
||||
|
||||
if args.vv:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
elif args.v:
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
config = confini.Config(args.c, args.env_prefix)
|
||||
config.process()
|
||||
config.censor('PASSWORD', 'DATABASE')
|
||||
config.censor('PASSWORD', 'SSL')
|
||||
logg.debug('config:\n{}'.format(config))
|
||||
|
||||
dsn = dsn_from_config(config)
|
||||
SessionBase.connect(dsn, config.true('DATABASE_DEBUG'))
|
||||
|
||||
re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?'
|
||||
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)/?(\d+)?/?(\d+)/?'
|
||||
|
||||
DEFAULT_LIMIT = 100
|
||||
|
||||
|
||||
def process_transactions_account_bloom(session, env):
|
||||
r = re.match(re_transactions_account_bloom, env.get('PATH_INFO'))
|
||||
if not r:
|
||||
return None
|
||||
|
||||
address = r[1]
|
||||
if r[2] == None:
|
||||
address = '0x' + address
|
||||
offset = DEFAULT_LIMIT
|
||||
if r.lastindex > 2:
|
||||
offset = r[3]
|
||||
limit = 0
|
||||
if r.lastindex > 3:
|
||||
limit = r[4]
|
||||
|
||||
c = BloomCache(session)
|
||||
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
|
||||
|
||||
o = {
|
||||
'alg': 'sha256',
|
||||
'low': lowest_block,
|
||||
'high': highest_block,
|
||||
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
|
||||
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
|
||||
'filter_rounds': 3,
|
||||
}
|
||||
|
||||
j = json.dumps(o)
|
||||
|
||||
return ('application/json', j.encode('utf-8'),)
|
||||
|
||||
|
||||
def process_transactions_all_bloom(session, env):
|
||||
r = re.match(re_transactions_all_bloom, env.get('PATH_INFO'))
|
||||
if not r:
|
||||
return None
|
||||
|
||||
offset = DEFAULT_LIMIT
|
||||
if r.lastindex > 0:
|
||||
offset = r[1]
|
||||
limit = 0
|
||||
if r.lastindex > 1:
|
||||
limit = r[2]
|
||||
|
||||
c = BloomCache(session)
|
||||
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
|
||||
|
||||
o = {
|
||||
'alg': 'sha256',
|
||||
'low': lowest_block,
|
||||
'high': highest_block,
|
||||
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
|
||||
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
|
||||
'filter_rounds': 3,
|
||||
}
|
||||
|
||||
j = json.dumps(o)
|
||||
|
||||
return ('application/json', j.encode('utf-8'),)
|
||||
|
||||
|
||||
# uwsgi application
|
||||
def application(env, start_response):
|
||||
|
||||
headers = []
|
||||
content = b''
|
||||
|
||||
session = SessionBase.create_session()
|
||||
for handler in [
|
||||
process_transactions_all_bloom,
|
||||
process_transactions_account_bloom,
|
||||
]:
|
||||
r = handler(session, env)
|
||||
if r != None:
|
||||
(mime_type, content) = r
|
||||
break
|
||||
session.close()
|
||||
|
||||
headers.append(('Content-Length', str(len(content))),)
|
||||
headers.append(('Access-Control-Allow-Origin', '*',));
|
||||
|
||||
if len(content) == 0:
|
||||
headers.append(('Content-Type', 'text/plain, charset=UTF-8',))
|
||||
start_response('404 Looked everywhere, sorry', headers)
|
||||
else:
|
||||
headers.append(('Content-Type', mime_type,))
|
||||
start_response('200 OK', headers)
|
||||
|
||||
return [content]
|
||||
98
apps/cic-cache/cic_cache/runnable/tasker.py
Normal file
98
apps/cic-cache/cic_cache/runnable/tasker.py
Normal file
@@ -0,0 +1,98 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
|
||||
# third-party imports
|
||||
import celery
|
||||
import confini
|
||||
|
||||
# local imports
|
||||
from cic_cache.db import dsn_from_config
|
||||
from cic_cache.db.models.base import SessionBase
|
||||
from cic_cache.tasks.tx import *
|
||||
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
logg = logging.getLogger()
|
||||
|
||||
config_dir = os.path.join('/usr/local/etc/cic-cache')
|
||||
|
||||
|
||||
argparser = argparse.ArgumentParser()
|
||||
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
|
||||
argparser.add_argument('-q', type=str, default='cic-cache', help='queue name for worker tasks')
|
||||
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
|
||||
argparser.add_argument('-v', action='store_true', help='be verbose')
|
||||
argparser.add_argument('-vv', action='store_true', help='be more verbose')
|
||||
|
||||
args = argparser.parse_args()
|
||||
|
||||
if args.vv:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
elif args.v:
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
config = confini.Config(args.c, args.env_prefix)
|
||||
config.process()
|
||||
|
||||
# connect to database
|
||||
dsn = dsn_from_config(config)
|
||||
SessionBase.connect(dsn)
|
||||
|
||||
# verify database connection with minimal sanity query
|
||||
#session = SessionBase.create_session()
|
||||
#session.execute('select version_num from alembic_version')
|
||||
#session.close()
|
||||
|
||||
# set up celery
|
||||
current_app = celery.Celery(__name__)
|
||||
|
||||
broker = config.get('CELERY_BROKER_URL')
|
||||
if broker[:4] == 'file':
|
||||
bq = tempfile.mkdtemp()
|
||||
bp = tempfile.mkdtemp()
|
||||
current_app.conf.update({
|
||||
'broker_url': broker,
|
||||
'broker_transport_options': {
|
||||
'data_folder_in': bq,
|
||||
'data_folder_out': bq,
|
||||
'data_folder_processed': bp,
|
||||
},
|
||||
},
|
||||
)
|
||||
logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
|
||||
else:
|
||||
current_app.conf.update({
|
||||
'broker_url': broker,
|
||||
})
|
||||
|
||||
result = config.get('CELERY_RESULT_URL')
|
||||
if result[:4] == 'file':
|
||||
rq = tempfile.mkdtemp()
|
||||
current_app.conf.update({
|
||||
'result_backend': 'file://{}'.format(rq),
|
||||
})
|
||||
logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
|
||||
else:
|
||||
current_app.conf.update({
|
||||
'result_backend': result,
|
||||
})
|
||||
|
||||
|
||||
def main():
|
||||
argv = ['worker']
|
||||
if args.vv:
|
||||
argv.append('--loglevel=DEBUG')
|
||||
elif args.v:
|
||||
argv.append('--loglevel=INFO')
|
||||
argv.append('-Q')
|
||||
argv.append(args.q)
|
||||
argv.append('-n')
|
||||
argv.append(args.q)
|
||||
|
||||
current_app.worker_main(argv)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
339
apps/cic-cache/cic_cache/runnable/tracker.py
Normal file
339
apps/cic-cache/cic_cache/runnable/tracker.py
Normal file
@@ -0,0 +1,339 @@
|
||||
# standard imports
|
||||
import sys
|
||||
import os
|
||||
import argparse
|
||||
import logging
|
||||
import time
|
||||
import enum
|
||||
import re
|
||||
|
||||
# third-party imports
|
||||
import confini
|
||||
from cic_registry import CICRegistry
|
||||
from cic_registry.chain import (
|
||||
ChainRegistry,
|
||||
ChainSpec,
|
||||
)
|
||||
#from cic_registry.bancor import BancorRegistryClient
|
||||
from cic_registry.token import Token
|
||||
from cic_registry.error import (
|
||||
UnknownContractError,
|
||||
UnknownDeclarationError,
|
||||
)
|
||||
from cic_registry.declaration import to_token_declaration
|
||||
from web3.exceptions import BlockNotFound, TransactionNotFound
|
||||
from websockets.exceptions import ConnectionClosedError
|
||||
from requests.exceptions import ConnectionError
|
||||
import web3
|
||||
from web3 import HTTPProvider, WebsocketProvider
|
||||
|
||||
# local imports
|
||||
from cic_cache import db
|
||||
from cic_cache.db.models.base import SessionBase
|
||||
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
logg = logging.getLogger()
|
||||
logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL)
|
||||
logging.getLogger('urllib3').setLevel(logging.CRITICAL)
|
||||
logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL)
|
||||
logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL)
|
||||
logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL)
|
||||
|
||||
log_topics = {
|
||||
'transfer': '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef',
|
||||
'convert': '0x7154b38b5dd31bb3122436a96d4e09aba5b323ae1fd580025fab55074334c095',
|
||||
'accountregistry_add': '0a3b0a4f4c6e53dce3dbcad5614cb2ba3a0fa7326d03c5d64b4fa2d565492737',
|
||||
}
|
||||
|
||||
config_dir = os.path.join('/usr/local/etc/cic-cache')
|
||||
|
||||
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
|
||||
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
|
||||
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
|
||||
argparser.add_argument('--trust-address', default=[], type=str, dest='trust_address', action='append', help='Set address as trust')
|
||||
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
|
||||
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
|
||||
argparser.add_argument('-v', help='be verbose', action='store_true')
|
||||
argparser.add_argument('-vv', help='be more verbose', action='store_true')
|
||||
args = argparser.parse_args(sys.argv[1:])
|
||||
|
||||
config_dir = os.path.join(args.c)
|
||||
os.makedirs(config_dir, 0o777, True)
|
||||
|
||||
|
||||
if args.v == True:
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
elif args.vv == True:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
config = confini.Config(config_dir, args.env_prefix)
|
||||
config.process()
|
||||
args_override = {
|
||||
'ETH_ABI_DIR': getattr(args, 'abi_dir'),
|
||||
'CIC_TRUST_ADDRESS': ",".join(getattr(args, 'trust_address', [])),
|
||||
}
|
||||
config.dict_override(args_override, 'cli flag')
|
||||
config.censor('PASSWORD', 'DATABASE')
|
||||
config.censor('PASSWORD', 'SSL')
|
||||
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
|
||||
|
||||
# connect to database
|
||||
dsn = db.dsn_from_config(config)
|
||||
SessionBase.connect(dsn)
|
||||
|
||||
|
||||
re_websocket = re.compile('^wss?://')
|
||||
re_http = re.compile('^https?://')
|
||||
blockchain_provider = config.get('ETH_PROVIDER')
|
||||
if re.match(re_websocket, blockchain_provider) != None:
|
||||
blockchain_provider = WebsocketProvider(blockchain_provider)
|
||||
elif re.match(re_http, blockchain_provider) != None:
|
||||
blockchain_provider = HTTPProvider(blockchain_provider)
|
||||
else:
|
||||
raise ValueError('unknown provider url {}'.format(blockchain_provider))
|
||||
|
||||
def web3_constructor():
|
||||
w3 = web3.Web3(blockchain_provider)
|
||||
return (blockchain_provider, w3)
|
||||
|
||||
|
||||
class RunStateEnum(enum.IntEnum):
|
||||
INIT = 0
|
||||
RUN = 1
|
||||
TERMINATE = 9
|
||||
|
||||
|
||||
def rubberstamp(src):
|
||||
return True
|
||||
|
||||
|
||||
class Tracker:
|
||||
|
||||
def __init__(self, chain_spec, trusts=[]):
|
||||
self.block_height = 0
|
||||
self.tx_height = 0
|
||||
self.state = RunStateEnum.INIT
|
||||
self.declarator_cache = {}
|
||||
self.convert_enabled = False
|
||||
self.trusts = trusts
|
||||
self.chain_spec = chain_spec
|
||||
self.declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', 'Declarator')
|
||||
|
||||
|
||||
def __process_tx(self, w3, session, t, r, l, b):
|
||||
token_value = int(l.data, 16)
|
||||
token_sender = l.topics[1][-20:].hex()
|
||||
token_recipient = l.topics[2][-20:].hex()
|
||||
|
||||
#ts = ContractRegistry.get_address(t.address)
|
||||
ts = CICRegistry.get_address(self.chain_spec, t.address())
|
||||
logg.info('add token transfer {} value {} from {} to {}'.format(
|
||||
ts.symbol(),
|
||||
token_value,
|
||||
token_sender,
|
||||
token_recipient,
|
||||
)
|
||||
)
|
||||
|
||||
db.add_transaction(
|
||||
session,
|
||||
r.transactionHash.hex(),
|
||||
r.blockNumber,
|
||||
r.transactionIndex,
|
||||
w3.toChecksumAddress(token_sender),
|
||||
w3.toChecksumAddress(token_recipient),
|
||||
t.address(),
|
||||
t.address(),
|
||||
token_value,
|
||||
token_value,
|
||||
r.status == 1,
|
||||
b.timestamp,
|
||||
)
|
||||
session.flush()
|
||||
|
||||
|
||||
# TODO: simplify/ split up and/or comment, function is too long
|
||||
def __process_convert(self, w3, session, t, r, l, b):
|
||||
logg.warning('conversions are deactivated')
|
||||
return
|
||||
# token_source = l.topics[2][-20:].hex()
|
||||
# token_source = w3.toChecksumAddress(token_source)
|
||||
# token_destination = l.topics[3][-20:].hex()
|
||||
# token_destination = w3.toChecksumAddress(token_destination)
|
||||
# data_noox = l.data[2:]
|
||||
# d = data_noox[:64]
|
||||
# token_from_value = int(d, 16)
|
||||
# d = data_noox[64:128]
|
||||
# token_to_value = int(d, 16)
|
||||
# token_trader = '0x' + data_noox[192-40:]
|
||||
#
|
||||
# #ts = ContractRegistry.get_address(token_source)
|
||||
# ts = CICRegistry.get_address(CICRegistry.bancor_chain_spec, t.address())
|
||||
# #if ts == None:
|
||||
# # ts = ContractRegistry.reserves[token_source]
|
||||
# td = ContractRegistry.get_address(token_destination)
|
||||
# #if td == None:
|
||||
# # td = ContractRegistry.reserves[token_source]
|
||||
# logg.info('add token convert {} -> {} value {} -> {} trader {}'.format(
|
||||
# ts.symbol(),
|
||||
# td.symbol(),
|
||||
# token_from_value,
|
||||
# token_to_value,
|
||||
# token_trader,
|
||||
# )
|
||||
# )
|
||||
#
|
||||
# db.add_transaction(
|
||||
# session,
|
||||
# r.transactionHash.hex(),
|
||||
# r.blockNumber,
|
||||
# r.transactionIndex,
|
||||
# w3.toChecksumAddress(token_trader),
|
||||
# w3.toChecksumAddress(token_trader),
|
||||
# token_source,
|
||||
# token_destination,
|
||||
# r.status == 1,
|
||||
# b.timestamp,
|
||||
# )
|
||||
# session.flush()
|
||||
|
||||
|
||||
def check_token(self, address):
|
||||
t = None
|
||||
try:
|
||||
t = CICRegistry.get_address(CICRegistry.default_chain_spec, address)
|
||||
return t
|
||||
except UnknownContractError:
|
||||
logg.debug('contract {} not in registry'.format(address))
|
||||
|
||||
# If nothing was returned, we look up the token in the declarator
|
||||
for trust in self.trusts:
|
||||
logg.debug('look up declaration for contract {} with trust {}'.format(address, trust))
|
||||
fn = self.declarator.function('declaration')
|
||||
# TODO: cache trust in LRUcache
|
||||
declaration_array = fn(trust, address).call()
|
||||
try:
|
||||
declaration = to_token_declaration(trust, address, declaration_array, [rubberstamp])
|
||||
logg.debug('found declaration for token {} from trust address {}'.format(address, trust))
|
||||
except UnknownDeclarationError:
|
||||
continue
|
||||
|
||||
try:
|
||||
c = w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=address)
|
||||
t = CICRegistry.add_token(self.chain_spec, c)
|
||||
break
|
||||
except ValueError:
|
||||
logg.error('declaration for {} validates as token, but location is not ERC20 compatible'.format(address))
|
||||
|
||||
return t
|
||||
|
||||
|
||||
# TODO use input data instead of logs
|
||||
def process(self, w3, session, block):
|
||||
#self.refresh_registry(w3)
|
||||
tx_count = w3.eth.getBlockTransactionCount(block.hash)
|
||||
b = w3.eth.getBlock(block.hash)
|
||||
for i in range(self.tx_height, tx_count):
|
||||
tx = w3.eth.getTransactionByBlock(block.hash, i)
|
||||
if tx.to == None:
|
||||
logg.debug('block {} tx {} is contract creation tx, skipping'.format(block.number, i))
|
||||
continue
|
||||
if len(w3.eth.getCode(tx.to)) == 0:
|
||||
logg.debug('block {} tx {} not a contract tx, skipping'.format(block.number, i))
|
||||
continue
|
||||
|
||||
t = self.check_token(tx.to)
|
||||
if t != None and isinstance(t, Token):
|
||||
r = w3.eth.getTransactionReceipt(tx.hash)
|
||||
for l in r.logs:
|
||||
logg.debug('block {} tx {} {} token log {} {}'.format(block.number, i, tx.hash.hex(), l.logIndex, l.topics[0].hex()))
|
||||
if l.topics[0].hex() == log_topics['transfer']:
|
||||
self.__process_tx(w3, session, t, r, l, b)
|
||||
|
||||
# TODO: cache contracts in LRUcache
|
||||
elif self.convert_enabled and tx.to == CICRegistry.get_contract(CICRegistry.default_chain_spec, 'Converter').address:
|
||||
r = w3.eth.getTransactionReceipt(tx.hash)
|
||||
for l in r.logs:
|
||||
logg.info('block {} tx {} {} bancornetwork log {} {}'.format(block.number, i, tx.hash.hex(), l.logIndex, l.topics[0].hex()))
|
||||
if l.topics[0].hex() == log_topics['convert']:
|
||||
self.__process_convert(w3, session, t, r, l, b)
|
||||
|
||||
session.execute("UPDATE tx_sync SET tx = '{}'".format(tx.hash.hex()))
|
||||
session.commit()
|
||||
self.tx_height += 1
|
||||
|
||||
|
||||
def __get_next_retry(self, backoff=False):
|
||||
return 1
|
||||
|
||||
|
||||
def loop(self):
|
||||
logg.info('starting at block {} tx index {}'.format(self.block_height, self.tx_height))
|
||||
self.state = RunStateEnum.RUN
|
||||
while self.state == RunStateEnum.RUN:
|
||||
(provider, w3) = web3_constructor()
|
||||
session = SessionBase.create_session()
|
||||
try:
|
||||
block = w3.eth.getBlock(self.block_height)
|
||||
self.process(w3, session, block)
|
||||
self.block_height += 1
|
||||
self.tx_height = 0
|
||||
except BlockNotFound as e:
|
||||
logg.debug('no block {} yet, zZzZ...'.format(self.block_height))
|
||||
time.sleep(self.__get_next_retry())
|
||||
except ConnectionClosedError as e:
|
||||
logg.info('connection gone, retrying')
|
||||
time.sleep(self.__get_next_retry(True))
|
||||
except OSError as e:
|
||||
logg.error('cannot connect {}'.format(e))
|
||||
time.sleep(self.__get_next_retry(True))
|
||||
except Exception as e:
|
||||
session.close()
|
||||
raise(e)
|
||||
session.close()
|
||||
|
||||
|
||||
def load(self, w3):
|
||||
session = SessionBase.create_session()
|
||||
r = session.execute('SELECT tx FROM tx_sync').first()
|
||||
if r != None:
|
||||
if r[0] == '0x{0:0{1}X}'.format(0, 64):
|
||||
logg.debug('last tx was zero-address, starting from scratch')
|
||||
return
|
||||
t = w3.eth.getTransaction(r[0])
|
||||
|
||||
self.block_height = t.blockNumber
|
||||
self.tx_height = t.transactionIndex+1
|
||||
c = w3.eth.getBlockTransactionCount(t.blockHash.hex())
|
||||
logg.debug('last tx processed {} index {} (max index {})'.format(t.blockNumber, t.transactionIndex, c-1))
|
||||
if c == self.tx_height:
|
||||
self.block_height += 1
|
||||
self.tx_height = 0
|
||||
session.close()
|
||||
|
||||
(provider, w3) = web3_constructor()
|
||||
trust = config.get('CIC_TRUST_ADDRESS', []).split(",")
|
||||
chain_spec = args.i
|
||||
|
||||
try:
|
||||
w3.eth.chainId
|
||||
except Exception as e:
|
||||
logg.exception(e)
|
||||
sys.stderr.write('cannot connect to evm node\n')
|
||||
sys.exit(1)
|
||||
|
||||
def main():
|
||||
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
|
||||
|
||||
CICRegistry.init(w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec)
|
||||
CICRegistry.add_path(config.get('ETH_ABI_DIR'))
|
||||
chain_registry = ChainRegistry(chain_spec)
|
||||
CICRegistry.add_chain_registry(chain_registry)
|
||||
|
||||
t = Tracker(chain_spec, trust)
|
||||
t.load(w3)
|
||||
t.loop()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
38
apps/cic-cache/cic_cache/tasks/tx.py
Normal file
38
apps/cic-cache/cic_cache/tasks/tx.py
Normal file
@@ -0,0 +1,38 @@
|
||||
# third-party imports
|
||||
import celery
|
||||
|
||||
# local imports
|
||||
from cic_cache.cache import BloomCache
|
||||
from cic_cache.db.models.base import SessionBase
|
||||
|
||||
celery_app = celery.current_app
|
||||
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
def tx_filter(self, offset, limit, address=None, encoding='hex'):
|
||||
queue = self.request.delivery_info.get('routing_key')
|
||||
|
||||
session = SessionBase.create_session()
|
||||
|
||||
c = BloomCache(session)
|
||||
b = None
|
||||
if address == None:
|
||||
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
|
||||
else:
|
||||
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
|
||||
|
||||
session.close()
|
||||
|
||||
o = {
|
||||
'alg': 'sha256',
|
||||
'low': lowest_block,
|
||||
'high': highest_block,
|
||||
'block_filter': bloom_filter_block.hex(),
|
||||
'blocktx_filter': bloom_filter_tx.hex(),
|
||||
'filter_rounds': 3,
|
||||
}
|
||||
|
||||
return o
|
||||
|
||||
|
||||
|
||||
18
apps/cic-cache/cic_cache/version.py
Normal file
18
apps/cic-cache/cic_cache/version.py
Normal file
@@ -0,0 +1,18 @@
|
||||
import os
|
||||
import semver
|
||||
|
||||
version = (
|
||||
0,
|
||||
2,
|
||||
0,
|
||||
'alpha.1',
|
||||
)
|
||||
|
||||
version_object = semver.VersionInfo(
|
||||
major=version[0],
|
||||
minor=version[1],
|
||||
patch=version[2],
|
||||
prerelease=version[3],
|
||||
)
|
||||
|
||||
version_string = str(version_object)
|
||||
2
apps/cic-cache/config/bancor.ini
Normal file
2
apps/cic-cache/config/bancor.ini
Normal file
@@ -0,0 +1,2 @@
|
||||
[bancor]
|
||||
dir =
|
||||
3
apps/cic-cache/config/celery.ini
Normal file
3
apps/cic-cache/config/celery.ini
Normal file
@@ -0,0 +1,3 @@
|
||||
[celery]
|
||||
broker_url = redis:///
|
||||
result_url = redis:///
|
||||
4
apps/cic-cache/config/cic.ini
Normal file
4
apps/cic-cache/config/cic.ini
Normal file
@@ -0,0 +1,4 @@
|
||||
[cic]
|
||||
registry_address =
|
||||
chain_spec =
|
||||
trust_address =
|
||||
9
apps/cic-cache/config/database.ini
Normal file
9
apps/cic-cache/config/database.ini
Normal file
@@ -0,0 +1,9 @@
|
||||
[database]
|
||||
NAME=cic-eth
|
||||
USER=postgres
|
||||
PASSWORD=
|
||||
HOST=localhost
|
||||
PORT=5432
|
||||
ENGINE=postgresql
|
||||
DRIVER=psycopg2
|
||||
DEBUG=
|
||||
3
apps/cic-cache/config/docker/bancor.ini
Normal file
3
apps/cic-cache/config/docker/bancor.ini
Normal file
@@ -0,0 +1,3 @@
|
||||
[bancor]
|
||||
registry_address =
|
||||
dir = /usr/local/share/bancor
|
||||
3
apps/cic-cache/config/docker/celery.ini
Normal file
3
apps/cic-cache/config/docker/celery.ini
Normal file
@@ -0,0 +1,3 @@
|
||||
[celery]
|
||||
broker_url = redis://localhost:63379
|
||||
result_url = redis://localhost:63379
|
||||
9
apps/cic-cache/config/docker/database.ini
Normal file
9
apps/cic-cache/config/docker/database.ini
Normal file
@@ -0,0 +1,9 @@
|
||||
[database]
|
||||
NAME=cic_cache
|
||||
USER=grassroots
|
||||
PASSWORD=
|
||||
HOST=localhost
|
||||
PORT=63432
|
||||
ENGINE=postgresql
|
||||
DRIVER=psycopg2
|
||||
DEBUG=1
|
||||
3
apps/cic-cache/config/docker/eth.ini
Normal file
3
apps/cic-cache/config/docker/eth.ini
Normal file
@@ -0,0 +1,3 @@
|
||||
[eth]
|
||||
provider = ws://localhost:63546
|
||||
chain_id = 8996
|
||||
7
apps/cic-cache/config/eth.ini
Normal file
7
apps/cic-cache/config/eth.ini
Normal file
@@ -0,0 +1,7 @@
|
||||
[eth]
|
||||
provider = ws://localhost:8545
|
||||
#ttp_provider = http://localhost:8545
|
||||
#provider = http://localhost:8545
|
||||
gas_provider_address =
|
||||
#chain_id =
|
||||
abi_dir = /usr/local/share/cic/solidity/abi
|
||||
2
apps/cic-cache/config/test/bancor.ini
Normal file
2
apps/cic-cache/config/test/bancor.ini
Normal file
@@ -0,0 +1,2 @@
|
||||
[bancor]
|
||||
dir =
|
||||
2
apps/cic-cache/config/test/cic.ini
Normal file
2
apps/cic-cache/config/test/cic.ini
Normal file
@@ -0,0 +1,2 @@
|
||||
[cic]
|
||||
registry_address =
|
||||
9
apps/cic-cache/config/test/database.ini
Normal file
9
apps/cic-cache/config/test/database.ini
Normal file
@@ -0,0 +1,9 @@
|
||||
[database]
|
||||
NAME=cic-cache-test
|
||||
USER=postgres
|
||||
PASSWORD=
|
||||
HOST=localhost
|
||||
PORT=5432
|
||||
ENGINE=sqlite
|
||||
DRIVER=pysqlite
|
||||
DEBUG=
|
||||
5
apps/cic-cache/config/test/eth.ini
Normal file
5
apps/cic-cache/config/test/eth.ini
Normal file
@@ -0,0 +1,5 @@
|
||||
[eth]
|
||||
#ws_provider = ws://localhost:8546
|
||||
#ttp_provider = http://localhost:8545
|
||||
provider = http://localhost:8545
|
||||
#chain_id =
|
||||
5
apps/cic-cache/db/initdb_files/create_all_db.sql
Normal file
5
apps/cic-cache/db/initdb_files/create_all_db.sql
Normal file
@@ -0,0 +1,5 @@
|
||||
CREATE DATABASE "cic-cache";
|
||||
CREATE DATABASE "cic-eth";
|
||||
CREATE DATABASE "cic-notify";
|
||||
CREATE DATABASE "cic-meta";
|
||||
CREATE DATABASE "cic-signer";
|
||||
22
apps/cic-cache/db/psycopg2/db.sql
Normal file
22
apps/cic-cache/db/psycopg2/db.sql
Normal file
@@ -0,0 +1,22 @@
|
||||
CREATE TABLE tx (
|
||||
id SERIAL PRIMARY KEY,
|
||||
date_registered TIMESTAMP NOT NULL default CURRENT_TIMESTAMP,
|
||||
block_number INTEGER NOT NULL,
|
||||
tx_index INTEGER NOT NULL,
|
||||
tx_hash VARCHAR(66) NOT NULL,
|
||||
sender VARCHAR(42) NOT NULL,
|
||||
recipient VARCHAR(42) NOT NULL,
|
||||
source_token VARCHAR(42) NOT NULL,
|
||||
destination_token VARCHAR(42) NOT NULL,
|
||||
from_value BIGINT NOT NULL,
|
||||
to_value BIGINT NOT NULL,
|
||||
success BOOLEAN NOT NULL,
|
||||
date_block TIMESTAMP NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE tx_sync (
|
||||
id SERIAL PRIMARY KEY,
|
||||
tx VARCHAR(66) NOT NULL
|
||||
);
|
||||
|
||||
INSERT INTO tx_sync (tx) VALUES('0x0000000000000000000000000000000000000000000000000000000000000000');
|
||||
23
apps/cic-cache/db/pysqlite/db.sql
Normal file
23
apps/cic-cache/db/pysqlite/db.sql
Normal file
@@ -0,0 +1,23 @@
|
||||
CREATE TABLE tx (
|
||||
id SERIAL PRIMARY KEY,
|
||||
date_registered DATETIME NOT NULL default CURRENT_DATE,
|
||||
block_number INTEGER NOT NULL,
|
||||
tx_index INTEGER NOT NULL,
|
||||
tx_hash VARCHAR(66) NOT NULL,
|
||||
sender VARCHAR(42) NOT NULL,
|
||||
recipient VARCHAR(42) NOT NULL,
|
||||
source_token VARCHAR(42) NOT NULL,
|
||||
destination_token VARCHAR(42) NOT NULL,
|
||||
from_value INTEGER NOT NULL,
|
||||
to_value INTEGER NOT NULL,
|
||||
success BOOLEAN NOT NULL,
|
||||
date_block DATETIME NOT NULL,
|
||||
CHECK (success IN (0, 1))
|
||||
);
|
||||
|
||||
CREATE TABLE tx_sync (
|
||||
id SERIAL PRIMARY_KEY,
|
||||
tx VARCHAR(66) NOT NULL
|
||||
);
|
||||
|
||||
INSERT INTO tx_sync (tx) VALUES('0x0000000000000000000000000000000000000000000000000000000000000000');
|
||||
102
apps/cic-cache/doc/openapi/server.yml
Normal file
102
apps/cic-cache/doc/openapi/server.yml
Normal file
@@ -0,0 +1,102 @@
|
||||
openapi: "3.0.3"
|
||||
info:
|
||||
title: Grassroots Economics CIC Cache
|
||||
description: Cache of processed transaction data from Ethereum blockchain and worker queues
|
||||
termsOfService: bzz://grassrootseconomics.eth/terms
|
||||
contact:
|
||||
name: Grassroots Economics
|
||||
url: https://www.grassrootseconomics.org
|
||||
email: will@grassecon.org
|
||||
license:
|
||||
name: GPLv3
|
||||
version: 0.1.0
|
||||
|
||||
paths:
|
||||
/tx/{offset}/{limit}:
|
||||
description: Bloom filter for batch of latest transactions
|
||||
get:
|
||||
tags:
|
||||
- transactions
|
||||
description:
|
||||
Retrieve transactions
|
||||
operationId: tx.get
|
||||
responses:
|
||||
200:
|
||||
description: Transaction query successful.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/BlocksBloom"
|
||||
|
||||
|
||||
parameters:
|
||||
- name: offset
|
||||
in: path
|
||||
schema:
|
||||
type: integer
|
||||
format: int32
|
||||
- name: limit
|
||||
in: path
|
||||
schema:
|
||||
type: integer
|
||||
format: int32
|
||||
|
||||
|
||||
/tx/{address}/{offset}/{limit}:
|
||||
description: Bloom filter for batch of latest transactions by account
|
||||
get:
|
||||
tags:
|
||||
- transactions
|
||||
description:
|
||||
Retrieve transactions
|
||||
operationId: tx.get
|
||||
responses:
|
||||
200:
|
||||
description: Transaction query successful.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/BlocksBloom"
|
||||
|
||||
|
||||
parameters:
|
||||
- name: address
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
- name: offset
|
||||
in: path
|
||||
schema:
|
||||
type: integer
|
||||
format: int32
|
||||
- name: limit
|
||||
in: path
|
||||
schema:
|
||||
type: integer
|
||||
format: int32
|
||||
|
||||
components:
|
||||
schemas:
|
||||
BlocksBloom:
|
||||
type: object
|
||||
properties:
|
||||
low:
|
||||
type: int
|
||||
format: int32
|
||||
description: The lowest block number included in the filter
|
||||
block_filter:
|
||||
type: string
|
||||
format: byte
|
||||
description: Block number filter
|
||||
blocktx_filter:
|
||||
type: string
|
||||
format: byte
|
||||
description: Block and tx index filter
|
||||
alg:
|
||||
type: string
|
||||
description: Hashing algorithm (currently only using sha256)
|
||||
filter_rounds:
|
||||
type: int
|
||||
format: int32
|
||||
description: Number of hash rounds used to create the filter
|
||||
54
apps/cic-cache/docker/Dockerfile
Normal file
54
apps/cic-cache/docker/Dockerfile
Normal file
@@ -0,0 +1,54 @@
|
||||
FROM python:3.8.6-slim-buster
|
||||
|
||||
#COPY --from=0 /usr/local/share/cic/solidity/ /usr/local/share/cic/solidity/
|
||||
|
||||
WORKDIR /usr/src/cic-cache
|
||||
|
||||
ARG pip_extra_index_url_flag='--index https://pypi.org/simple --extra-index-url https://pip.grassrootseconomics.net:8433'
|
||||
ARG root_requirement_file='requirements.txt'
|
||||
|
||||
#RUN apk update && \
|
||||
# apk add gcc musl-dev gnupg libpq
|
||||
#RUN apk add postgresql-dev
|
||||
#RUN apk add linux-headers
|
||||
#RUN apk add libffi-dev
|
||||
RUN apt-get update && \
|
||||
apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps git
|
||||
|
||||
# Copy shared requirements from top of mono-repo
|
||||
RUN echo "copying root req file ${root_requirement_file}"
|
||||
COPY $root_requirement_file .
|
||||
RUN pip install -r $root_requirement_file $pip_extra_index_url_flag
|
||||
|
||||
COPY cic-cache/requirements.txt ./
|
||||
COPY cic-cache/setup.cfg \
|
||||
cic-cache/setup.py \
|
||||
./
|
||||
COPY cic-cache/cic_cache/ ./cic_cache/
|
||||
COPY cic-cache/scripts/ ./scripts/
|
||||
COPY cic-cache/test_requirements.txt ./
|
||||
RUN pip install $pip_extra_index_url_flag -r test_requirements.txt
|
||||
RUN pip install $pip_extra_index_url_flag .
|
||||
RUN pip install .[server]
|
||||
|
||||
COPY cic-cache/tests/ ./tests/
|
||||
#COPY db/ cic-cache/db
|
||||
#RUN apk add postgresql-client
|
||||
|
||||
# ini files in config directory defines the configurable parameters for the application
|
||||
# they can all be overridden by environment variables
|
||||
# to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package)
|
||||
COPY cic-cache/config/ /usr/local/etc/cic-cache/
|
||||
|
||||
# for db migrations
|
||||
RUN git clone https://github.com/vishnubob/wait-for-it.git /usr/local/bin/wait-for-it/
|
||||
COPY cic-cache/cic_cache/db/migrations/ /usr/local/share/cic-cache/alembic/
|
||||
|
||||
RUN git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \
|
||||
mkdir -p /usr/local/share/cic/solidity && \
|
||||
cp -R cic-contracts/abis /usr/local/share/cic/solidity/abi
|
||||
|
||||
# Tracker
|
||||
# ENTRYPOINT ["/usr/local/bin/cic-cache-tracker", "-vv"]
|
||||
# Server
|
||||
# ENTRYPOINT [ "/usr/local/bin/uwsgi", "--wsgi-file", "/usr/local/lib/python3.8/site-packages/cic_cache/runnable/server.py", "--http", ":80", "--pyargv", "-vv" ]
|
||||
3616
apps/cic-cache/examples/bloom_client/package-lock.json
generated
Normal file
3616
apps/cic-cache/examples/bloom_client/package-lock.json
generated
Normal file
File diff suppressed because it is too large
Load Diff
40
apps/cic-cache/examples/bloom_client/parse.js
Normal file
40
apps/cic-cache/examples/bloom_client/parse.js
Normal file
@@ -0,0 +1,40 @@
|
||||
let xmlhttprequest = require('xhr2');
|
||||
let moolb = require('moolb');
|
||||
|
||||
let xhr = new xmlhttprequest();
|
||||
xhr.responseType = 'json';
|
||||
xhr.open('GET', 'http://localhost:5555/tx/0/100');
|
||||
xhr.addEventListener('load', (e) => {
|
||||
|
||||
d = xhr.response;
|
||||
|
||||
b_one = Buffer.from(d.block_filter, 'base64');
|
||||
b_two = Buffer.from(d.blocktx_filter, 'base64');
|
||||
|
||||
for (let i = 0; i < 8192; i++) {
|
||||
if (b_two[i] > 0) {
|
||||
console.debug('value on', i, b_two[i]);
|
||||
}
|
||||
}
|
||||
console.log(b_one, b_two);
|
||||
|
||||
let f_block = moolb.fromBytes(b_one, d.filter_rounds);
|
||||
let f_blocktx = moolb.fromBytes(b_two, d.filter_rounds);
|
||||
let a = new ArrayBuffer(8);
|
||||
let w = new DataView(a);
|
||||
for (let i = 410000; i < 430000; i++) {
|
||||
w.setInt32(0, i);
|
||||
let r = new Uint8Array(a.slice(0, 4));
|
||||
if (f_block.check(r)) {
|
||||
for (let j = 0; j < 200; j++) {
|
||||
w = new DataView(a);
|
||||
w.setInt32(4, j);
|
||||
r = new Uint8Array(a);
|
||||
if (f_blocktx.check(r)) {
|
||||
console.log('true', i, j);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
let r = xhr.send();
|
||||
10
apps/cic-cache/requirements.txt
Normal file
10
apps/cic-cache/requirements.txt
Normal file
@@ -0,0 +1,10 @@
|
||||
alembic==1.4.2
|
||||
confini~=0.3.6b2
|
||||
uwsgi==2.0.19.1
|
||||
moolb~=0.1.0
|
||||
cic-registry~=0.5.3a4
|
||||
SQLAlchemy==1.3.20
|
||||
semver==2.13.0
|
||||
psycopg2==2.8.6
|
||||
celery==4.4.7
|
||||
redis==3.5.3
|
||||
56
apps/cic-cache/scripts/migrate.py
Normal file
56
apps/cic-cache/scripts/migrate.py
Normal file
@@ -0,0 +1,56 @@
|
||||
#!/usr/bin/python
|
||||
import os
|
||||
import argparse
|
||||
import logging
|
||||
|
||||
import alembic
|
||||
from alembic.config import Config as AlembicConfig
|
||||
import confini
|
||||
|
||||
from cic_cache.db import dsn_from_config
|
||||
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
logg = logging.getLogger()
|
||||
|
||||
# BUG: the dbdir doesn't work after script install
|
||||
rootdir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
|
||||
dbdir = os.path.join(rootdir, 'cic_cache', 'db')
|
||||
migrationsdir = os.path.join(dbdir, 'migrations')
|
||||
|
||||
config_dir = os.path.join('/usr/local/etc/cic-cache')
|
||||
|
||||
argparser = argparse.ArgumentParser()
|
||||
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
|
||||
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
|
||||
argparser.add_argument('--migrations-dir', dest='migrations_dir', default=migrationsdir, type=str, help='path to alembic migrations directory')
|
||||
argparser.add_argument('-v', action='store_true', help='be verbose')
|
||||
argparser.add_argument('-vv', action='store_true', help='be more verbose')
|
||||
args = argparser.parse_args()
|
||||
|
||||
if args.vv:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
elif args.v:
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
config = confini.Config(args.c, args.env_prefix)
|
||||
config.process()
|
||||
config.censor('PASSWORD', 'DATABASE')
|
||||
config.censor('PASSWORD', 'SSL')
|
||||
logg.debug('config:\n{}'.format(config))
|
||||
|
||||
migrations_dir = os.path.join(args.migrations_dir, config.get('DATABASE_ENGINE'))
|
||||
if not os.path.isdir(migrations_dir):
|
||||
logg.debug('migrations dir for engine {} not found, reverting to default'.format(config.get('DATABASE_ENGINE')))
|
||||
migrations_dir = os.path.join(args.migrations_dir, 'default')
|
||||
|
||||
# connect to database
|
||||
dsn = dsn_from_config(config)
|
||||
|
||||
|
||||
logg.info('using migrations dir {}'.format(migrations_dir))
|
||||
logg.info('using db {}'.format(dsn))
|
||||
ac = AlembicConfig(os.path.join(migrations_dir, 'alembic.ini'))
|
||||
ac.set_main_option('sqlalchemy.url', dsn)
|
||||
ac.set_main_option('script_location', migrations_dir)
|
||||
|
||||
alembic.command.upgrade(ac, 'head')
|
||||
@@ -1,11 +1,12 @@
|
||||
[metadata]
|
||||
name = cic-dev-fake
|
||||
version = 0.0.1
|
||||
description = Fake data generator tools
|
||||
name = cic-cache
|
||||
description = CIC Cache API and server
|
||||
author = Louis Holbrook
|
||||
author_email = dev@holbrook.no
|
||||
url = https://gitlab.com/nolash/simple-multisig
|
||||
url = https://gitlab.com/grassrootseconomics/cic-eth
|
||||
keywords =
|
||||
cic
|
||||
cryptocurrency
|
||||
ethereum
|
||||
classifiers =
|
||||
Programming Language :: Python :: 3
|
||||
@@ -15,26 +16,22 @@ classifiers =
|
||||
Intended Audience :: Developers
|
||||
License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)
|
||||
Topic :: Internet
|
||||
#Topic :: Blockchain :: EVM
|
||||
# Topic :: Blockchain :: EVM
|
||||
license = GPL3
|
||||
licence_files =
|
||||
LICENSE
|
||||
LICENSE.txt
|
||||
|
||||
[options]
|
||||
python_requires = >= 3.6
|
||||
install_requires =
|
||||
web3==5.12.2
|
||||
vobject==0.9.6.1
|
||||
faker==4.17.1
|
||||
tests_require =
|
||||
eth-tester==0.5.0b2
|
||||
py-evm==0.3.0a20
|
||||
packages =
|
||||
cic_cache
|
||||
cic_cache.db
|
||||
cic_cache.db.models
|
||||
cic_cache.runnable
|
||||
scripts =
|
||||
scripts/users.py
|
||||
scripts/tx_generator.py
|
||||
scripts/tx_seed.py
|
||||
./scripts/migrate.py
|
||||
|
||||
[options.extras_require]
|
||||
testing =
|
||||
eth-tester==0.5.0b2
|
||||
py-evm==0.3.0a20
|
||||
[options.entry_points]
|
||||
console_scripts =
|
||||
cic-cache-tracker = cic_cache.runnable.tracker:main
|
||||
cic-cache-server = cic_cache.runnable.server:main
|
||||
60
apps/cic-cache/setup.py
Normal file
60
apps/cic-cache/setup.py
Normal file
@@ -0,0 +1,60 @@
|
||||
from setuptools import setup
|
||||
|
||||
import configparser
|
||||
import os
|
||||
import time
|
||||
|
||||
from cic_cache.version import (
|
||||
version_object,
|
||||
version_string
|
||||
)
|
||||
|
||||
class PleaseCommitFirstError(Exception):
|
||||
pass
|
||||
|
||||
def git_hash():
|
||||
import subprocess
|
||||
git_diff = subprocess.run(['git', 'diff'], capture_output=True)
|
||||
if len(git_diff.stdout) > 0:
|
||||
raise PleaseCommitFirstError()
|
||||
git_hash = subprocess.run(['git', 'rev-parse', 'HEAD'], capture_output=True)
|
||||
git_hash_brief = git_hash.stdout.decode('utf-8')[:8]
|
||||
return git_hash_brief
|
||||
|
||||
version_string = str(version_object)
|
||||
|
||||
try:
|
||||
version_git = git_hash()
|
||||
version_string += '+build.{}'.format(version_git)
|
||||
except FileNotFoundError:
|
||||
time_string_pair = str(time.time()).split('.')
|
||||
version_string += '+build.{}{:<09d}'.format(
|
||||
time_string_pair[0],
|
||||
int(time_string_pair[1]),
|
||||
)
|
||||
print('final version string will be {}'.format(version_string))
|
||||
|
||||
requirements = []
|
||||
f = open('requirements.txt', 'r')
|
||||
while True:
|
||||
l = f.readline()
|
||||
if l == '':
|
||||
break
|
||||
requirements.append(l.rstrip())
|
||||
f.close()
|
||||
|
||||
test_requirements = []
|
||||
f = open('test_requirements.txt', 'r')
|
||||
while True:
|
||||
l = f.readline()
|
||||
if l == '':
|
||||
break
|
||||
test_requirements.append(l.rstrip())
|
||||
f.close()
|
||||
|
||||
|
||||
setup(
|
||||
version=version_string,
|
||||
install_requires=requirements,
|
||||
tests_require=test_requirements,
|
||||
)
|
||||
6
apps/cic-cache/test_requirements.txt
Normal file
6
apps/cic-cache/test_requirements.txt
Normal file
@@ -0,0 +1,6 @@
|
||||
pytest==6.0.1
|
||||
pytest-cov==2.10.1
|
||||
pytest-mock==3.3.1
|
||||
pysqlite3==0.4.3
|
||||
sqlparse==0.4.1
|
||||
pytest-celery==0.0.0a1
|
||||
86
apps/cic-cache/tests/conftest.py
Normal file
86
apps/cic-cache/tests/conftest.py
Normal file
@@ -0,0 +1,86 @@
|
||||
# standard imports
|
||||
import os
|
||||
import sys
|
||||
import datetime
|
||||
|
||||
# third-party imports
|
||||
import pytest
|
||||
|
||||
# local imports
|
||||
from cic_cache import db
|
||||
|
||||
script_dir = os.path.dirname(os.path.realpath(__file__))
|
||||
root_dir = os.path.dirname(script_dir)
|
||||
sys.path.insert(0, root_dir)
|
||||
|
||||
# fixtures
|
||||
from tests.fixtures_config import *
|
||||
from tests.fixtures_database import *
|
||||
from tests.fixtures_celery import *
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def balances_dict_fields():
|
||||
return {
|
||||
'out_pending': 0,
|
||||
'out_synced': 1,
|
||||
'out_confirmed': 2,
|
||||
'in_pending': 3,
|
||||
'in_synced': 4,
|
||||
'in_confirmed': 5,
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def txs(
|
||||
init_database,
|
||||
list_defaults,
|
||||
list_actors,
|
||||
list_tokens,
|
||||
):
|
||||
|
||||
session = init_database
|
||||
|
||||
tx_number = 13
|
||||
tx_hash_first = '0x' + os.urandom(32).hex()
|
||||
val = 15000
|
||||
nonce = 1
|
||||
dt = datetime.datetime.utcnow()
|
||||
db.add_transaction(
|
||||
session,
|
||||
tx_hash_first,
|
||||
list_defaults['block'],
|
||||
tx_number,
|
||||
list_actors['alice'],
|
||||
list_actors['bob'],
|
||||
list_tokens['foo'],
|
||||
list_tokens['foo'],
|
||||
1024,
|
||||
2048,
|
||||
True,
|
||||
dt.timestamp(),
|
||||
)
|
||||
|
||||
|
||||
tx_number = 42
|
||||
tx_hash_second = '0x' + os.urandom(32).hex()
|
||||
tx_signed_second = '0x' + os.urandom(128).hex()
|
||||
nonce = 1
|
||||
dt -= datetime.timedelta(hours=1)
|
||||
db.add_transaction(
|
||||
session,
|
||||
tx_hash_second,
|
||||
list_defaults['block']-1,
|
||||
tx_number,
|
||||
list_actors['diane'],
|
||||
list_actors['alice'],
|
||||
list_tokens['foo'],
|
||||
list_tokens['foo'],
|
||||
1024,
|
||||
2048,
|
||||
False,
|
||||
dt.timestamp(),
|
||||
)
|
||||
|
||||
session.commit()
|
||||
|
||||
48
apps/cic-cache/tests/fixtures_celery.py
Normal file
48
apps/cic-cache/tests/fixtures_celery.py
Normal file
@@ -0,0 +1,48 @@
|
||||
# third-party imports
|
||||
import pytest
|
||||
import tempfile
|
||||
import logging
|
||||
import shutil
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# celery fixtures
|
||||
@pytest.fixture(scope='session')
|
||||
def celery_includes():
|
||||
return [
|
||||
'cic_cache.tasks.tx',
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def celery_config():
|
||||
bq = tempfile.mkdtemp()
|
||||
bp = tempfile.mkdtemp()
|
||||
rq = tempfile.mkdtemp()
|
||||
logg.debug('celery broker queue {} processed {}'.format(bq, bp))
|
||||
logg.debug('celery backend store {}'.format(rq))
|
||||
yield {
|
||||
'broker_url': 'filesystem://',
|
||||
'broker_transport_options': {
|
||||
'data_folder_in': bq,
|
||||
'data_folder_out': bq,
|
||||
'data_folder_processed': bp,
|
||||
},
|
||||
'result_backend': 'file://{}'.format(rq),
|
||||
}
|
||||
logg.debug('cleaning up celery filesystem backend files {} {} {}'.format(bq, bp, rq))
|
||||
shutil.rmtree(bq)
|
||||
shutil.rmtree(bp)
|
||||
shutil.rmtree(rq)
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def celery_worker_parameters():
|
||||
return {
|
||||
# 'queues': ('cic-cache'),
|
||||
}
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def celery_enable_logging():
|
||||
return True
|
||||
20
apps/cic-cache/tests/fixtures_config.py
Normal file
20
apps/cic-cache/tests/fixtures_config.py
Normal file
@@ -0,0 +1,20 @@
|
||||
# standard imports
|
||||
import os
|
||||
import logging
|
||||
|
||||
# third-party imports
|
||||
import pytest
|
||||
import confini
|
||||
|
||||
script_dir = os.path.dirname(os.path.realpath(__file__))
|
||||
root_dir = os.path.dirname(script_dir)
|
||||
logg = logging.getLogger(__file__)
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def load_config():
|
||||
config_dir = os.path.join(root_dir, '.config/test')
|
||||
conf = confini.Config(config_dir, 'CICTEST')
|
||||
conf.process()
|
||||
logg.debug('config {}'.format(conf))
|
||||
return conf
|
||||
118
apps/cic-cache/tests/fixtures_database.py
Normal file
118
apps/cic-cache/tests/fixtures_database.py
Normal file
@@ -0,0 +1,118 @@
|
||||
# standard imports
|
||||
import os
|
||||
import logging
|
||||
import re
|
||||
|
||||
# third-party imports
|
||||
import pytest
|
||||
import sqlparse
|
||||
|
||||
# local imports
|
||||
from cic_cache.db.models.base import SessionBase
|
||||
from cic_cache.db import dsn_from_config
|
||||
|
||||
logg = logging.getLogger(__file__)
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def database_engine(
|
||||
load_config,
|
||||
):
|
||||
if load_config.get('DATABASE_ENGINE') == 'sqlite':
|
||||
SessionBase.transactional = False
|
||||
SessionBase.poolable = False
|
||||
try:
|
||||
os.unlink(load_config.get('DATABASE_NAME'))
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
dsn = dsn_from_config(load_config)
|
||||
SessionBase.connect(dsn)
|
||||
return dsn
|
||||
|
||||
|
||||
# TODO: use alembic instead to migrate db, here we have to keep separate schema than migration script in script/migrate.py
|
||||
@pytest.fixture(scope='function')
|
||||
def init_database(
|
||||
load_config,
|
||||
database_engine,
|
||||
):
|
||||
|
||||
rootdir = os.path.dirname(os.path.dirname(__file__))
|
||||
schemadir = os.path.join(rootdir, 'db', load_config.get('DATABASE_DRIVER'))
|
||||
|
||||
if load_config.get('DATABASE_ENGINE') == 'sqlite':
|
||||
rconn = SessionBase.engine.raw_connection()
|
||||
f = open(os.path.join(schemadir, 'db.sql'))
|
||||
s = f.read()
|
||||
f.close()
|
||||
rconn.executescript(s)
|
||||
|
||||
else:
|
||||
rconn = SessionBase.engine.raw_connection()
|
||||
rcursor = rconn.cursor()
|
||||
|
||||
#rcursor.execute('DROP FUNCTION IF EXISTS public.transaction_list')
|
||||
#rcursor.execute('DROP FUNCTION IF EXISTS public.balances')
|
||||
|
||||
f = open(os.path.join(schemadir, 'db.sql'))
|
||||
s = f.read()
|
||||
f.close()
|
||||
r = re.compile(r'^[A-Z]', re.MULTILINE)
|
||||
for l in sqlparse.parse(s):
|
||||
strl = str(l)
|
||||
# we need to check for empty query lines, as sqlparse doesn't do that on its own (and psycopg complains when it gets them)
|
||||
if not re.search(r, strl):
|
||||
logg.warning('skipping parsed query line {}'.format(strl))
|
||||
continue
|
||||
rcursor.execute(strl)
|
||||
rconn.commit()
|
||||
|
||||
rcursor.execute('SET search_path TO public')
|
||||
|
||||
# this doesn't work when run separately, no idea why
|
||||
# functions have been manually added to original schema from cic-eth
|
||||
# f = open(os.path.join(schemadir, 'proc_transaction_list.sql'))
|
||||
# s = f.read()
|
||||
# f.close()
|
||||
# rcursor.execute(s)
|
||||
#
|
||||
# f = open(os.path.join(schemadir, 'proc_balances.sql'))
|
||||
# s = f.read()
|
||||
# f.close()
|
||||
# rcursor.execute(s)
|
||||
|
||||
rcursor.close()
|
||||
|
||||
session = SessionBase.create_session()
|
||||
yield session
|
||||
session.commit()
|
||||
session.close()
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def list_tokens(
|
||||
):
|
||||
return {
|
||||
'foo': '0x' + os.urandom(20).hex(),
|
||||
'bar': '0x' + os.urandom(20).hex(),
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def list_actors(
|
||||
):
|
||||
return {
|
||||
'alice': '0x' + os.urandom(20).hex(),
|
||||
'bob': '0x' + os.urandom(20).hex(),
|
||||
'charlie': '0x' + os.urandom(20).hex(),
|
||||
'diane': '0x' + os.urandom(20).hex(),
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def list_defaults(
|
||||
):
|
||||
|
||||
return {
|
||||
'block': 420000,
|
||||
}
|
||||
35
apps/cic-cache/tests/test_cache.py
Normal file
35
apps/cic-cache/tests/test_cache.py
Normal file
@@ -0,0 +1,35 @@
|
||||
# standard imports
|
||||
import os
|
||||
import datetime
|
||||
import logging
|
||||
import json
|
||||
|
||||
# third-party imports
|
||||
import pytest
|
||||
|
||||
# local imports
|
||||
from cic_cache import BloomCache
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
def test_cache(
|
||||
init_database,
|
||||
list_defaults,
|
||||
list_actors,
|
||||
list_tokens,
|
||||
txs,
|
||||
):
|
||||
|
||||
session = init_database
|
||||
|
||||
c = BloomCache(session)
|
||||
b = c.load_transactions(0, 100)
|
||||
|
||||
assert b[0] == list_defaults['block'] - 1
|
||||
|
||||
c = BloomCache(session)
|
||||
c.load_transactions_account(list_actors['alice'],0, 100)
|
||||
|
||||
assert b[0] == list_defaults['block'] - 1
|
||||
|
||||
27
apps/cic-cache/tests/test_task.py
Normal file
27
apps/cic-cache/tests/test_task.py
Normal file
@@ -0,0 +1,27 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# third-party imports
|
||||
import celery
|
||||
|
||||
# local imports
|
||||
from cic_cache.api import Api
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
def test_task(
|
||||
init_database,
|
||||
list_defaults,
|
||||
list_actors,
|
||||
list_tokens,
|
||||
txs,
|
||||
celery_session_worker,
|
||||
):
|
||||
|
||||
api = Api(queue=None)
|
||||
t = api.list(0, 100)
|
||||
r = t.get()
|
||||
logg.debug('r {}'.format(r))
|
||||
|
||||
assert r['low'] == list_defaults['block'] - 1
|
||||
@@ -8,6 +8,7 @@ from cic_registry import zero_address
|
||||
|
||||
# local imports
|
||||
from cic_eth.db.enum import LockEnum
|
||||
from cic_eth.db.models.base import SessionBase
|
||||
from cic_eth.db.models.lock import Lock
|
||||
from cic_eth.error import LockedError
|
||||
|
||||
@@ -116,9 +117,10 @@ def unlock_queue(chained_input, chain_str, address=zero_address):
|
||||
|
||||
@celery_app.task()
|
||||
def check_lock(chained_input, chain_str, lock_flags, address=None):
|
||||
r = Lock.check(chain_str, lock_flags, address=zero_address)
|
||||
session = SessionBase.create_session()
|
||||
r = Lock.check(chain_str, lock_flags, address=zero_address, session=session)
|
||||
if address != None:
|
||||
r |= Lock.check(chain_str, lock_flags, address=address)
|
||||
r |= Lock.check(chain_str, lock_flags, address=address, session=session)
|
||||
if r > 0:
|
||||
logg.debug('lock check {} has match {} for {}'.format(lock_flags, r, address))
|
||||
raise LockedError(r)
|
||||
|
||||
@@ -16,7 +16,10 @@ from cic_eth.db.models.role import AccountRole
|
||||
from cic_eth.db.models.otx import Otx
|
||||
from cic_eth.db.models.tx import TxCache
|
||||
from cic_eth.db.models.nonce import Nonce
|
||||
from cic_eth.db.enum import StatusEnum
|
||||
from cic_eth.db.enum import (
|
||||
StatusEnum,
|
||||
is_alive,
|
||||
)
|
||||
from cic_eth.error import InitializationError
|
||||
from cic_eth.db.error import TxStateChangeError
|
||||
from cic_eth.eth.rpc import RpcClient
|
||||
@@ -98,6 +101,19 @@ class AdminApi:
|
||||
session.close()
|
||||
|
||||
|
||||
def have_account(self, address_hex, chain_str):
|
||||
s_have = celery.signature(
|
||||
'cic_eth.eth.account.have',
|
||||
[
|
||||
address_hex,
|
||||
chain_str,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
t = s_have.apply_async()
|
||||
return t.get()
|
||||
|
||||
|
||||
def resend(self, tx_hash_hex, chain_str, in_place=True, unlock=False):
|
||||
logg.debug('resend {}'.format(tx_hash_hex))
|
||||
s_get_tx_cache = celery.signature(
|
||||
@@ -110,24 +126,32 @@ class AdminApi:
|
||||
|
||||
# TODO: This check should most likely be in resend task itself
|
||||
tx_dict = s_get_tx_cache.apply_async().get()
|
||||
if tx_dict['status'] in [StatusEnum.REVERTED, StatusEnum.SUCCESS, StatusEnum.CANCELLED, StatusEnum.OBSOLETED]:
|
||||
#if tx_dict['status'] in [StatusEnum.REVERTED, StatusEnum.SUCCESS, StatusEnum.CANCELLED, StatusEnum.OBSOLETED]:
|
||||
if not is_alive(getattr(StatusEnum, tx_dict['status']).value):
|
||||
raise TxStateChangeError('Cannot resend mined or obsoleted transaction'.format(txold_hash_hex))
|
||||
|
||||
s = None
|
||||
if in_place:
|
||||
s = celery.signature(
|
||||
'cic_eth.eth.tx.resend_with_higher_gas',
|
||||
[
|
||||
tx_hash_hex,
|
||||
chain_str,
|
||||
None,
|
||||
1.01,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
else:
|
||||
|
||||
if not in_place:
|
||||
raise NotImplementedError('resend as new not yet implemented')
|
||||
|
||||
s = celery.signature(
|
||||
'cic_eth.eth.tx.resend_with_higher_gas',
|
||||
[
|
||||
chain_str,
|
||||
None,
|
||||
1.01,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
|
||||
s_manual = celery.signature(
|
||||
'cic_eth.queue.tx.set_manual',
|
||||
[
|
||||
tx_hash_hex,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_manual.link(s)
|
||||
|
||||
if unlock:
|
||||
s_gas = celery.signature(
|
||||
'cic_eth.admin.ctrl.unlock_send',
|
||||
@@ -139,7 +163,7 @@ class AdminApi:
|
||||
)
|
||||
s.link(s_gas)
|
||||
|
||||
return s.apply_async()
|
||||
return s_manual.apply_async()
|
||||
|
||||
def check_nonce(self, address):
|
||||
s = celery.signature(
|
||||
@@ -243,7 +267,9 @@ class AdminApi:
|
||||
"""
|
||||
s = celery.signature(
|
||||
'cic_eth.queue.tx.get_account_tx',
|
||||
[address],
|
||||
[
|
||||
address,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
txs = s.apply_async().get()
|
||||
@@ -431,6 +457,7 @@ class AdminApi:
|
||||
tx_unpacked = unpack_signed_raw_tx(bytes.fromhex(tx['signed_tx'][2:]), chain_spec.chain_id())
|
||||
tx['gas_price'] = tx_unpacked['gasPrice']
|
||||
tx['gas_limit'] = tx_unpacked['gas']
|
||||
tx['data'] = tx_unpacked['data']
|
||||
|
||||
s = celery.signature(
|
||||
'cic_eth.queue.tx.get_state_log',
|
||||
|
||||
@@ -6,10 +6,13 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# third-party imports
|
||||
# external imports
|
||||
import celery
|
||||
from cic_registry.chain import ChainSpec
|
||||
#from cic_registry.chain import ChainSpec
|
||||
from cic_registry import CICRegistry
|
||||
from chainlib.chain import ChainSpec
|
||||
|
||||
# local imports
|
||||
from cic_eth.eth.factory import TxFactory
|
||||
from cic_eth.db.enum import LockEnum
|
||||
|
||||
@@ -30,7 +33,7 @@ class Api:
|
||||
:param queue: Name of worker queue to submit tasks to
|
||||
:type queue: str
|
||||
"""
|
||||
def __init__(self, chain_str, queue='cic-eth', callback_param=None, callback_task='cic_eth.callbacks.noop', callback_queue=None):
|
||||
def __init__(self, chain_str, queue='cic-eth', callback_param=None, callback_task='cic_eth.callbacks.noop.noop', callback_queue=None):
|
||||
self.chain_str = chain_str
|
||||
self.chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
self.callback_param = callback_param
|
||||
@@ -301,13 +304,15 @@ class Api:
|
||||
return t
|
||||
|
||||
|
||||
def balance(self, address, token_symbol):
|
||||
def balance(self, address, token_symbol, include_pending=True):
|
||||
"""Calls the provided callback with the current token balance of the given address.
|
||||
|
||||
:param address: Ethereum address of holder
|
||||
:type address: str, 0x-hex
|
||||
:param token_symbol: ERC20 token symbol of token to send
|
||||
:type token_symbol: str
|
||||
:param include_pending: If set, will include transactions that have not yet been fully processed
|
||||
:type include_pending: bool
|
||||
:returns: uuid of root task
|
||||
:rtype: celery.Task
|
||||
"""
|
||||
@@ -330,14 +335,45 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
|
||||
if self.callback_param != None:
|
||||
s_balance.link(self.callback_success)
|
||||
s_tokens.link(s_balance).on_error(self.callback_error)
|
||||
else:
|
||||
s_tokens.link(s_balance)
|
||||
s_result = celery.signature(
|
||||
'cic_eth.queue.balance.assemble_balances',
|
||||
[],
|
||||
queue=self.queue,
|
||||
)
|
||||
|
||||
t = s_tokens.apply_async(queue=self.queue)
|
||||
last_in_chain = s_balance
|
||||
if include_pending:
|
||||
s_balance_incoming = celery.signature(
|
||||
'cic_eth.queue.balance.balance_incoming',
|
||||
[
|
||||
address,
|
||||
self.chain_str,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_balance_outgoing = celery.signature(
|
||||
'cic_eth.queue.balance.balance_outgoing',
|
||||
[
|
||||
address,
|
||||
self.chain_str,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_balance.link(s_balance_incoming)
|
||||
s_balance_incoming.link(s_balance_outgoing)
|
||||
last_in_chain = s_balance_outgoing
|
||||
|
||||
one = celery.chain(s_tokens, s_balance)
|
||||
two = celery.chain(s_tokens, s_balance_incoming)
|
||||
three = celery.chain(s_tokens, s_balance_outgoing)
|
||||
|
||||
t = None
|
||||
if self.callback_param != None:
|
||||
s_result.link(self.callback_success).on_error(self.callback_error)
|
||||
t = celery.chord([one, two, three])(s_result)
|
||||
else:
|
||||
t = celery.chord([one, two, three])(s_result)
|
||||
|
||||
return t
|
||||
|
||||
|
||||
@@ -417,6 +453,73 @@ class Api:
|
||||
return t
|
||||
|
||||
|
||||
def list(self, address, limit=10, external_task=None, external_queue=None):
|
||||
"""Retrieve an aggregate list of latest transactions of internal and (optionally) external origin in reverse chronological order.
|
||||
|
||||
The array of transactions returned have the same dict layout as those passed by the callback filter in cic_eth/runnable/manager
|
||||
|
||||
If the external task is defined, this task will be used to query external transactions. If this is not defined, no external transactions will be included. The task must accept (offset, limit, address) as input parameters, and return a bloom filter that will be used to retrieve transaction data for the matching transactions. See cic_eth.ext.tx.list_tx_by_bloom for details on the bloom filter dat format.
|
||||
|
||||
:param address: Ethereum address to list transactions for
|
||||
:type address: str, 0x-hex
|
||||
:param limit: Amount of results to return
|
||||
:type limit: number
|
||||
:param external_task: Celery task providing external transactions
|
||||
:type external_task: str
|
||||
:param external_queue: Celery task queue providing exernal transactions task
|
||||
:type external_queue: str
|
||||
:returns: List of transactions
|
||||
:rtype: list of dict
|
||||
"""
|
||||
offset = 0
|
||||
s_local = celery.signature(
|
||||
'cic_eth.queue.tx.get_account_tx',
|
||||
[
|
||||
address,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
|
||||
s_brief = celery.signature(
|
||||
'cic_eth.ext.tx.tx_collate',
|
||||
[
|
||||
self.chain_str,
|
||||
offset,
|
||||
limit
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
if self.callback_param != None:
|
||||
s_assemble.link(self.callback_success).on_error(self.callback_error)
|
||||
|
||||
t = None
|
||||
if external_task != None:
|
||||
s_external_get = celery.signature(
|
||||
external_task,
|
||||
[
|
||||
address,
|
||||
offset,
|
||||
limit,
|
||||
],
|
||||
queue=external_queue,
|
||||
)
|
||||
|
||||
s_external_process = celery.signature(
|
||||
'cic_eth.ext.tx.list_tx_by_bloom',
|
||||
[
|
||||
address,
|
||||
self.chain_str,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
c = celery.chain(s_external_get, s_external_process)
|
||||
t = celery.chord([s_local, c])(s_brief)
|
||||
else:
|
||||
t = s_local.apply_sync()
|
||||
|
||||
return t
|
||||
|
||||
|
||||
def ping(self, r):
|
||||
"""A noop callback ping for testing purposes.
|
||||
|
||||
|
||||
@@ -18,4 +18,4 @@ def noop(self, result, param, status_code):
|
||||
:rtype: bool
|
||||
"""
|
||||
logg.info('noop callback {} {} {}'.format(result, param, status_code))
|
||||
return True
|
||||
return result
|
||||
|
||||
@@ -18,7 +18,11 @@ logg = celery_app.log.get_default_logger()
|
||||
def redis(self, result, destination, status_code):
|
||||
(host, port, db, channel) = destination.split(':')
|
||||
r = redis_interface.Redis(host=host, port=port, db=db)
|
||||
s = json.dumps(result)
|
||||
data = {
|
||||
'root_id': self.request.root_id,
|
||||
'status': status_code,
|
||||
'result': result,
|
||||
}
|
||||
logg.debug('redis callback on host {} port {} db {} channel {}'.format(host, port, db, channel))
|
||||
r.publish(channel, s)
|
||||
r.publish(channel, json.dumps(data))
|
||||
r.close()
|
||||
|
||||
@@ -20,5 +20,10 @@ def tcp(self, result, destination, status_code):
|
||||
(host, port) = destination.split(':')
|
||||
logg.debug('tcp callback to {} {}'.format(host, port))
|
||||
s.connect((host, int(port)))
|
||||
s.send(json.dumps(result).encode('utf-8'))
|
||||
data = {
|
||||
'root_id': self.request.root_id,
|
||||
'status': status_code,
|
||||
'result': result,
|
||||
}
|
||||
s.send(json.dumps(data).encode('utf-8'))
|
||||
s.close()
|
||||
|
||||
@@ -1,6 +1,29 @@
|
||||
# standard imports
|
||||
import enum
|
||||
|
||||
|
||||
@enum.unique
|
||||
class StatusBits(enum.IntEnum):
|
||||
"""Individual bit flags that are combined to define the state and legacy of a queued transaction
|
||||
|
||||
"""
|
||||
QUEUED = 0x01 # transaction should be sent to network
|
||||
IN_NETWORK = 0x08 # transaction is in network
|
||||
|
||||
DEFERRED = 0x10 # an attempt to send the transaction to network has failed
|
||||
GAS_ISSUES = 0x20 # transaction is pending sender account gas funding
|
||||
|
||||
LOCAL_ERROR = 0x100 # errors that originate internally from the component
|
||||
NODE_ERROR = 0x200 # errors originating in the node (invalid RLP input...)
|
||||
NETWORK_ERROR = 0x400 # errors that originate from the network (REVERT)
|
||||
UNKNOWN_ERROR = 0x800 # unclassified errors (the should not occur)
|
||||
|
||||
FINAL = 0x1000 # transaction processing has completed
|
||||
OBSOLETE = 0x2000 # transaction has been replaced by a different transaction with higher fee
|
||||
MANUAL = 0x8000 # transaction processing has been manually overridden
|
||||
|
||||
|
||||
@enum.unique
|
||||
class StatusEnum(enum.IntEnum):
|
||||
"""
|
||||
|
||||
@@ -22,21 +45,27 @@ class StatusEnum(enum.IntEnum):
|
||||
* SUCCESS: THe transaction was successfully mined. (Block number will be set)
|
||||
|
||||
"""
|
||||
PENDING=-9
|
||||
SENDFAIL=-8
|
||||
RETRY=-7
|
||||
READYSEND=-6
|
||||
OBSOLETED=-2
|
||||
WAITFORGAS=-1
|
||||
SENT=0
|
||||
FUBAR=1
|
||||
CANCELLED=2
|
||||
OVERRIDDEN=3
|
||||
REJECTED=7
|
||||
REVERTED=8
|
||||
SUCCESS=9
|
||||
PENDING = 0
|
||||
|
||||
SENDFAIL = StatusBits.DEFERRED | StatusBits.LOCAL_ERROR
|
||||
RETRY = StatusBits.QUEUED | StatusBits.DEFERRED
|
||||
READYSEND = StatusBits.QUEUED
|
||||
|
||||
OBSOLETED = StatusBits.OBSOLETE | StatusBits.IN_NETWORK
|
||||
|
||||
WAITFORGAS = StatusBits.GAS_ISSUES
|
||||
|
||||
SENT = StatusBits.IN_NETWORK
|
||||
FUBAR = StatusBits.FINAL | StatusBits.UNKNOWN_ERROR
|
||||
CANCELLED = StatusBits.IN_NETWORK | StatusBits.FINAL | StatusBits.OBSOLETE
|
||||
OVERRIDDEN = StatusBits.FINAL | StatusBits.OBSOLETE | StatusBits.MANUAL
|
||||
|
||||
REJECTED = StatusBits.NODE_ERROR | StatusBits.FINAL
|
||||
REVERTED = StatusBits.IN_NETWORK | StatusBits.FINAL | StatusBits.NETWORK_ERROR
|
||||
SUCCESS = StatusBits.IN_NETWORK | StatusBits.FINAL
|
||||
|
||||
|
||||
@enum.unique
|
||||
class LockEnum(enum.IntEnum):
|
||||
"""
|
||||
STICKY: When set, reset is not possible
|
||||
@@ -48,4 +77,78 @@ class LockEnum(enum.IntEnum):
|
||||
CREATE=2
|
||||
SEND=4
|
||||
QUEUE=8
|
||||
QUERY=16
|
||||
ALL=int(0xfffffffffffffffe)
|
||||
|
||||
|
||||
def status_str(v, bits_only=False):
|
||||
"""Render a human-readable string describing the status
|
||||
|
||||
If the bit field exactly matches a StatusEnum value, the StatusEnum label will be returned.
|
||||
|
||||
If a StatusEnum cannot be matched, the string will be postfixed with "*", unless explicitly instructed to return bit field labels only.
|
||||
|
||||
:param v: Status bit field
|
||||
:type v: number
|
||||
:param bits_only: Only render individual bit labels.
|
||||
:type bits_only: bool
|
||||
:returns: Status string
|
||||
:rtype: str
|
||||
"""
|
||||
s = ''
|
||||
if not bits_only:
|
||||
try:
|
||||
s = StatusEnum(v).name
|
||||
return s
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
for i in range(16):
|
||||
b = (1 << i)
|
||||
if (b & 0xffff) & v:
|
||||
n = StatusBits(b).name
|
||||
if len(s) > 0:
|
||||
s += ','
|
||||
s += n
|
||||
if not bits_only:
|
||||
s += '*'
|
||||
return s
|
||||
|
||||
|
||||
def all_errors():
|
||||
"""Bit mask of all error states
|
||||
|
||||
:returns: Error flags
|
||||
:rtype: number
|
||||
"""
|
||||
return StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
|
||||
|
||||
|
||||
def is_error_status(v):
|
||||
"""Check if value is an error state
|
||||
|
||||
:param v: Status bit field
|
||||
:type v: number
|
||||
:returns: True if error
|
||||
:rtype: bool
|
||||
"""
|
||||
return bool(v & all_errors())
|
||||
|
||||
|
||||
def dead():
|
||||
"""Bit mask defining whether a transaction is still likely to be processed on the network.
|
||||
|
||||
:returns: Bit mask
|
||||
:rtype: number
|
||||
"""
|
||||
return StatusBits.FINAL | StatusBits.OBSOLETE
|
||||
|
||||
|
||||
def is_alive(v):
|
||||
"""Check if transaction is still likely to be processed on the network.
|
||||
|
||||
The contingency of "likely" refers to the case a transaction has been obsoleted after sent to the network, but the network still confirms the obsoleted transaction. The return value of this method will not change as a result of this, BUT the state itself will (as the FINAL bit will be set).
|
||||
|
||||
:returns:
|
||||
"""
|
||||
return bool(v & dead() == 0)
|
||||
|
||||
@@ -27,8 +27,8 @@ def upgrade():
|
||||
sa.Column('destination_token_address', sa.String(42), nullable=False),
|
||||
sa.Column('sender', sa.String(42), nullable=False),
|
||||
sa.Column('recipient', sa.String(42), nullable=False),
|
||||
sa.Column('from_value', sa.String(), nullable=False),
|
||||
sa.Column('to_value', sa.String(), nullable=True),
|
||||
sa.Column('from_value', sa.NUMERIC(), nullable=False),
|
||||
sa.Column('to_value', sa.NUMERIC(), nullable=True),
|
||||
sa.Column('block_number', sa.BIGINT(), nullable=True),
|
||||
sa.Column('tx_index', sa.Integer, nullable=True),
|
||||
)
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
"""Add chain syncer
|
||||
|
||||
Revision ID: ec40ac0974c1
|
||||
Revises: 6ac7a1dadc46
|
||||
Create Date: 2021-02-23 06:10:19.246304
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from chainsyncer.db.migrations.sqlalchemy import (
|
||||
chainsyncer_upgrade,
|
||||
chainsyncer_downgrade,
|
||||
)
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'ec40ac0974c1'
|
||||
down_revision = '6ac7a1dadc46'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
chainsyncer_upgrade(0, 0, 1)
|
||||
|
||||
|
||||
def downgrade():
|
||||
chainsyncer_downgrade(0, 0, 1)
|
||||
@@ -19,7 +19,6 @@ def upgrade():
|
||||
op.create_table(
|
||||
'tx_cache',
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
# sa.Column('tx_id', sa.Integer, sa.ForeignKey('tx.id'), nullable=True),
|
||||
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=True),
|
||||
sa.Column('date_created', sa.DateTime, nullable=False),
|
||||
sa.Column('date_updated', sa.DateTime, nullable=False),
|
||||
@@ -27,8 +26,8 @@ def upgrade():
|
||||
sa.Column('destination_token_address', sa.String(42), nullable=False),
|
||||
sa.Column('sender', sa.String(42), nullable=False),
|
||||
sa.Column('recipient', sa.String(42), nullable=False),
|
||||
sa.Column('from_value', sa.BIGINT(), nullable=False),
|
||||
sa.Column('to_value', sa.BIGINT(), nullable=True),
|
||||
sa.Column('from_value', sa.NUMERIC(), nullable=False),
|
||||
sa.Column('to_value', sa.NUMERIC(), nullable=True),
|
||||
sa.Column('block_number', sa.BIGINT(), nullable=True),
|
||||
sa.Column('tx_index', sa.Integer, nullable=True),
|
||||
)
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
"""Add chain syncer
|
||||
|
||||
Revision ID: ec40ac0974c1
|
||||
Revises: 6ac7a1dadc46
|
||||
Create Date: 2021-02-23 06:10:19.246304
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from chainsyncer.db.migrations.sqlalchemy import (
|
||||
chainsyncer_upgrade,
|
||||
chainsyncer_downgrade,
|
||||
)
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'ec40ac0974c1'
|
||||
down_revision = '6ac7a1dadc46'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
chainsyncer_upgrade(0, 0, 1)
|
||||
|
||||
|
||||
def downgrade():
|
||||
chainsyncer_downgrade(0, 0, 1)
|
||||
@@ -1,8 +1,18 @@
|
||||
# stanard imports
|
||||
import logging
|
||||
|
||||
# third-party imports
|
||||
from sqlalchemy import Column, Integer
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy.pool import (
|
||||
StaticPool,
|
||||
QueuePool,
|
||||
AssertionPool,
|
||||
)
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
Model = declarative_base(name='Model')
|
||||
|
||||
@@ -21,7 +31,11 @@ class SessionBase(Model):
|
||||
transactional = True
|
||||
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
|
||||
poolable = True
|
||||
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
|
||||
"""Whether the database backend supports connection pools. Should be explicitly set by initialization code"""
|
||||
procedural = True
|
||||
"""Whether the database backend supports stored procedures"""
|
||||
localsessions = {}
|
||||
"""Contains dictionary of sessions initiated by db model components"""
|
||||
|
||||
|
||||
@staticmethod
|
||||
@@ -40,7 +54,7 @@ class SessionBase(Model):
|
||||
|
||||
|
||||
@staticmethod
|
||||
def connect(dsn, debug=False):
|
||||
def connect(dsn, pool_size=8, debug=False):
|
||||
"""Create new database connection engine and connect to database backend.
|
||||
|
||||
:param dsn: DSN string defining connection.
|
||||
@@ -48,14 +62,28 @@ class SessionBase(Model):
|
||||
"""
|
||||
e = None
|
||||
if SessionBase.poolable:
|
||||
e = create_engine(
|
||||
dsn,
|
||||
max_overflow=50,
|
||||
pool_pre_ping=True,
|
||||
pool_size=20,
|
||||
pool_recycle=10,
|
||||
echo=debug,
|
||||
)
|
||||
poolclass = QueuePool
|
||||
if pool_size > 1:
|
||||
e = create_engine(
|
||||
dsn,
|
||||
max_overflow=pool_size*3,
|
||||
pool_pre_ping=True,
|
||||
pool_size=pool_size,
|
||||
pool_recycle=60,
|
||||
poolclass=poolclass,
|
||||
echo=debug,
|
||||
)
|
||||
else:
|
||||
if debug:
|
||||
poolclass = AssertionPool
|
||||
else:
|
||||
poolclass = StaticPool
|
||||
|
||||
e = create_engine(
|
||||
dsn,
|
||||
poolclass=poolclass,
|
||||
echo=debug,
|
||||
)
|
||||
else:
|
||||
e = create_engine(
|
||||
dsn,
|
||||
@@ -71,3 +99,23 @@ class SessionBase(Model):
|
||||
"""
|
||||
SessionBase.engine.dispose()
|
||||
SessionBase.engine = None
|
||||
|
||||
|
||||
@staticmethod
|
||||
def bind_session(session=None):
|
||||
localsession = session
|
||||
if localsession == None:
|
||||
localsession = SessionBase.create_session()
|
||||
localsession_key = str(id(localsession))
|
||||
logg.debug('creating new session {}'.format(localsession_key))
|
||||
SessionBase.localsessions[localsession_key] = localsession
|
||||
return localsession
|
||||
|
||||
|
||||
@staticmethod
|
||||
def release_session(session=None):
|
||||
session_key = str(id(session))
|
||||
if SessionBase.localsessions.get(session_key) != None:
|
||||
logg.debug('destroying session {}'.format(session_key))
|
||||
session.commit()
|
||||
session.close()
|
||||
|
||||
@@ -55,11 +55,9 @@ class Lock(SessionBase):
|
||||
:returns: New flag state of entry
|
||||
:rtype: number
|
||||
"""
|
||||
localsession = session
|
||||
if localsession == None:
|
||||
localsession = SessionBase.create_session()
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
q = localsession.query(Lock)
|
||||
q = session.query(Lock)
|
||||
#q = q.join(TxCache, isouter=True)
|
||||
q = q.filter(Lock.address==address)
|
||||
q = q.filter(Lock.blockchain==chain_str)
|
||||
@@ -71,7 +69,8 @@ class Lock(SessionBase):
|
||||
lock.address = address
|
||||
lock.blockchain = chain_str
|
||||
if tx_hash != None:
|
||||
q = localsession.query(Otx)
|
||||
session.flush()
|
||||
q = session.query(Otx)
|
||||
q = q.filter(Otx.tx_hash==tx_hash)
|
||||
otx = q.first()
|
||||
if otx != None:
|
||||
@@ -80,12 +79,11 @@ class Lock(SessionBase):
|
||||
lock.flags |= flags
|
||||
r = lock.flags
|
||||
|
||||
localsession.add(lock)
|
||||
localsession.commit()
|
||||
session.add(lock)
|
||||
session.commit()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
if session == None:
|
||||
localsession.close()
|
||||
|
||||
return r
|
||||
|
||||
|
||||
@@ -110,11 +108,9 @@ class Lock(SessionBase):
|
||||
:returns: New flag state of entry
|
||||
:rtype: number
|
||||
"""
|
||||
localsession = session
|
||||
if localsession == None:
|
||||
localsession = SessionBase.create_session()
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
q = localsession.query(Lock)
|
||||
q = session.query(Lock)
|
||||
#q = q.join(TxCache, isouter=True)
|
||||
q = q.filter(Lock.address==address)
|
||||
q = q.filter(Lock.blockchain==chain_str)
|
||||
@@ -124,14 +120,13 @@ class Lock(SessionBase):
|
||||
if lock != None:
|
||||
lock.flags &= ~flags
|
||||
if lock.flags == 0:
|
||||
localsession.delete(lock)
|
||||
session.delete(lock)
|
||||
else:
|
||||
localsession.add(lock)
|
||||
session.add(lock)
|
||||
r = lock.flags
|
||||
localsession.commit()
|
||||
session.commit()
|
||||
|
||||
if session == None:
|
||||
localsession.close()
|
||||
SessionBase.release_session(session)
|
||||
|
||||
return r
|
||||
|
||||
@@ -156,22 +151,20 @@ class Lock(SessionBase):
|
||||
:rtype: number
|
||||
"""
|
||||
|
||||
localsession = session
|
||||
if localsession == None:
|
||||
localsession = SessionBase.create_session()
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
q = localsession.query(Lock)
|
||||
q = session.query(Lock)
|
||||
#q = q.join(TxCache, isouter=True)
|
||||
q = q.filter(Lock.address==address)
|
||||
q = q.filter(Lock.blockchain==chain_str)
|
||||
q = q.filter(Lock.flags.op('&')(flags)==flags)
|
||||
lock = q.first()
|
||||
if session == None:
|
||||
localsession.close()
|
||||
|
||||
r = 0
|
||||
if lock != None:
|
||||
r = lock.flags & flags
|
||||
|
||||
SessionBase.release_session(session)
|
||||
return r
|
||||
|
||||
|
||||
|
||||
@@ -21,12 +21,9 @@ class Nonce(SessionBase):
|
||||
|
||||
@staticmethod
|
||||
def get(address, session=None):
|
||||
localsession = session
|
||||
if localsession == None:
|
||||
localsession = SessionBase.create_session()
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
|
||||
q = localsession.query(Nonce)
|
||||
q = session.query(Nonce)
|
||||
q = q.filter(Nonce.address_hex==address)
|
||||
nonce = q.first()
|
||||
|
||||
@@ -34,28 +31,29 @@ class Nonce(SessionBase):
|
||||
if nonce != None:
|
||||
nonce_value = nonce.nonce;
|
||||
|
||||
if session == None:
|
||||
localsession.close()
|
||||
SessionBase.release_session(session)
|
||||
|
||||
return nonce_value
|
||||
|
||||
|
||||
@staticmethod
|
||||
def __get(conn, address):
|
||||
r = conn.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address))
|
||||
def __get(session, address):
|
||||
r = session.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address))
|
||||
nonce = r.fetchone()
|
||||
session.flush()
|
||||
if nonce == None:
|
||||
return None
|
||||
return nonce[0]
|
||||
|
||||
|
||||
@staticmethod
|
||||
def __set(conn, address, nonce):
|
||||
conn.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address))
|
||||
def __set(session, address, nonce):
|
||||
session.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address))
|
||||
session.flush()
|
||||
|
||||
|
||||
@staticmethod
|
||||
def next(address, initial_if_not_exists=0):
|
||||
def next(address, initial_if_not_exists=0, session=None):
|
||||
"""Generate next nonce for the given address.
|
||||
|
||||
If there is no previous nonce record for the address, the nonce may be initialized to a specified value, or 0 if no value has been given.
|
||||
@@ -67,20 +65,32 @@ class Nonce(SessionBase):
|
||||
:returns: Nonce
|
||||
:rtype: number
|
||||
"""
|
||||
conn = Nonce.engine.connect()
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
session.begin_nested()
|
||||
#conn = Nonce.engine.connect()
|
||||
if Nonce.transactional:
|
||||
conn.execute('BEGIN')
|
||||
conn.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
|
||||
nonce = Nonce.__get(conn, address)
|
||||
#session.execute('BEGIN')
|
||||
session.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
|
||||
session.flush()
|
||||
nonce = Nonce.__get(session, address)
|
||||
logg.debug('get nonce {} for address {}'.format(nonce, address))
|
||||
if nonce == None:
|
||||
nonce = initial_if_not_exists
|
||||
conn.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
|
||||
session.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
|
||||
session.flush()
|
||||
logg.debug('setting default nonce to {} for address {}'.format(nonce, address))
|
||||
Nonce.__set(conn, address, nonce+1)
|
||||
if Nonce.transactional:
|
||||
conn.execute('COMMIT')
|
||||
conn.close()
|
||||
Nonce.__set(session, address, nonce+1)
|
||||
#if Nonce.transactional:
|
||||
#session.execute('COMMIT')
|
||||
#session.execute('UNLOCK TABLE nonce')
|
||||
#conn.close()
|
||||
session.commit()
|
||||
session.commit()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
return nonce
|
||||
|
||||
|
||||
|
||||
@@ -8,7 +8,12 @@ from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
|
||||
|
||||
# local imports
|
||||
from .base import SessionBase
|
||||
from cic_eth.db.enum import StatusEnum
|
||||
from cic_eth.db.enum import (
|
||||
StatusEnum,
|
||||
StatusBits,
|
||||
status_str,
|
||||
is_error_status,
|
||||
)
|
||||
from cic_eth.db.error import TxStateChangeError
|
||||
#from cic_eth.eth.util import address_hex_from_signed_tx
|
||||
|
||||
@@ -54,21 +59,31 @@ class Otx(SessionBase):
|
||||
block = Column(Integer)
|
||||
|
||||
|
||||
def __set_status(self, status, session=None):
|
||||
localsession = session
|
||||
if localsession == None:
|
||||
localsession = SessionBase.create_session()
|
||||
def __set_status(self, status, session):
|
||||
self.status |= status
|
||||
session.add(self)
|
||||
session.flush()
|
||||
|
||||
self.status = status
|
||||
localsession.add(self)
|
||||
localsession.flush()
|
||||
|
||||
if self.tracing:
|
||||
self.__state_log(session=localsession)
|
||||
def __reset_status(self, status, session):
|
||||
status_edit = ~status & self.status
|
||||
self.status &= status_edit
|
||||
session.add(self)
|
||||
session.flush()
|
||||
|
||||
|
||||
if session==None:
|
||||
localsession.commit()
|
||||
localsession.close()
|
||||
def __status_already_set(self, status):
|
||||
r = bool(self.status & status)
|
||||
if r:
|
||||
logg.warning('status bit {} already set on {}'.format(status.name, self.tx_hash))
|
||||
return r
|
||||
|
||||
|
||||
def __status_not_set(self, status):
|
||||
r = not(self.status & status)
|
||||
if r:
|
||||
logg.warning('status bit {} not set on {}'.format(status.name, self.tx_hash))
|
||||
return r
|
||||
|
||||
|
||||
def set_block(self, block, session=None):
|
||||
@@ -102,9 +117,23 @@ class Otx(SessionBase):
|
||||
|
||||
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
|
||||
"""
|
||||
if self.status >= StatusEnum.SENT.value:
|
||||
raise TxStateChangeError('WAITFORGAS cannot succeed final state, had {}'.format(StatusEnum(self.status).name))
|
||||
self.__set_status(StatusEnum.WAITFORGAS, session)
|
||||
if self.__status_already_set(StatusBits.GAS_ISSUES):
|
||||
return
|
||||
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if self.status & StatusBits.IN_NETWORK:
|
||||
raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__set_status(StatusBits.GAS_ISSUES, session)
|
||||
self.__reset_status(StatusBits.QUEUED | StatusBits.DEFERRED, session)
|
||||
|
||||
if self.tracing:
|
||||
self.__state_log(session=session)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
|
||||
def fubar(self, session=None):
|
||||
@@ -112,28 +141,89 @@ class Otx(SessionBase):
|
||||
|
||||
Only manipulates object, does not transaction or commit to backend.
|
||||
"""
|
||||
self.__set_status(StatusEnum.FUBAR, session)
|
||||
if self.__status_already_set(StatusBits.UNKNOWN_ERROR):
|
||||
return
|
||||
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
raise TxStateChangeError('FUBAR cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if is_error_status(self.status):
|
||||
raise TxStateChangeError('FUBAR cannot be set on an entry with an error state already set ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__set_status(StatusBits.UNKNOWN_ERROR | StatusBits.FINAL, session)
|
||||
|
||||
if self.tracing:
|
||||
self.__state_log(session=session)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
|
||||
def reject(self, session=None):
|
||||
"""Marks transaction as "rejected," which means the node rejected sending the transaction to the network. The nonce has not been spent, and the transaction should be replaced.
|
||||
|
||||
Only manipulates object, does not transaction or commit to backend.
|
||||
"""
|
||||
if self.status >= StatusEnum.SENT.value:
|
||||
raise TxStateChangeError('REJECTED cannot succeed SENT or final state, had {}'.format(StatusEnum(self.status).name))
|
||||
self.__set_status(StatusEnum.REJECTED, session)
|
||||
|
||||
if self.__status_already_set(StatusBits.NODE_ERROR):
|
||||
return
|
||||
|
||||
def override(self, session=None):
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
raise TxStateChangeError('REJECTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if self.status & StatusBits.IN_NETWORK:
|
||||
raise TxStateChangeError('REJECTED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status)))
|
||||
if is_error_status(self.status):
|
||||
raise TxStateChangeError('REJECTED cannot be set on an entry with an error state already set ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__set_status(StatusBits.NODE_ERROR | StatusBits.FINAL, session)
|
||||
|
||||
if self.tracing:
|
||||
self.__state_log(session=session)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
|
||||
def override(self, manual=False, session=None):
|
||||
"""Marks transaction as manually overridden.
|
||||
|
||||
Only manipulates object, does not transaction or commit to backend.
|
||||
"""
|
||||
if self.status >= StatusEnum.SENT.value:
|
||||
raise TxStateChangeError('OVERRIDDEN cannot succeed SENT or final state, had {}'.format(StatusEnum(self.status).name))
|
||||
self.__set_status(StatusEnum.OVERRIDDEN, session)
|
||||
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if self.status & StatusBits.IN_NETWORK:
|
||||
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status)))
|
||||
if self.status & StatusBits.OBSOLETE:
|
||||
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already OBSOLETE ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__set_status(StatusBits.OBSOLETE, session)
|
||||
#if manual:
|
||||
# self.__set_status(StatusBits.MANUAL, session)
|
||||
self.__reset_status(StatusBits.QUEUED | StatusBits.IN_NETWORK, session)
|
||||
|
||||
if self.tracing:
|
||||
self.__state_log(session=session)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
|
||||
def manual(self, session=None):
|
||||
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__set_status(StatusBits.MANUAL, session)
|
||||
|
||||
if self.tracing:
|
||||
self.__state_log(session=session)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
def retry(self, session=None):
|
||||
"""Marks transaction as ready to retry after a timeout following a sendfail or a completed gas funding.
|
||||
@@ -142,9 +232,23 @@ class Otx(SessionBase):
|
||||
|
||||
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
|
||||
"""
|
||||
if self.status != StatusEnum.SENT.value and self.status != StatusEnum.SENDFAIL.value:
|
||||
raise TxStateChangeError('RETRY must follow SENT or SENDFAIL, but had {}'.format(StatusEnum(self.status).name))
|
||||
self.__set_status(StatusEnum.RETRY, session)
|
||||
if self.__status_already_set(StatusBits.QUEUED):
|
||||
return
|
||||
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
raise TxStateChangeError('RETRY cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if not is_error_status(self.status) and not StatusBits.IN_NETWORK & self.status > 0:
|
||||
raise TxStateChangeError('RETRY cannot be set on an entry that has no error ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__set_status(StatusBits.QUEUED, session)
|
||||
self.__reset_status(StatusBits.GAS_ISSUES, session)
|
||||
|
||||
if self.tracing:
|
||||
self.__state_log(session=session)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
|
||||
def readysend(self, session=None):
|
||||
@@ -154,9 +258,23 @@ class Otx(SessionBase):
|
||||
|
||||
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
|
||||
"""
|
||||
if self.status != StatusEnum.PENDING.value and self.status != StatusEnum.WAITFORGAS.value:
|
||||
raise TxStateChangeError('READYSEND must follow PENDING or WAITFORGAS, but had {}'.format(StatusEnum(self.status).name))
|
||||
self.__set_status(StatusEnum.READYSEND, session)
|
||||
if self.__status_already_set(StatusBits.QUEUED):
|
||||
return
|
||||
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
raise TxStateChangeError('READYSEND cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if is_error_status(self.status):
|
||||
raise TxStateChangeError('READYSEND cannot be set on an errored state ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__set_status(StatusBits.QUEUED, session)
|
||||
self.__reset_status(StatusBits.GAS_ISSUES, session)
|
||||
|
||||
if self.tracing:
|
||||
self.__state_log(session=session)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
|
||||
def sent(self, session=None):
|
||||
@@ -166,9 +284,21 @@ class Otx(SessionBase):
|
||||
|
||||
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
|
||||
"""
|
||||
if self.status > StatusEnum.SENT:
|
||||
raise TxStateChangeError('SENT after {}'.format(StatusEnum(self.status).name))
|
||||
self.__set_status(StatusEnum.SENT, session)
|
||||
if self.__status_already_set(StatusBits.IN_NETWORK):
|
||||
return
|
||||
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
raise TxStateChangeError('SENT cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__set_status(StatusBits.IN_NETWORK, session)
|
||||
self.__reset_status(StatusBits.DEFERRED | StatusBits.QUEUED | StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR, session)
|
||||
|
||||
if self.tracing:
|
||||
self.__state_log(session=session)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
|
||||
def sendfail(self, session=None):
|
||||
@@ -178,9 +308,49 @@ class Otx(SessionBase):
|
||||
|
||||
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
|
||||
"""
|
||||
if self.status not in [StatusEnum.PENDING, StatusEnum.SENT, StatusEnum.WAITFORGAS]:
|
||||
raise TxStateChangeError('SENDFAIL must follow SENT or PENDING, but had {}'.format(StatusEnum(self.status).name))
|
||||
self.__set_status(StatusEnum.SENDFAIL, session)
|
||||
if self.__status_already_set(StatusBits.NODE_ERROR):
|
||||
return
|
||||
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if self.status & StatusBits.IN_NETWORK:
|
||||
raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__set_status(StatusBits.LOCAL_ERROR | StatusBits.DEFERRED, session)
|
||||
self.__reset_status(StatusBits.QUEUED | StatusBits.GAS_ISSUES, session)
|
||||
|
||||
if self.tracing:
|
||||
self.__state_log(session=session)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
|
||||
def dequeue(self, session=None):
|
||||
"""Marks that a process to execute send attempt is underway
|
||||
|
||||
Only manipulates object, does not transaction or commit to backend.
|
||||
|
||||
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
|
||||
"""
|
||||
if self.__status_not_set(StatusBits.QUEUED):
|
||||
return
|
||||
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if self.status & StatusBits.IN_NETWORK:
|
||||
raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__reset_status(StatusBits.QUEUED, session)
|
||||
|
||||
if self.tracing:
|
||||
self.__state_log(session=session)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
|
||||
|
||||
def minefail(self, block, session=None):
|
||||
@@ -192,14 +362,25 @@ class Otx(SessionBase):
|
||||
:type block: number
|
||||
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
|
||||
"""
|
||||
if self.__status_already_set(StatusBits.NETWORK_ERROR):
|
||||
return
|
||||
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
raise TxStateChangeError('REVERTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if not self.status & StatusBits.IN_NETWORK:
|
||||
raise TxStateChangeError('REVERTED cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status)))
|
||||
|
||||
if block != None:
|
||||
self.block = block
|
||||
if self.status != StatusEnum.SENT:
|
||||
logg.warning('REVERTED should follow SENT, but had {}'.format(StatusEnum(self.status).name))
|
||||
#if self.status != StatusEnum.PENDING and self.status != StatusEnum.OBSOLETED and self.status != StatusEnum.SENT:
|
||||
#if self.status > StatusEnum.SENT:
|
||||
# raise TxStateChangeError('REVERTED must follow OBSOLETED, PENDING or SENT, but had {}'.format(StatusEnum(self.status).name))
|
||||
self.__set_status(StatusEnum.REVERTED, session)
|
||||
|
||||
self.__set_status(StatusBits.NETWORK_ERROR | StatusBits.FINAL, session)
|
||||
|
||||
if self.tracing:
|
||||
self.__state_log(session=session)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
|
||||
def cancel(self, confirmed=False, session=None):
|
||||
@@ -213,17 +394,23 @@ class Otx(SessionBase):
|
||||
:type confirmed: bool
|
||||
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
|
||||
"""
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
raise TxStateChangeError('CANCEL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
|
||||
if confirmed:
|
||||
if self.status != StatusEnum.OBSOLETED:
|
||||
logg.warning('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
|
||||
#raise TxStateChangeError('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
|
||||
if not self.status & StatusBits.OBSOLETE:
|
||||
raise TxStateChangeError('CANCEL can only be set on an entry marked OBSOLETE ({})'.format(status_str(self.status)))
|
||||
self.__set_status(StatusEnum.CANCELLED, session)
|
||||
elif self.status != StatusEnum.OBSOLETED:
|
||||
if self.status > StatusEnum.SENT:
|
||||
logg.warning('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
|
||||
#raise TxStateChangeError('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
|
||||
else:
|
||||
self.__set_status(StatusEnum.OBSOLETED, session)
|
||||
|
||||
if self.tracing:
|
||||
self.__state_log(session=session)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
|
||||
def success(self, block, session=None):
|
||||
"""Marks that transaction was successfully mined.
|
||||
@@ -235,16 +422,27 @@ class Otx(SessionBase):
|
||||
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
|
||||
"""
|
||||
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
raise TxStateChangeError('SUCCESS cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if not self.status & StatusBits.IN_NETWORK:
|
||||
raise TxStateChangeError('SUCCESS cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status)))
|
||||
if is_error_status(self.status):
|
||||
raise TxStateChangeError('SUCCESS cannot be set on an entry with error state set ({})'.format(status_str(self.status)))
|
||||
|
||||
if block != None:
|
||||
self.block = block
|
||||
if self.status != StatusEnum.SENT:
|
||||
logg.error('SUCCESS should follow SENT, but had {}'.format(StatusEnum(self.status).name))
|
||||
#raise TxStateChangeError('SUCCESS must follow SENT, but had {}'.format(StatusEnum(self.status).name))
|
||||
self.__set_status(StatusEnum.SUCCESS, session)
|
||||
|
||||
if self.tracing:
|
||||
self.__state_log(session=session)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def get(status=0, limit=4096, status_exact=True):
|
||||
def get(status=0, limit=4096, status_exact=True, session=None):
|
||||
"""Returns outgoing transaction lists by status.
|
||||
|
||||
Status may either be matched exactly, or be an upper bound of the integer value of the status enum.
|
||||
@@ -259,26 +457,32 @@ class Otx(SessionBase):
|
||||
:rtype: tuple, where first element is transaction hash
|
||||
"""
|
||||
e = None
|
||||
session = Otx.create_session()
|
||||
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if status_exact:
|
||||
e = session.query(Otx.tx_hash).filter(Otx.status==status).order_by(Otx.date_created.asc()).limit(limit).all()
|
||||
else:
|
||||
e = session.query(Otx.tx_hash).filter(Otx.status<=status).order_by(Otx.date_created.asc()).limit(limit).all()
|
||||
session.close()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
return e
|
||||
|
||||
|
||||
@staticmethod
|
||||
def load(tx_hash):
|
||||
def load(tx_hash, session=None):
|
||||
"""Retrieves the outgoing transaction record by transaction hash.
|
||||
|
||||
:param tx_hash: Transaction hash
|
||||
:type tx_hash: str, 0x-hex
|
||||
"""
|
||||
session = Otx.create_session()
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
q = session.query(Otx)
|
||||
q = q.filter(Otx.tx_hash==tx_hash)
|
||||
session.close()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
return q.first()
|
||||
|
||||
|
||||
@@ -450,6 +654,3 @@ class OtxSync(SessionBase):
|
||||
self.tx_height_session = 0
|
||||
self.block_height_backlog = 0
|
||||
self.tx_height_backlog = 0
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -24,9 +24,10 @@ class AccountRole(SessionBase):
|
||||
tag = Column(Text)
|
||||
address_hex = Column(String(42))
|
||||
|
||||
|
||||
|
||||
# TODO:
|
||||
@staticmethod
|
||||
def get_address(tag):
|
||||
def get_address(tag, session):
|
||||
"""Get Ethereum address matching the given tag
|
||||
|
||||
:param tag: Tag
|
||||
@@ -34,14 +35,26 @@ class AccountRole(SessionBase):
|
||||
:returns: Ethereum address, or zero-address if tag does not exist
|
||||
:rtype: str, 0x-hex
|
||||
"""
|
||||
role = AccountRole.get_role(tag)
|
||||
if role == None:
|
||||
return zero_address
|
||||
return role.address_hex
|
||||
if session == None:
|
||||
raise ValueError('nested bind session calls will not succeed as the first call to release_session in the stack will leave the db object detached further down the stack. We will need additional reference count.')
|
||||
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
role = AccountRole.__get_role(tag, session)
|
||||
|
||||
r = zero_address
|
||||
if role != None:
|
||||
r = role.address_hex
|
||||
|
||||
session.flush()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
return r
|
||||
|
||||
|
||||
@staticmethod
|
||||
def get_role(tag):
|
||||
def get_role(tag, session=None):
|
||||
"""Get AccountRole model object matching the given tag
|
||||
|
||||
:param tag: Tag
|
||||
@@ -49,20 +62,27 @@ class AccountRole(SessionBase):
|
||||
:returns: Role object, if found
|
||||
:rtype: cic_eth.db.models.role.AccountRole
|
||||
"""
|
||||
session = AccountRole.create_session()
|
||||
role = AccountRole.__get_role(session, tag)
|
||||
session.close()
|
||||
#return role.address_hex
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
role = AccountRole.__get_role(tag, session)
|
||||
|
||||
session.flush()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
return role
|
||||
|
||||
|
||||
@staticmethod
|
||||
def __get_role(session, tag):
|
||||
return session.query(AccountRole).filter(AccountRole.tag==tag).first()
|
||||
def __get_role(tag, session):
|
||||
q = session.query(AccountRole)
|
||||
q = q.filter(AccountRole.tag==tag)
|
||||
r = q.first()
|
||||
return r
|
||||
|
||||
|
||||
@staticmethod
|
||||
def set(tag, address_hex):
|
||||
def set(tag, address_hex, session=None):
|
||||
"""Persist a tag to Ethereum address association.
|
||||
|
||||
This will silently overwrite the existing value.
|
||||
@@ -74,16 +94,18 @@ class AccountRole(SessionBase):
|
||||
:returns: Role object
|
||||
:rtype: cic_eth.db.models.role.AccountRole
|
||||
"""
|
||||
#session = AccountRole.create_session()
|
||||
#role = AccountRole.__get(session, tag)
|
||||
role = AccountRole.get_role(tag) #session, tag)
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
role = AccountRole.__get_role(tag, session)
|
||||
if role == None:
|
||||
role = AccountRole(tag)
|
||||
role.address_hex = address_hex
|
||||
#session.add(role)
|
||||
#session.commit()
|
||||
#session.close()
|
||||
return role #address_hex
|
||||
|
||||
session.flush()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
return role
|
||||
|
||||
|
||||
@staticmethod
|
||||
@@ -95,20 +117,17 @@ class AccountRole(SessionBase):
|
||||
:returns: Role tag, or None if no match
|
||||
:rtype: str or None
|
||||
"""
|
||||
localsession = session
|
||||
if localsession == None:
|
||||
localsession = SessionBase.create_session()
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
q = localsession.query(AccountRole)
|
||||
q = session.query(AccountRole)
|
||||
q = q.filter(AccountRole.address_hex==address)
|
||||
role = q.first()
|
||||
tag = None
|
||||
if role != None:
|
||||
tag = role.tag
|
||||
|
||||
if session == None:
|
||||
localsession.close()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
return tag
|
||||
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
import datetime
|
||||
|
||||
# third-party imports
|
||||
from sqlalchemy import Column, String, Integer, DateTime, Enum, ForeignKey, Boolean
|
||||
from sqlalchemy import Column, String, Integer, DateTime, Enum, ForeignKey, Boolean, NUMERIC
|
||||
from sqlalchemy.ext.hybrid import hybrid_method, hybrid_property
|
||||
#from sqlalchemy.orm import relationship, backref
|
||||
#from sqlalchemy.ext.declarative import declarative_base
|
||||
@@ -55,8 +55,8 @@ class TxCache(SessionBase):
|
||||
destination_token_address = Column(String(42))
|
||||
sender = Column(String(42))
|
||||
recipient = Column(String(42))
|
||||
from_value = Column(String())
|
||||
to_value = Column(String())
|
||||
from_value = Column(NUMERIC())
|
||||
to_value = Column(NUMERIC())
|
||||
block_number = Column(Integer())
|
||||
tx_index = Column(Integer())
|
||||
date_created = Column(DateTime, default=datetime.datetime.utcnow)
|
||||
@@ -64,16 +64,6 @@ class TxCache(SessionBase):
|
||||
date_checked = Column(DateTime, default=datetime.datetime.utcnow)
|
||||
|
||||
|
||||
def values(self):
|
||||
from_value_hex = bytes.fromhex(self.from_value)
|
||||
from_value = int.from_bytes(from_value_hex, 'big')
|
||||
|
||||
to_value_hex = bytes.fromhex(self.to_value)
|
||||
to_value = int.from_bytes(to_value_hex, 'big')
|
||||
|
||||
return (from_value, to_value)
|
||||
|
||||
|
||||
def check(self):
|
||||
"""Update the "checked" timestamp to current time.
|
||||
|
||||
@@ -95,63 +85,61 @@ class TxCache(SessionBase):
|
||||
:param tx_hash_new: tx hash to associate the copied entry with
|
||||
:type tx_hash_new: str, 0x-hex
|
||||
"""
|
||||
localsession = session
|
||||
if localsession == None:
|
||||
localsession = SessionBase.create_session()
|
||||
|
||||
q = localsession.query(TxCache)
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
q = session.query(TxCache)
|
||||
q = q.join(Otx)
|
||||
q = q.filter(Otx.tx_hash==tx_hash_original)
|
||||
txc = q.first()
|
||||
|
||||
if txc == None:
|
||||
SessionBase.release_session(session)
|
||||
raise NotLocalTxError('original {}'.format(tx_hash_original))
|
||||
if txc.block_number != None:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('cannot clone tx cache of confirmed tx {}'.format(tx_hash_original))
|
||||
|
||||
q = localsession.query(Otx)
|
||||
session.flush()
|
||||
q = session.query(Otx)
|
||||
q = q.filter(Otx.tx_hash==tx_hash_new)
|
||||
otx = q.first()
|
||||
|
||||
if otx == None:
|
||||
SessionBase.release_session(session)
|
||||
raise NotLocalTxError('new {}'.format(tx_hash_new))
|
||||
|
||||
values = txc.values()
|
||||
txc_new = TxCache(
|
||||
otx.tx_hash,
|
||||
txc.sender,
|
||||
txc.recipient,
|
||||
txc.source_token_address,
|
||||
txc.destination_token_address,
|
||||
values[0],
|
||||
values[1],
|
||||
int(txc.from_value),
|
||||
int(txc.to_value),
|
||||
session=session,
|
||||
)
|
||||
localsession.add(txc_new)
|
||||
localsession.commit()
|
||||
session.add(txc_new)
|
||||
session.commit()
|
||||
|
||||
if session == None:
|
||||
localsession.close()
|
||||
SessionBase.release_session(session)
|
||||
|
||||
|
||||
def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None):
|
||||
session = SessionBase.create_session()
|
||||
tx = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
|
||||
def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None, session=None):
|
||||
session = SessionBase.bind_session(session)
|
||||
q = session.query(Otx)
|
||||
q = q.filter(Otx.tx_hash==tx_hash)
|
||||
tx = q.first()
|
||||
if tx == None:
|
||||
session.close()
|
||||
SessionBase.release_session(session)
|
||||
raise FileNotFoundError('outgoing transaction record unknown {} (add a Tx first)'.format(tx_hash))
|
||||
self.otx_id = tx.id
|
||||
|
||||
# if tx == None:
|
||||
# session.close()
|
||||
# raise ValueError('tx hash {} (outgoing: {}) not found'.format(tx_hash, outgoing))
|
||||
# session.close()
|
||||
|
||||
self.sender = sender
|
||||
self.recipient = recipient
|
||||
self.source_token_address = source_token_address
|
||||
self.destination_token_address = destination_token_address
|
||||
self.from_value = num_serialize(from_value).hex()
|
||||
self.to_value = num_serialize(to_value).hex()
|
||||
self.from_value = from_value
|
||||
self.to_value = to_value
|
||||
self.block_number = block_number
|
||||
self.tx_index = tx_index
|
||||
# not automatically set in sqlite, it seems:
|
||||
@@ -159,4 +147,5 @@ class TxCache(SessionBase):
|
||||
self.date_updated = self.date_created
|
||||
self.date_checked = self.date_created
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ from cic_registry import CICRegistry
|
||||
from cic_registry.chain import ChainSpec
|
||||
from erc20_single_shot_faucet import Faucet
|
||||
from cic_registry import zero_address
|
||||
from hexathon import strip_0x
|
||||
|
||||
# local import
|
||||
from cic_eth.eth import RpcClient
|
||||
@@ -21,6 +22,7 @@ from cic_eth.db.models.role import AccountRole
|
||||
from cic_eth.db.models.tx import TxCache
|
||||
from cic_eth.eth.util import unpack_signed_raw_tx
|
||||
from cic_eth.error import RoleMissingError
|
||||
from cic_eth.task import CriticalSQLAlchemyTask
|
||||
|
||||
#logg = logging.getLogger(__name__)
|
||||
logg = logging.getLogger()
|
||||
@@ -34,6 +36,7 @@ class AccountTxFactory(TxFactory):
|
||||
self,
|
||||
address,
|
||||
chain_spec,
|
||||
session=None,
|
||||
):
|
||||
"""Register an Ethereum account address with the on-chain account registry
|
||||
|
||||
@@ -56,7 +59,7 @@ class AccountTxFactory(TxFactory):
|
||||
'gas': gas,
|
||||
'gasPrice': self.gas_price,
|
||||
'chainId': chain_spec.chain_id(),
|
||||
'nonce': self.next_nonce(),
|
||||
'nonce': self.next_nonce(session=session),
|
||||
'value': 0,
|
||||
})
|
||||
return tx_add
|
||||
@@ -66,6 +69,7 @@ class AccountTxFactory(TxFactory):
|
||||
self,
|
||||
address,
|
||||
chain_spec,
|
||||
session=None,
|
||||
):
|
||||
"""Trigger the on-chain faucet to disburse tokens to the provided Ethereum account
|
||||
|
||||
@@ -86,7 +90,7 @@ class AccountTxFactory(TxFactory):
|
||||
'gas': gas,
|
||||
'gasPrice': self.gas_price,
|
||||
'chainId': chain_spec.chain_id(),
|
||||
'nonce': self.next_nonce(),
|
||||
'nonce': self.next_nonce(session=session),
|
||||
'value': 0,
|
||||
})
|
||||
return tx_add
|
||||
@@ -101,11 +105,12 @@ def unpack_register(data):
|
||||
:returns: Parsed parameters
|
||||
:rtype: dict
|
||||
"""
|
||||
f = data[2:10]
|
||||
data = strip_0x(data)
|
||||
f = data[:8]
|
||||
if f != '0a3b0a4f':
|
||||
raise ValueError('Invalid account index register data ({})'.format(f))
|
||||
|
||||
d = data[10:]
|
||||
d = data[8:]
|
||||
return {
|
||||
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
|
||||
}
|
||||
@@ -120,17 +125,19 @@ def unpack_gift(data):
|
||||
:returns: Parsed parameters
|
||||
:rtype: dict
|
||||
"""
|
||||
f = data[2:10]
|
||||
data = strip_0x(data)
|
||||
f = data[:8]
|
||||
if f != '63e4bff4':
|
||||
raise ValueError('Invalid account index register data ({})'.format(f))
|
||||
raise ValueError('Invalid gift data ({})'.format(f))
|
||||
|
||||
d = data[10:]
|
||||
d = data[8:]
|
||||
return {
|
||||
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
|
||||
}
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
# TODO: Separate out nonce initialization task
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def create(password, chain_str):
|
||||
"""Creates and stores a new ethereum account in the keystore.
|
||||
|
||||
@@ -149,9 +156,13 @@ def create(password, chain_str):
|
||||
logg.debug('created account {}'.format(a))
|
||||
|
||||
# Initialize nonce provider record for account
|
||||
# TODO: this can safely be set to zero, since we are randomly creating account
|
||||
n = c.w3.eth.getTransactionCount(a, 'pending')
|
||||
session = SessionBase.create_session()
|
||||
o = session.query(Nonce).filter(Nonce.address_hex==a).first()
|
||||
q = session.query(Nonce)
|
||||
q = q.filter(Nonce.address_hex==a)
|
||||
o = q.first()
|
||||
session.flush()
|
||||
if o == None:
|
||||
o = Nonce()
|
||||
o.address_hex = a
|
||||
@@ -162,7 +173,7 @@ def create(password, chain_str):
|
||||
return a
|
||||
|
||||
|
||||
@celery_app.task(bind=True, throws=(RoleMissingError,))
|
||||
@celery_app.task(bind=True, throws=(RoleMissingError,), base=CriticalSQLAlchemyTask)
|
||||
def register(self, account_address, chain_str, writer_address=None):
|
||||
"""Creates a transaction to add the given address to the accounts index.
|
||||
|
||||
@@ -178,20 +189,22 @@ def register(self, account_address, chain_str, writer_address=None):
|
||||
"""
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
|
||||
session = SessionBase.create_session()
|
||||
if writer_address == None:
|
||||
writer_address = AccountRole.get_address('ACCOUNTS_INDEX_WRITER')
|
||||
writer_address = AccountRole.get_address('ACCOUNTS_INDEX_WRITER', session=session)
|
||||
|
||||
if writer_address == zero_address:
|
||||
session.close()
|
||||
raise RoleMissingError(account_address)
|
||||
|
||||
|
||||
logg.debug('adding account address {} to index; writer {}'.format(account_address, writer_address))
|
||||
queue = self.request.delivery_info['routing_key']
|
||||
|
||||
c = RpcClient(chain_spec, holder_address=writer_address)
|
||||
txf = AccountTxFactory(writer_address, c)
|
||||
|
||||
tx_add = txf.add(account_address, chain_spec)
|
||||
tx_add = txf.add(account_address, chain_spec, session=session)
|
||||
session.close()
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data')
|
||||
|
||||
gas_budget = tx_add['gas'] * tx_add['gasPrice']
|
||||
@@ -209,7 +222,7 @@ def register(self, account_address, chain_str, writer_address=None):
|
||||
return account_address
|
||||
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
|
||||
def gift(self, account_address, chain_str):
|
||||
"""Creates a transaction to invoke the faucet contract for the given address.
|
||||
|
||||
@@ -304,6 +317,8 @@ def cache_gift_data(
|
||||
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
|
||||
tx_data = unpack_gift(tx['data'])
|
||||
|
||||
session = SessionBase.create_session()
|
||||
|
||||
tx_cache = TxCache(
|
||||
tx_hash_hex,
|
||||
tx['from'],
|
||||
@@ -312,9 +327,9 @@ def cache_gift_data(
|
||||
zero_address,
|
||||
0,
|
||||
0,
|
||||
session=session,
|
||||
)
|
||||
|
||||
session = SessionBase.create_session()
|
||||
session.add(tx_cache)
|
||||
session.commit()
|
||||
cache_id = tx_cache.id
|
||||
@@ -322,7 +337,7 @@ def cache_gift_data(
|
||||
return (tx_hash_hex, cache_id)
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def cache_account_data(
|
||||
tx_hash_hex,
|
||||
tx_signed_raw_hex,
|
||||
@@ -347,6 +362,7 @@ def cache_account_data(
|
||||
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
|
||||
tx_data = unpack_register(tx['data'])
|
||||
|
||||
session = SessionBase.create_session()
|
||||
tx_cache = TxCache(
|
||||
tx_hash_hex,
|
||||
tx['from'],
|
||||
@@ -355,9 +371,8 @@ def cache_account_data(
|
||||
zero_address,
|
||||
0,
|
||||
0,
|
||||
session=session,
|
||||
)
|
||||
|
||||
session = SessionBase.create_session()
|
||||
session.add(tx_cache)
|
||||
session.commit()
|
||||
cache_id = tx_cache.id
|
||||
|
||||
@@ -32,10 +32,10 @@ class TxFactory:
|
||||
logg.debug('txfactory instance address {} gas price'.format(self.address, self.gas_price))
|
||||
|
||||
|
||||
def next_nonce(self):
|
||||
def next_nonce(self, session=None):
|
||||
"""Returns the current cached nonce value, and increments it for next transaction.
|
||||
|
||||
:returns: Nonce
|
||||
:rtype: number
|
||||
"""
|
||||
return self.nonce_oracle.next()
|
||||
return self.nonce_oracle.next(session=session)
|
||||
|
||||
@@ -52,7 +52,10 @@ class GasOracle():
|
||||
:returns: Etheerum account address
|
||||
:rtype: str, 0x-hex
|
||||
"""
|
||||
return AccountRole.get_address('GAS_GIFTER')
|
||||
session = SessionBase.create_session()
|
||||
a = AccountRole.get_address('GAS_GIFTER', session)
|
||||
session.close()
|
||||
return a
|
||||
|
||||
|
||||
def gas_price(self, category='safe'):
|
||||
|
||||
@@ -14,10 +14,10 @@ class NonceOracle():
|
||||
self.default_nonce = default_nonce
|
||||
|
||||
|
||||
def next(self):
|
||||
def next(self, session=None):
|
||||
"""Get next unique nonce.
|
||||
|
||||
:returns: Nonce
|
||||
:rtype: number
|
||||
"""
|
||||
return Nonce.next(self.address, self.default_nonce)
|
||||
return Nonce.next(self.address, self.default_nonce, session=session)
|
||||
|
||||
@@ -33,7 +33,7 @@ def sign_tx(tx, chain_str):
|
||||
return (tx_hash_hex, tx_transfer_signed['raw'],)
|
||||
|
||||
|
||||
def sign_and_register_tx(tx, chain_str, queue, cache_task=None):
|
||||
def sign_and_register_tx(tx, chain_str, queue, cache_task=None, session=None):
|
||||
"""Signs the provided transaction, and adds it to the transaction queue cache (with status PENDING).
|
||||
|
||||
:param tx: Standard ethereum transaction data
|
||||
@@ -44,6 +44,7 @@ def sign_and_register_tx(tx, chain_str, queue, cache_task=None):
|
||||
:type queue: str
|
||||
:param cache_task: Cache task to call with signed transaction. If None, no task will be called.
|
||||
:type cache_task: str
|
||||
:raises: sqlalchemy.exc.DatabaseError
|
||||
:returns: Tuple; Transaction hash, signed raw transaction data
|
||||
:rtype: tuple
|
||||
"""
|
||||
@@ -51,25 +52,13 @@ def sign_and_register_tx(tx, chain_str, queue, cache_task=None):
|
||||
|
||||
logg.debug('adding queue tx {}'.format(tx_hash_hex))
|
||||
|
||||
# s = celery.signature(
|
||||
# 'cic_eth.queue.tx.create',
|
||||
# [
|
||||
# tx['nonce'],
|
||||
# tx['from'],
|
||||
# tx_hash_hex,
|
||||
# tx_signed_raw_hex,
|
||||
# chain_str,
|
||||
# ],
|
||||
# queue=queue,
|
||||
# )
|
||||
|
||||
# TODO: consider returning this as a signature that consequtive tasks can be linked to
|
||||
queue_create(
|
||||
tx['nonce'],
|
||||
tx['from'],
|
||||
tx_hash_hex,
|
||||
tx_signed_raw_hex,
|
||||
chain_str,
|
||||
session=session,
|
||||
)
|
||||
|
||||
if cache_task != None:
|
||||
|
||||
@@ -6,7 +6,10 @@ import celery
|
||||
import requests
|
||||
import web3
|
||||
from cic_registry import CICRegistry
|
||||
from cic_registry import zero_address
|
||||
from cic_registry.chain import ChainSpec
|
||||
from hexathon import strip_0x
|
||||
from chainlib.status import Status as TxStatus
|
||||
|
||||
# platform imports
|
||||
from cic_eth.db.models.tx import TxCache
|
||||
@@ -17,6 +20,11 @@ from cic_eth.eth.task import sign_and_register_tx
|
||||
from cic_eth.eth.task import create_check_gas_and_send_task
|
||||
from cic_eth.eth.factory import TxFactory
|
||||
from cic_eth.eth.util import unpack_signed_raw_tx
|
||||
from cic_eth.ext.address import translate_address
|
||||
from cic_eth.task import (
|
||||
CriticalSQLAlchemyTask,
|
||||
CriticalWeb3Task,
|
||||
)
|
||||
|
||||
celery_app = celery.current_app
|
||||
logg = logging.getLogger()
|
||||
@@ -118,11 +126,12 @@ def unpack_transfer(data):
|
||||
:returns: Parsed parameters
|
||||
:rtype: dict
|
||||
"""
|
||||
f = data[2:10]
|
||||
data = strip_0x(data)
|
||||
f = data[:8]
|
||||
if f != contract_function_signatures['transfer']:
|
||||
raise ValueError('Invalid transfer data ({})'.format(f))
|
||||
|
||||
d = data[10:]
|
||||
d = data[8:]
|
||||
return {
|
||||
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
|
||||
'amount': int(d[64:], 16)
|
||||
@@ -138,11 +147,12 @@ def unpack_transferfrom(data):
|
||||
:returns: Parsed parameters
|
||||
:rtype: dict
|
||||
"""
|
||||
f = data[2:10]
|
||||
data = strip_0x(data)
|
||||
f = data[:8]
|
||||
if f != contract_function_signatures['transferfrom']:
|
||||
raise ValueError('Invalid transferFrom data ({})'.format(f))
|
||||
|
||||
d = data[10:]
|
||||
d = data[8:]
|
||||
return {
|
||||
'from': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
|
||||
'to': web3.Web3.toChecksumAddress('0x' + d[128-40:128]),
|
||||
@@ -159,18 +169,19 @@ def unpack_approve(data):
|
||||
:returns: Parsed parameters
|
||||
:rtype: dict
|
||||
"""
|
||||
f = data[2:10]
|
||||
data = strip_0x(data)
|
||||
f = data[:8]
|
||||
if f != contract_function_signatures['approve']:
|
||||
raise ValueError('Invalid approval data ({})'.format(f))
|
||||
|
||||
d = data[10:]
|
||||
d = data[8:]
|
||||
return {
|
||||
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
|
||||
'amount': int(d[64:], 16)
|
||||
}
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
@celery_app.task(base=CriticalWeb3Task)
|
||||
def balance(tokens, holder_address, chain_str):
|
||||
"""Return token balances for a list of tokens for given address
|
||||
|
||||
@@ -185,7 +196,6 @@ def balance(tokens, holder_address, chain_str):
|
||||
"""
|
||||
#abi = ContractRegistry.abi('ERC20Token')
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
balances = []
|
||||
c = RpcClient(chain_spec)
|
||||
for t in tokens:
|
||||
#token = CICRegistry.get_address(t['address'])
|
||||
@@ -193,12 +203,12 @@ def balance(tokens, holder_address, chain_str):
|
||||
#o = c.w3.eth.contract(abi=abi, address=t['address'])
|
||||
o = CICRegistry.get_address(chain_spec, t['address']).contract
|
||||
b = o.functions.balanceOf(holder_address).call()
|
||||
logg.debug('balance {} for {}: {}'.format(t['address'], holder_address, b))
|
||||
balances.append(b)
|
||||
return b
|
||||
t['balance_network'] = b
|
||||
|
||||
return tokens
|
||||
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
|
||||
def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
|
||||
"""Transfer ERC20 tokens between addresses
|
||||
|
||||
@@ -252,7 +262,7 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
|
||||
return tx_hash_hex
|
||||
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
|
||||
def approve(self, tokens, holder_address, spender_address, value, chain_str):
|
||||
"""Approve ERC20 transfer on behalf of holder address
|
||||
|
||||
@@ -306,7 +316,7 @@ def approve(self, tokens, holder_address, spender_address, value, chain_str):
|
||||
return tx_hash_hex
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
@celery_app.task(base=CriticalWeb3Task)
|
||||
def resolve_tokens_by_symbol(token_symbols, chain_str):
|
||||
"""Returns contract addresses of an array of ERC20 token symbols
|
||||
|
||||
@@ -324,12 +334,12 @@ def resolve_tokens_by_symbol(token_symbols, chain_str):
|
||||
token = CICRegistry.get_token(chain_spec, token_symbol)
|
||||
tokens.append({
|
||||
'address': token.address(),
|
||||
#'converters': [],
|
||||
'converters': [],
|
||||
})
|
||||
return tokens
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def otx_cache_transfer(
|
||||
tx_hash_hex,
|
||||
tx_signed_raw_hex,
|
||||
@@ -353,7 +363,7 @@ def otx_cache_transfer(
|
||||
return txc
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def cache_transfer_data(
|
||||
tx_hash_hex,
|
||||
tx,
|
||||
@@ -380,6 +390,7 @@ def cache_transfer_data(
|
||||
tx['to'],
|
||||
tx_data['amount'],
|
||||
tx_data['amount'],
|
||||
session=session,
|
||||
)
|
||||
session.add(tx_cache)
|
||||
session.commit()
|
||||
@@ -388,7 +399,7 @@ def cache_transfer_data(
|
||||
return (tx_hash_hex, cache_id)
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def otx_cache_approve(
|
||||
tx_hash_hex,
|
||||
tx_signed_raw_hex,
|
||||
@@ -412,7 +423,7 @@ def otx_cache_approve(
|
||||
return txc
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def cache_approve_data(
|
||||
tx_hash_hex,
|
||||
tx,
|
||||
@@ -439,9 +450,76 @@ def cache_approve_data(
|
||||
tx['to'],
|
||||
tx_data['amount'],
|
||||
tx_data['amount'],
|
||||
session=session,
|
||||
)
|
||||
session.add(tx_cache)
|
||||
session.commit()
|
||||
cache_id = tx_cache.id
|
||||
session.close()
|
||||
return (tx_hash_hex, cache_id)
|
||||
|
||||
|
||||
class ExtendedTx:
|
||||
|
||||
_default_decimals = 6
|
||||
|
||||
def __init__(self, tx_hash, chain_spec):
|
||||
self._chain_spec = chain_spec
|
||||
self.chain = str(chain_spec)
|
||||
self.hash = tx_hash
|
||||
self.sender = None
|
||||
self.sender_label = None
|
||||
self.recipient = None
|
||||
self.recipient_label = None
|
||||
self.source_token_value = 0
|
||||
self.destination_token_value = 0
|
||||
self.source_token = zero_address
|
||||
self.destination_token = zero_address
|
||||
self.source_token_symbol = ''
|
||||
self.destination_token_symbol = ''
|
||||
self.source_token_decimals = ExtendedTx._default_decimals
|
||||
self.destination_token_decimals = ExtendedTx._default_decimals
|
||||
self.status = TxStatus.PENDING.name
|
||||
self.status_code = TxStatus.PENDING.value
|
||||
|
||||
|
||||
def set_actors(self, sender, recipient, trusted_declarator_addresses=None):
|
||||
self.sender = sender
|
||||
self.recipient = recipient
|
||||
if trusted_declarator_addresses != None:
|
||||
self.sender_label = translate_address(sender, trusted_declarator_addresses, self.chain)
|
||||
self.recipient_label = translate_address(recipient, trusted_declarator_addresses, self.chain)
|
||||
|
||||
|
||||
def set_tokens(self, source, source_value, destination=None, destination_value=None):
|
||||
if destination == None:
|
||||
destination = source
|
||||
if destination_value == None:
|
||||
destination_value = source_value
|
||||
st = CICRegistry.get_address(self._chain_spec, source)
|
||||
dt = CICRegistry.get_address(self._chain_spec, destination)
|
||||
self.source_token = source
|
||||
self.source_token_symbol = st.symbol()
|
||||
self.source_token_decimals = st.decimals()
|
||||
self.source_token_value = source_value
|
||||
self.destination_token = destination
|
||||
self.destination_token_symbol = dt.symbol()
|
||||
self.destination_token_decimals = dt.decimals()
|
||||
self.destination_token_value = destination_value
|
||||
|
||||
|
||||
def set_status(self, n):
|
||||
if n:
|
||||
self.status = TxStatus.ERROR.name
|
||||
else:
|
||||
self.status = TxStatus.SUCCESS.name
|
||||
self.status_code = n
|
||||
|
||||
|
||||
def to_dict(self):
|
||||
o = {}
|
||||
for attr in dir(self):
|
||||
if attr[0] == '_' or attr in ['set_actors', 'set_tokens', 'set_status', 'to_dict']:
|
||||
continue
|
||||
o[attr] = getattr(self, attr)
|
||||
return o
|
||||
|
||||
@@ -13,7 +13,10 @@ from .rpc import RpcClient
|
||||
from cic_eth.db import Otx, SessionBase
|
||||
from cic_eth.db.models.tx import TxCache
|
||||
from cic_eth.db.models.lock import Lock
|
||||
from cic_eth.db.enum import LockEnum
|
||||
from cic_eth.db.enum import (
|
||||
LockEnum,
|
||||
StatusBits,
|
||||
)
|
||||
from cic_eth.error import PermanentTxError
|
||||
from cic_eth.error import TemporaryTxError
|
||||
from cic_eth.error import NotLocalTxError
|
||||
@@ -29,6 +32,10 @@ from cic_eth.eth.nonce import NonceOracle
|
||||
from cic_eth.error import AlreadyFillingGasError
|
||||
from cic_eth.eth.util import tx_hex_string
|
||||
from cic_eth.admin.ctrl import lock_send
|
||||
from cic_eth.task import (
|
||||
CriticalSQLAlchemyTask,
|
||||
CriticalWeb3Task,
|
||||
)
|
||||
|
||||
celery_app = celery.current_app
|
||||
logg = logging.getLogger()
|
||||
@@ -37,7 +44,7 @@ MAX_NONCE_ATTEMPTS = 3
|
||||
|
||||
|
||||
# TODO this function is too long
|
||||
@celery_app.task(bind=True, throws=(OutOfGasError))
|
||||
@celery_app.task(bind=True, throws=(OutOfGasError), base=CriticalSQLAlchemyTask)
|
||||
def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=None):
|
||||
"""Check the gas level of the sender address of a transaction.
|
||||
|
||||
@@ -75,7 +82,6 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
|
||||
|
||||
# TODO: it should not be necessary to pass address explicitly, if not passed should be derived from the tx
|
||||
balance = c.w3.eth.getBalance(address)
|
||||
logg.debug('check gas txs {}'.format(tx_hashes))
|
||||
logg.debug('address {} has gas {} needs {}'.format(address, balance, gas_required))
|
||||
|
||||
if gas_required > balance:
|
||||
@@ -123,14 +129,13 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
|
||||
queue=queue,
|
||||
)
|
||||
ready_tasks.append(s)
|
||||
logg.debug('tasks {}'.format(ready_tasks))
|
||||
celery.group(ready_tasks)()
|
||||
|
||||
return txs
|
||||
|
||||
|
||||
# TODO: chain chainable transactions that use hashes as inputs may be chained to this function to output signed txs instead.
|
||||
@celery_app.task(bind=True)
|
||||
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
|
||||
def hashes_to_txs(self, tx_hashes):
|
||||
"""Return a list of raw signed transactions from the local transaction queue corresponding to a list of transaction hashes.
|
||||
|
||||
@@ -140,7 +145,6 @@ def hashes_to_txs(self, tx_hashes):
|
||||
:returns: Signed raw transactions
|
||||
:rtype: list of str, 0x-hex
|
||||
"""
|
||||
#logg = celery_app.log.get_default_logger()
|
||||
if len(tx_hashes) == 0:
|
||||
raise ValueError('no transaction to send')
|
||||
|
||||
@@ -313,7 +317,8 @@ class ParityNodeHandler:
|
||||
return (t, PermanentTxError, 'Fubar {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
|
||||
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
# TODO: A lock should be introduced to ensure that the send status change and the transaction send is atomic.
|
||||
@celery_app.task(bind=True, base=CriticalWeb3Task)
|
||||
def send(self, txs, chain_str):
|
||||
"""Send transactions to the network.
|
||||
|
||||
@@ -341,7 +346,6 @@ def send(self, txs, chain_str):
|
||||
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
|
||||
|
||||
tx_hex = txs[0]
|
||||
logg.debug('send transaction {}'.format(tx_hex))
|
||||
|
||||
@@ -349,19 +353,9 @@ def send(self, txs, chain_str):
|
||||
tx_hash_hex = tx_hash.hex()
|
||||
|
||||
queue = self.request.delivery_info.get('routing_key', None)
|
||||
if queue == None:
|
||||
logg.debug('send tx {} has no queue', tx_hash)
|
||||
|
||||
c = RpcClient(chain_spec)
|
||||
r = None
|
||||
try:
|
||||
r = c.w3.eth.send_raw_transaction(tx_hex)
|
||||
except Exception as e:
|
||||
logg.debug('e {}'.format(e))
|
||||
raiser = ParityNodeHandler(chain_spec, queue)
|
||||
(t, e, m) = raiser.handle(e, tx_hash_hex, tx_hex)
|
||||
raise e(m)
|
||||
|
||||
s_set_sent = celery.signature(
|
||||
'cic_eth.queue.tx.set_sent_status',
|
||||
[
|
||||
@@ -370,6 +364,14 @@ def send(self, txs, chain_str):
|
||||
],
|
||||
queue=queue,
|
||||
)
|
||||
try:
|
||||
r = c.w3.eth.send_raw_transaction(tx_hex)
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
raise(e)
|
||||
except Exception as e:
|
||||
raiser = ParityNodeHandler(chain_spec, queue)
|
||||
(t, e, m) = raiser.handle(e, tx_hash_hex, tx_hex)
|
||||
raise e(m)
|
||||
s_set_sent.apply_async()
|
||||
|
||||
tx_tail = txs[1:]
|
||||
@@ -384,7 +386,8 @@ def send(self, txs, chain_str):
|
||||
return r.hex()
|
||||
|
||||
|
||||
@celery_app.task(bind=True, throws=(AlreadyFillingGasError))
|
||||
# TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks.
|
||||
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task)
|
||||
def refill_gas(self, recipient_address, chain_str):
|
||||
"""Executes a native token transaction to fund the recipient's gas expenditures.
|
||||
|
||||
@@ -399,10 +402,11 @@ def refill_gas(self, recipient_address, chain_str):
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
|
||||
session = SessionBase.create_session()
|
||||
status_filter = StatusBits.FINAL | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
|
||||
q = session.query(Otx.tx_hash)
|
||||
q = q.join(TxCache)
|
||||
q = q.filter(Otx.status<=0)
|
||||
q = q.filter(TxCache.from_value!='0x00')
|
||||
q = q.filter(Otx.status.op('&')(StatusBits.FINAL.value)==0)
|
||||
q = q.filter(TxCache.from_value!=0)
|
||||
q = q.filter(TxCache.recipient==recipient_address)
|
||||
c = q.count()
|
||||
session.close()
|
||||
@@ -420,7 +424,7 @@ def refill_gas(self, recipient_address, chain_str):
|
||||
gas_price = c.gas_price()
|
||||
gas_limit = c.default_gas_limit
|
||||
refill_amount = c.refill_amount()
|
||||
logg.debug('gas price {} nonce {}'.format(gas_price, nonce))
|
||||
logg.debug('tx send gas price {} nonce {}'.format(gas_price, nonce))
|
||||
|
||||
# create and sign transaction
|
||||
tx_send_gas = {
|
||||
@@ -433,7 +437,6 @@ def refill_gas(self, recipient_address, chain_str):
|
||||
'value': refill_amount,
|
||||
'data': '',
|
||||
}
|
||||
logg.debug('txsend_gas {}'.format(tx_send_gas))
|
||||
tx_send_gas_signed = c.w3.eth.sign_transaction(tx_send_gas)
|
||||
tx_hash = web3.Web3.keccak(hexstr=tx_send_gas_signed['raw'])
|
||||
tx_hash_hex = tx_hash.hex()
|
||||
@@ -484,18 +487,21 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
|
||||
:rtype: str, 0x-hex
|
||||
"""
|
||||
session = SessionBase.create_session()
|
||||
otx = session.query(Otx).filter(Otx.tx_hash==txold_hash_hex).first()
|
||||
if otx == None:
|
||||
session.close()
|
||||
raise NotLocalTxError(txold_hash_hex)
|
||||
|
||||
|
||||
q = session.query(Otx)
|
||||
q = q.filter(Otx.tx_hash==txold_hash_hex)
|
||||
otx = q.first()
|
||||
session.close()
|
||||
if otx == None:
|
||||
raise NotLocalTxError(txold_hash_hex)
|
||||
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
c = RpcClient(chain_spec)
|
||||
|
||||
tx_signed_raw_bytes = bytes.fromhex(otx.signed_tx[2:])
|
||||
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
|
||||
logg.debug('otx {} {}'.format(tx, otx.signed_tx))
|
||||
logg.debug('resend otx {} {}'.format(tx, otx.signed_tx))
|
||||
|
||||
queue = self.request.delivery_info['routing_key']
|
||||
|
||||
@@ -505,7 +511,7 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
|
||||
else:
|
||||
gas_price = c.gas_price()
|
||||
if tx['gasPrice'] > gas_price:
|
||||
logg.warning('Network gas price {} is lower than overdue tx gas price {}'.format(gas_price, tx['gasPrice']))
|
||||
logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(gas_price, tx['gasPrice']))
|
||||
#tx['gasPrice'] = int(tx['gasPrice'] * default_factor)
|
||||
tx['gasPrice'] += 1
|
||||
else:
|
||||
@@ -515,9 +521,6 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
|
||||
else:
|
||||
tx['gasPrice'] = new_gas_price
|
||||
|
||||
logg.debug('after {}'.format(tx))
|
||||
|
||||
#(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx, chain_str, queue)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, chain_str)
|
||||
queue_create(
|
||||
tx['nonce'],
|
||||
@@ -537,10 +540,11 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
|
||||
queue=queue,
|
||||
)
|
||||
s.apply_async()
|
||||
|
||||
return tx_hash_hex
|
||||
|
||||
|
||||
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,))
|
||||
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task)
|
||||
def sync_tx(self, tx_hash_hex, chain_str):
|
||||
|
||||
queue = self.request.delivery_info['routing_key']
|
||||
@@ -599,7 +603,9 @@ def resume_tx(self, txpending_hash_hex, chain_str):
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
|
||||
session = SessionBase.create_session()
|
||||
r = session.query(Otx.signed_tx).filter(Otx.tx_hash==txpending_hash_hex).first()
|
||||
q = session.query(Otx.signed_tx)
|
||||
q = q.filter(Otx.tx_hash==txpending_hash_hex)
|
||||
r = q.first()
|
||||
session.close()
|
||||
if r == None:
|
||||
raise NotLocalTxError(txpending_hash_hex)
|
||||
@@ -622,7 +628,7 @@ def resume_tx(self, txpending_hash_hex, chain_str):
|
||||
return txpending_hash_hex
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def otx_cache_parse_tx(
|
||||
tx_hash_hex,
|
||||
tx_signed_raw_hex,
|
||||
@@ -649,7 +655,7 @@ def otx_cache_parse_tx(
|
||||
return txc
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def cache_gas_refill_data(
|
||||
tx_hash_hex,
|
||||
tx,
|
||||
|
||||
@@ -104,3 +104,5 @@ def tx_hex_string(tx_hex, chain_id):
|
||||
|
||||
tx_raw_bytes = bytes.fromhex(tx_hex)
|
||||
return tx_string(tx_raw_bytes, chain_id)
|
||||
|
||||
|
||||
|
||||
0
apps/cic-eth/cic_eth/ext/__init__.py
Normal file
0
apps/cic-eth/cic_eth/ext/__init__.py
Normal file
43
apps/cic-eth/cic_eth/ext/address.py
Normal file
43
apps/cic-eth/cic_eth/ext/address.py
Normal file
@@ -0,0 +1,43 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# third-party imports
|
||||
import celery
|
||||
from cic_registry.chain import ChainSpec
|
||||
from cic_registry import CICRegistry
|
||||
|
||||
celery_app = celery.current_app
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
def translate_address(address, trusted_addresses, chain_spec):
|
||||
for trusted_address in trusted_addresses:
|
||||
o = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', 'Declarator')
|
||||
fn = o.function('declaration')
|
||||
declaration_hex = fn(trusted_address, address).call()
|
||||
declaration_bytes = declaration_hex[0].rstrip(b'\x00')
|
||||
declaration = None
|
||||
try:
|
||||
declaration = declaration_bytes.decode('utf-8', errors='strict')
|
||||
except UnicodeDecodeError:
|
||||
continue
|
||||
return declaration
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
def translate_tx_addresses(tx, trusted_addresses, chain_str):
|
||||
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
|
||||
declaration = None
|
||||
if tx['sender_label'] == None:
|
||||
declaration = translate_address(tx['sender'], trusted_addresses, chain_spec)
|
||||
tx['sender_label'] = declaration
|
||||
|
||||
declaration = None
|
||||
if tx['recipient_label'] == None:
|
||||
declaration = translate_address(tx['recipient'], trusted_addresses, chain_spec)
|
||||
tx['recipient_label'] = declaration
|
||||
|
||||
return tx
|
||||
179
apps/cic-eth/cic_eth/ext/tx.py
Normal file
179
apps/cic-eth/cic_eth/ext/tx.py
Normal file
@@ -0,0 +1,179 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import math
|
||||
|
||||
# third-pary imports
|
||||
import web3
|
||||
import celery
|
||||
import moolb
|
||||
from cic_registry.chain import ChainSpec
|
||||
from cic_registry.registry import CICRegistry
|
||||
from hexathon import strip_0x
|
||||
|
||||
# local imports
|
||||
from cic_eth.eth.rpc import RpcClient
|
||||
from cic_eth.db.models.otx import Otx
|
||||
from cic_eth.eth.util import unpack_signed_raw_tx
|
||||
from cic_eth.db.enum import StatusEnum
|
||||
from cic_eth.eth.token import unpack_transfer
|
||||
from cic_eth.queue.tx import get_tx_cache
|
||||
from cic_eth.queue.time import tx_times
|
||||
|
||||
celery_app = celery.current_app
|
||||
logg = logging.getLogger()
|
||||
|
||||
MAX_BLOCK_TX = 250
|
||||
|
||||
|
||||
# TODO: Make this method easier to read
|
||||
@celery_app.task()
|
||||
def list_tx_by_bloom(bloomspec, address, chain_str):
|
||||
"""Retrieve external transaction data matching the provided filter
|
||||
|
||||
The bloom filter representation with the following structure (the size of the filter will be inferred from the size of the provided filter data):
|
||||
{
|
||||
'alg': <str; hashing algorithm, currently only "sha256" is understood>,
|
||||
'high': <number; highest block number in matched set>,
|
||||
'low': <number; lowest block number in matched set>,
|
||||
'filter_rounds': <number; hashing rounds used to generate filter entry>,
|
||||
'block_filter': <hex; bloom filter data with block matches>,
|
||||
'blocktx_filter': <hex; bloom filter data with block+tx matches>,
|
||||
}
|
||||
|
||||
:param bloomspec: Bloom filter data
|
||||
:type bloomspec: dict (see description above)
|
||||
:param address: Recipient address to use in matching
|
||||
:type address: str, 0x-hex
|
||||
:param chain_str: Chain spec string representation
|
||||
:type chain_str: str
|
||||
:returns: dict of transaction data as dict, keyed by transaction hash
|
||||
:rtype: dict of dict
|
||||
"""
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
c = RpcClient(chain_spec)
|
||||
block_filter_data = bytes.fromhex(bloomspec['block_filter'])
|
||||
tx_filter_data = bytes.fromhex(bloomspec['blocktx_filter'])
|
||||
databitlen = len(block_filter_data)*8
|
||||
block_filter = moolb.Bloom(databitlen, bloomspec['filter_rounds'], default_data=block_filter_data)
|
||||
tx_filter = moolb.Bloom(databitlen, bloomspec['filter_rounds'], default_data=tx_filter_data)
|
||||
|
||||
txs = {}
|
||||
for block_height in range(bloomspec['low'], bloomspec['high']):
|
||||
block_height_bytes = block_height.to_bytes(4, 'big')
|
||||
if block_filter.check(block_height_bytes):
|
||||
logg.debug('filter matched block {}'.format(block_height))
|
||||
block = c.w3.eth.getBlock(block_height, True)
|
||||
|
||||
for tx_index in range(0, len(block.transactions)):
|
||||
composite = tx_index + block_height
|
||||
tx_index_bytes = composite.to_bytes(4, 'big')
|
||||
if tx_filter.check(tx_index_bytes):
|
||||
logg.debug('filter matched block {} tx {}'.format(block_height, tx_index))
|
||||
|
||||
try:
|
||||
tx = c.w3.eth.getTransactionByBlock(block_height, tx_index)
|
||||
except web3.exceptions.TransactionNotFound:
|
||||
logg.debug('false positive on block {} tx {}'.format(block_height, tx_index))
|
||||
continue
|
||||
tx_address = None
|
||||
tx_token_value = 0
|
||||
try:
|
||||
transfer_data = unpack_transfer(tx['data'])
|
||||
tx_address = transfer_data['to']
|
||||
tx_token_value = transfer_data['amount']
|
||||
except ValueError:
|
||||
logg.debug('not a transfer transaction, skipping {}'.format(tx))
|
||||
continue
|
||||
if address == tx_address:
|
||||
status = StatusEnum.SENT
|
||||
try:
|
||||
rcpt = c.w3.eth.getTransactionReceipt(tx.hash)
|
||||
if rcpt['status'] == 0:
|
||||
pending = StatusEnum.REVERTED
|
||||
else:
|
||||
pending = StatusEnum.SUCCESS
|
||||
except web3.exceptions.TransactionNotFound:
|
||||
pass
|
||||
|
||||
tx_hash_hex = tx['hash'].hex()
|
||||
|
||||
token = CICRegistry.get_address(chain_spec, tx['to'])
|
||||
token_symbol = token.symbol()
|
||||
token_decimals = token.decimals()
|
||||
times = tx_times(tx_hash_hex, chain_str)
|
||||
tx_r = {
|
||||
'hash': tx_hash_hex,
|
||||
'sender': tx['from'],
|
||||
'recipient': tx_address,
|
||||
'source_value': tx_token_value,
|
||||
'destination_value': tx_token_value,
|
||||
'source_token': tx['to'],
|
||||
'destination_token': tx['to'],
|
||||
'source_token_symbol': token_symbol,
|
||||
'destination_token_symbol': token_symbol,
|
||||
'source_token_decimals': token_decimals,
|
||||
'destination_token_decimals': token_decimals,
|
||||
'source_token_chain': chain_str,
|
||||
'destination_token_chain': chain_str,
|
||||
'nonce': tx['nonce'],
|
||||
}
|
||||
if times['queue'] != None:
|
||||
tx_r['date_created'] = times['queue']
|
||||
else:
|
||||
tx_r['date_created'] = times['network']
|
||||
txs[tx_hash_hex] = tx_r
|
||||
break
|
||||
return txs
|
||||
|
||||
|
||||
# TODO: Surely it must be possible to optimize this
|
||||
# TODO: DRY this with callback filter in cic_eth/runnable/manager
|
||||
# TODO: Remove redundant fields from end representation (timestamp, tx_hash)
|
||||
@celery_app.task()
|
||||
def tx_collate(tx_batches, chain_str, offset, limit, newest_first=True):
|
||||
"""Merges transaction data from multiple sources and sorts them in chronological order.
|
||||
|
||||
:param tx_batches: Transaction data inputs
|
||||
:type tx_batches: lists of lists of transaction data
|
||||
:param chain_str: Chain spec string representation
|
||||
:type chain_str: str
|
||||
:param offset: Number of sorted results to skip (not yet implemented)
|
||||
:type offset: number
|
||||
:param limit: Maximum number of results to return (not yet implemented)
|
||||
:type limit: number
|
||||
:param newest_first: If True, returns results in reverse chronological order
|
||||
:type newest_first: bool
|
||||
:returns: Transactions
|
||||
:rtype: list
|
||||
"""
|
||||
txs_by_block = {}
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
|
||||
if isinstance(tx_batches, dict):
|
||||
tx_batches = [tx_batches]
|
||||
|
||||
for b in tx_batches:
|
||||
for v in b.values():
|
||||
tx = None
|
||||
k = None
|
||||
try:
|
||||
hx = strip_0x(v)
|
||||
tx = unpack_signed_raw_tx(bytes.fromhex(hx), chain_spec.chain_id())
|
||||
txc = get_tx_cache(tx['hash'])
|
||||
txc['timestamp'] = int(txc['date_created'].timestamp())
|
||||
txc['hash'] = txc['tx_hash']
|
||||
tx = txc
|
||||
except TypeError:
|
||||
tx = v
|
||||
tx['timestamp'] = tx['date_created']
|
||||
k = '{}.{}.{}'.format(tx['timestamp'], tx['sender'], tx['nonce'])
|
||||
txs_by_block[k] = tx
|
||||
|
||||
txs = []
|
||||
ks = list(txs_by_block.keys())
|
||||
ks.sort()
|
||||
if newest_first:
|
||||
ks.reverse()
|
||||
for k in ks:
|
||||
txs.append(txs_by_block[k])
|
||||
return txs
|
||||
121
apps/cic-eth/cic_eth/queue/balance.py
Normal file
121
apps/cic-eth/cic_eth/queue/balance.py
Normal file
@@ -0,0 +1,121 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# third-party imports
|
||||
import celery
|
||||
from hexathon import strip_0x
|
||||
|
||||
# local imports
|
||||
from cic_registry.chain import ChainSpec
|
||||
from cic_eth.db import SessionBase
|
||||
from cic_eth.db.models.otx import Otx
|
||||
from cic_eth.db.models.tx import TxCache
|
||||
from cic_eth.db.enum import (
|
||||
StatusBits,
|
||||
dead,
|
||||
)
|
||||
from cic_eth.task import CriticalSQLAlchemyTask
|
||||
|
||||
celery_app = celery.current_app
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
def __balance_outgoing_compatible(token_address, holder_address, chain_str):
|
||||
session = SessionBase.create_session()
|
||||
q = session.query(TxCache.from_value)
|
||||
q = q.join(Otx)
|
||||
q = q.filter(TxCache.sender==holder_address)
|
||||
status_compare = dead()
|
||||
q = q.filter(Otx.status.op('&')(status_compare)==0)
|
||||
q = q.filter(TxCache.source_token_address==token_address)
|
||||
delta = 0
|
||||
for r in q.all():
|
||||
delta += int(r[0])
|
||||
session.close()
|
||||
return delta
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def balance_outgoing(tokens, holder_address, chain_str):
|
||||
"""Retrieve accumulated value of unprocessed transactions sent from the given address.
|
||||
|
||||
:param tokens: list of token spec dicts with addresses to retrieve balances for
|
||||
:type tokens: list of str, 0x-hex
|
||||
:param holder_address: Sender address
|
||||
:type holder_address: str, 0x-hex
|
||||
:param chain_str: Chain spec string representation
|
||||
:type chain_str: str
|
||||
:returns: Tokens dicts with outgoing balance added
|
||||
:rtype: dict
|
||||
"""
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
for t in tokens:
|
||||
b = __balance_outgoing_compatible(t['address'], holder_address, chain_str)
|
||||
t['balance_outgoing'] = b
|
||||
|
||||
return tokens
|
||||
|
||||
|
||||
def __balance_incoming_compatible(token_address, receiver_address, chain_str):
|
||||
session = SessionBase.create_session()
|
||||
q = session.query(TxCache.to_value)
|
||||
q = q.join(Otx)
|
||||
q = q.filter(TxCache.recipient==receiver_address)
|
||||
status_compare = dead()
|
||||
q = q.filter(Otx.status.op('&')(status_compare)==0)
|
||||
# TODO: this can change the result for the recipient if tx is later obsoleted and resubmission is delayed.
|
||||
q = q.filter(Otx.status.op('&')(StatusBits.IN_NETWORK)==StatusBits.IN_NETWORK)
|
||||
q = q.filter(TxCache.destination_token_address==token_address)
|
||||
delta = 0
|
||||
for r in q.all():
|
||||
delta += int(r[0])
|
||||
session.close()
|
||||
return delta
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def balance_incoming(tokens, receipient_address, chain_str):
|
||||
"""Retrieve accumulated value of unprocessed transactions to be received by the given address.
|
||||
|
||||
:param tokens: list of token spec dicts with addresses to retrieve balances for
|
||||
:type tokens: list of str, 0x-hex
|
||||
:param holder_address: Recipient address
|
||||
:type holder_address: str, 0x-hex
|
||||
:param chain_str: Chain spec string representation
|
||||
:type chain_str: str
|
||||
:returns: Tokens dicts with outgoing balance added
|
||||
:rtype: dict
|
||||
"""
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
for t in tokens:
|
||||
b = __balance_incoming_compatible(t['address'], receipient_address, chain_str)
|
||||
t['balance_incoming'] = b
|
||||
|
||||
return tokens
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
def assemble_balances(balances_collection):
|
||||
"""Combines token spec dicts with individual balances into a single token spec dict.
|
||||
|
||||
A "balance" means any field that is keyed with a string starting with "balance_"
|
||||
|
||||
:param balances_collection: Token spec dicts
|
||||
:type balances_collection: list of lists of dicts
|
||||
:returns: Single token spec dict per token with all balances
|
||||
:rtype: list of dicts
|
||||
"""
|
||||
tokens = {}
|
||||
for c in balances_collection:
|
||||
for b in c:
|
||||
address = b['address']
|
||||
if tokens.get(address) == None:
|
||||
tokens[address] = {
|
||||
'address': address,
|
||||
'converters': b['converters'],
|
||||
}
|
||||
for k in b.keys():
|
||||
if k[:8] == 'balance_':
|
||||
tokens[address][k] = b[k]
|
||||
return list(tokens.values())
|
||||
41
apps/cic-eth/cic_eth/queue/time.py
Normal file
41
apps/cic-eth/cic_eth/queue/time.py
Normal file
@@ -0,0 +1,41 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# third-party imports
|
||||
import web3
|
||||
import celery
|
||||
from cic_registry.chain import ChainSpec
|
||||
|
||||
# local imports
|
||||
from cic_eth.eth.rpc import RpcClient
|
||||
from cic_eth.db.models.otx import Otx
|
||||
from cic_eth.error import NotLocalTxError
|
||||
from cic_eth.task import CriticalSQLAlchemyAndWeb3Task
|
||||
|
||||
celery_app = celery.current_app
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
# TODO: This method does not belong in the _queue_ module, it operates across queue and network
|
||||
@celery_app.task(base=CriticalSQLAlchemyAndWeb3Task)
|
||||
def tx_times(tx_hash, chain_str):
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
c = RpcClient(chain_spec)
|
||||
time_pair = {
|
||||
'network': None,
|
||||
'queue': None,
|
||||
}
|
||||
try:
|
||||
rcpt = c.w3.eth.getTransactionReceipt(tx_hash)
|
||||
block = c.w3.eth.getBlock(rcpt['blockHash'])
|
||||
logg.debug('rcpt {}'.format(block))
|
||||
time_pair['network'] = block['timestamp']
|
||||
except web3.exceptions.TransactionNotFound:
|
||||
pass
|
||||
|
||||
otx = Otx.load(tx_hash)
|
||||
if otx != None:
|
||||
time_pair['queue'] = int(otx['date_created'].timestamp())
|
||||
|
||||
return time_pair
|
||||
@@ -5,19 +5,28 @@ import datetime
|
||||
|
||||
# third-party imports
|
||||
import celery
|
||||
from hexathon import strip_0x
|
||||
from sqlalchemy import or_
|
||||
from sqlalchemy import not_
|
||||
from sqlalchemy import tuple_
|
||||
from sqlalchemy import func
|
||||
|
||||
# local imports
|
||||
from cic_registry import CICRegistry
|
||||
from cic_registry.chain import ChainSpec
|
||||
from cic_eth.db.models.otx import Otx
|
||||
from cic_eth.db.models.otx import OtxStateLog
|
||||
from cic_eth.db.models.tx import TxCache
|
||||
from cic_eth.db.models.lock import Lock
|
||||
from cic_eth.db import SessionBase
|
||||
from cic_eth.db.enum import StatusEnum
|
||||
from cic_eth.db.enum import LockEnum
|
||||
from cic_eth.db.enum import (
|
||||
StatusEnum,
|
||||
LockEnum,
|
||||
StatusBits,
|
||||
is_alive,
|
||||
dead,
|
||||
)
|
||||
from cic_eth.task import CriticalSQLAlchemyTask
|
||||
from cic_eth.eth.util import unpack_signed_raw_tx # TODO: should not be in same sub-path as package that imports queue.tx
|
||||
from cic_eth.error import NotLocalTxError
|
||||
from cic_eth.error import LockedError
|
||||
@@ -27,8 +36,7 @@ celery_app = celery.current_app
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predecessors=True):
|
||||
def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predecessors=True, session=None):
|
||||
"""Create a new transaction queue record.
|
||||
|
||||
:param nonce: Transaction nonce
|
||||
@@ -44,10 +52,10 @@ def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predec
|
||||
:returns: transaction hash
|
||||
:rtype: str, 0x-hash
|
||||
"""
|
||||
session = SessionBase.create_session()
|
||||
session = SessionBase.bind_session(session)
|
||||
lock = Lock.check_aggregate(chain_str, LockEnum.QUEUE, holder_address, session=session)
|
||||
if lock > 0:
|
||||
session.close()
|
||||
SessionBase.release_session(session)
|
||||
raise LockedError(lock)
|
||||
|
||||
o = Otx.add(
|
||||
@@ -70,19 +78,16 @@ def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predec
|
||||
|
||||
for otx in q.all():
|
||||
logg.info('otx {} obsoleted by {}'.format(otx.tx_hash, tx_hash))
|
||||
if otx.status == StatusEnum.SENT:
|
||||
otx.cancel(False, session=session)
|
||||
elif otx.status != StatusEnum.OBSOLETED:
|
||||
otx.override(session=session)
|
||||
otx.cancel(confirmed=False, session=session)
|
||||
|
||||
session.commit()
|
||||
session.close()
|
||||
SessionBase.release_session(session)
|
||||
logg.debug('queue created nonce {} from {} hash {}'.format(nonce, holder_address, tx_hash))
|
||||
return tx_hash
|
||||
|
||||
|
||||
# TODO: Replace set_* with single task for set status
|
||||
@celery_app.task()
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def set_sent_status(tx_hash, fail=False):
|
||||
"""Used to set the status after a send attempt
|
||||
|
||||
@@ -95,7 +100,9 @@ def set_sent_status(tx_hash, fail=False):
|
||||
:rtype: boolean
|
||||
"""
|
||||
session = SessionBase.create_session()
|
||||
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
|
||||
q = session.query(Otx)
|
||||
q = q.filter(Otx.tx_hash==tx_hash)
|
||||
o = q.first()
|
||||
if o == None:
|
||||
logg.warning('not local tx, skipping {}'.format(tx_hash))
|
||||
session.close()
|
||||
@@ -112,7 +119,7 @@ def set_sent_status(tx_hash, fail=False):
|
||||
return tx_hash
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def set_final_status(tx_hash, block=None, fail=False):
|
||||
"""Used to set the status of an incoming transaction result.
|
||||
|
||||
@@ -167,7 +174,8 @@ def set_final_status(tx_hash, block=None, fail=False):
|
||||
|
||||
return tx_hash
|
||||
|
||||
@celery_app.task()
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def set_cancel(tx_hash, manual=False):
|
||||
"""Used to set the status when a transaction is cancelled.
|
||||
|
||||
@@ -199,7 +207,7 @@ def set_cancel(tx_hash, manual=False):
|
||||
return tx_hash
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def set_rejected(tx_hash):
|
||||
"""Used to set the status when the node rejects sending a transaction to network
|
||||
|
||||
@@ -225,7 +233,7 @@ def set_rejected(tx_hash):
|
||||
return tx_hash
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def set_fubar(tx_hash):
|
||||
"""Used to set the status when an unexpected error occurs.
|
||||
|
||||
@@ -250,7 +258,34 @@ def set_fubar(tx_hash):
|
||||
|
||||
return tx_hash
|
||||
|
||||
@celery_app.task()
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def set_manual(tx_hash):
|
||||
"""Used to set the status when queue is manually changed
|
||||
|
||||
Will set the state to MANUAL
|
||||
|
||||
:param tx_hash: Transaction hash of record to modify
|
||||
:type tx_hash: str, 0x-hex
|
||||
:raises NotLocalTxError: If transaction not found in queue.
|
||||
"""
|
||||
|
||||
session = SessionBase.create_session()
|
||||
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
|
||||
if o == None:
|
||||
session.close()
|
||||
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
|
||||
|
||||
session.flush()
|
||||
|
||||
o.manual(session=session)
|
||||
session.commit()
|
||||
session.close()
|
||||
|
||||
return tx_hash
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def set_ready(tx_hash):
|
||||
"""Used to mark a transaction as ready to be sent to network
|
||||
|
||||
@@ -265,21 +300,36 @@ def set_ready(tx_hash):
|
||||
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
|
||||
session.flush()
|
||||
|
||||
if o.status == StatusEnum.WAITFORGAS or o.status == StatusEnum.PENDING:
|
||||
if o.status & StatusBits.GAS_ISSUES or o.status == StatusEnum.PENDING:
|
||||
o.readysend(session=session)
|
||||
else:
|
||||
o.retry(session=session)
|
||||
|
||||
logg.debug('ot otx otx {} {}'.format(tx_hash, o))
|
||||
|
||||
session.add(o)
|
||||
session.commit()
|
||||
session.close()
|
||||
|
||||
return tx_hash
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def set_dequeue(tx_hash):
|
||||
session = SessionBase.create_session()
|
||||
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
|
||||
if o == None:
|
||||
session.close()
|
||||
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
|
||||
|
||||
session.flush()
|
||||
|
||||
o.dequeue(session=session)
|
||||
session.commit()
|
||||
session.close()
|
||||
|
||||
return tx_hash
|
||||
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def set_waitforgas(tx_hash):
|
||||
"""Used to set the status when a transaction must be deferred due to gas refill
|
||||
|
||||
@@ -304,7 +354,8 @@ def set_waitforgas(tx_hash):
|
||||
|
||||
return tx_hash
|
||||
|
||||
@celery_app.task()
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def get_state_log(tx_hash):
|
||||
|
||||
logs = []
|
||||
@@ -323,7 +374,7 @@ def get_state_log(tx_hash):
|
||||
return logs
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def get_tx_cache(tx_hash):
|
||||
"""Returns an aggregate dictionary of outgoing transaction data and metadata
|
||||
|
||||
@@ -350,7 +401,6 @@ def get_tx_cache(tx_hash):
|
||||
|
||||
session.close()
|
||||
|
||||
values = txc.values()
|
||||
tx = {
|
||||
'tx_hash': otx.tx_hash,
|
||||
'signed_tx': otx.signed_tx,
|
||||
@@ -359,10 +409,12 @@ def get_tx_cache(tx_hash):
|
||||
'status_code': otx.status,
|
||||
'source_token': txc.source_token_address,
|
||||
'destination_token': txc.destination_token_address,
|
||||
'block_number': txc.block_number,
|
||||
'tx_index': txc.tx_index,
|
||||
'sender': txc.sender,
|
||||
'recipient': txc.recipient,
|
||||
'from_value': values[0],
|
||||
'to_value': values[1],
|
||||
'from_value': int(txc.from_value),
|
||||
'to_value': int(txc.to_value),
|
||||
'date_created': txc.date_created,
|
||||
'date_updated': txc.date_updated,
|
||||
'date_checked': txc.date_checked,
|
||||
@@ -371,7 +423,7 @@ def get_tx_cache(tx_hash):
|
||||
return tx
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def get_lock(address=None):
|
||||
"""Retrieve all active locks
|
||||
|
||||
@@ -409,7 +461,7 @@ def get_lock(address=None):
|
||||
return locks
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def get_tx(tx_hash):
|
||||
"""Retrieve a transaction queue record by transaction hash
|
||||
|
||||
@@ -420,8 +472,11 @@ def get_tx(tx_hash):
|
||||
:rtype: dict
|
||||
"""
|
||||
session = SessionBase.create_session()
|
||||
tx = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
|
||||
q = session.query(Otx)
|
||||
q = q.filter(Otx.tx_hash==tx_hash)
|
||||
tx = q.first()
|
||||
if tx == None:
|
||||
session.close()
|
||||
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
|
||||
|
||||
o = {
|
||||
@@ -435,7 +490,7 @@ def get_tx(tx_hash):
|
||||
return o
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def get_nonce_tx(nonce, sender, chain_id):
|
||||
"""Retrieve all transactions for address with specified nonce
|
||||
|
||||
@@ -466,7 +521,7 @@ def get_nonce_tx(nonce, sender, chain_id):
|
||||
|
||||
|
||||
# TODO: pass chain spec instead of chain id
|
||||
def get_paused_txs(status=None, sender=None, chain_id=0):
|
||||
def get_paused_txs(status=None, sender=None, chain_id=0, session=None):
|
||||
"""Returns not finalized transactions that have been attempted sent without success.
|
||||
|
||||
:param status: If set, will return transactions with this local queue status only
|
||||
@@ -479,17 +534,19 @@ def get_paused_txs(status=None, sender=None, chain_id=0):
|
||||
:returns: Transactions
|
||||
:rtype: dict, with transaction hash as key, signed raw transaction as value
|
||||
"""
|
||||
session = SessionBase.create_session()
|
||||
session = SessionBase.bind_session(session)
|
||||
q = session.query(Otx)
|
||||
|
||||
if status != None:
|
||||
if status == StatusEnum.PENDING or status >= StatusEnum.SENT:
|
||||
#if status == StatusEnum.PENDING or status >= StatusEnum.SENT:
|
||||
if status == StatusEnum.PENDING or status & StatusBits.IN_NETWORK or not is_alive(status):
|
||||
SessionBase.release_session(session)
|
||||
raise ValueError('not a valid paused tx value: {}'.format(status))
|
||||
q = q.filter(Otx.status==status)
|
||||
q = q.filter(Otx.status.op('&')(status.value)==status.value)
|
||||
q = q.join(TxCache)
|
||||
else:
|
||||
q = q.filter(Otx.status>StatusEnum.PENDING)
|
||||
q = q.filter(Otx.status<StatusEnum.SENT)
|
||||
q = q.filter(Otx.status>StatusEnum.PENDING.value)
|
||||
q = q.filter(not_(Otx.status.op('&')(StatusBits.IN_NETWORK.value)>0))
|
||||
|
||||
if sender != None:
|
||||
q = q.filter(TxCache.sender==sender)
|
||||
@@ -503,12 +560,12 @@ def get_paused_txs(status=None, sender=None, chain_id=0):
|
||||
#gas += tx['gas'] * tx['gasPrice']
|
||||
txs[r.tx_hash] = r.signed_tx
|
||||
|
||||
session.close()
|
||||
SessionBase.release_session(session)
|
||||
|
||||
return txs
|
||||
|
||||
|
||||
def get_status_tx(status, before=None, limit=0):
|
||||
def get_status_tx(status, before=None, exact=False, limit=0, session=None):
|
||||
"""Retrieve transaction with a specific queue status.
|
||||
|
||||
:param status: Status to match transactions with
|
||||
@@ -521,23 +578,26 @@ def get_status_tx(status, before=None, limit=0):
|
||||
:rtype: list of cic_eth.db.models.otx.Otx
|
||||
"""
|
||||
txs = {}
|
||||
session = SessionBase.create_session()
|
||||
session = SessionBase.bind_session(session)
|
||||
q = session.query(Otx)
|
||||
q = q.join(TxCache)
|
||||
q = q.filter(TxCache.date_updated<before)
|
||||
q = q.filter(Otx.status==status)
|
||||
if exact:
|
||||
q = q.filter(Otx.status==status.value)
|
||||
else:
|
||||
q = q.filter(Otx.status.op('&')(status.value)==status.value)
|
||||
i = 0
|
||||
for o in q.all():
|
||||
if limit > 0 and i == limit:
|
||||
break
|
||||
txs[o.tx_hash] = o.signed_tx
|
||||
i += 1
|
||||
session.close()
|
||||
SessionBase.release_session(session)
|
||||
return txs
|
||||
|
||||
|
||||
# TODO: move query to model
|
||||
def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, chain_id=0):
|
||||
def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, chain_id=0, session=None):
|
||||
"""Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions.
|
||||
|
||||
Will omit addresses that have the LockEnum.SEND bit in Lock set.
|
||||
@@ -556,7 +616,7 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
|
||||
:returns: Transactions
|
||||
:rtype: dict, with transaction hash as key, signed raw transaction as value
|
||||
"""
|
||||
session = SessionBase.create_session()
|
||||
session = SessionBase.bind_session(session)
|
||||
q_outer = session.query(
|
||||
TxCache.sender,
|
||||
func.min(Otx.nonce).label('nonce'),
|
||||
@@ -565,9 +625,13 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
|
||||
q_outer = q_outer.join(Lock, isouter=True)
|
||||
q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0))
|
||||
|
||||
if status >= StatusEnum.SENT:
|
||||
raise ValueError('not a valid non-final tx value: {}'.format(s))
|
||||
q_outer = q_outer.filter(Otx.status==status.value)
|
||||
if not is_alive(status):
|
||||
SessionBase.release_session(session)
|
||||
raise ValueError('not a valid non-final tx value: {}'.format(status))
|
||||
if status == StatusEnum.PENDING:
|
||||
q_outer = q_outer.filter(Otx.status==status.value)
|
||||
else:
|
||||
q_outer = q_outer.filter(Otx.status.op('&')(status.value)==status.value)
|
||||
|
||||
if recipient != None:
|
||||
q_outer = q_outer.filter(TxCache.recipient==recipient)
|
||||
@@ -604,12 +668,12 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
|
||||
session.add(o)
|
||||
session.commit()
|
||||
|
||||
session.close()
|
||||
SessionBase.release_session(session)
|
||||
|
||||
return txs
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def get_account_tx(address, as_sender=True, as_recipient=True, counterpart=None):
|
||||
"""Returns all local queue transactions for a given Ethereum address
|
||||
|
||||
@@ -627,6 +691,7 @@ def get_account_tx(address, as_sender=True, as_recipient=True, counterpart=None)
|
||||
"""
|
||||
if not as_sender and not as_recipient:
|
||||
raise ValueError('at least one of as_sender and as_recipient must be True')
|
||||
|
||||
txs = {}
|
||||
|
||||
session = SessionBase.create_session()
|
||||
@@ -642,10 +707,11 @@ def get_account_tx(address, as_sender=True, as_recipient=True, counterpart=None)
|
||||
|
||||
results = q.all()
|
||||
for r in results:
|
||||
if txs.get(r.tx_hash) != None:
|
||||
logg.debug('tx {} already recorded'.format(r.tx_hash))
|
||||
continue
|
||||
txs[r.tx_hash] = r.signed_tx
|
||||
session.close()
|
||||
|
||||
return txs
|
||||
|
||||
|
||||
|
||||
|
||||
86
apps/cic-eth/cic_eth/registry.py
Normal file
86
apps/cic-eth/cic_eth/registry.py
Normal file
@@ -0,0 +1,86 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import copy
|
||||
|
||||
# external imports
|
||||
from cic_registry import CICRegistry
|
||||
from eth_token_index import TokenUniqueSymbolIndex
|
||||
from eth_accounts_index import AccountRegistry
|
||||
from chainlib.chain import ChainSpec
|
||||
from cic_registry.chain import ChainRegistry
|
||||
from cic_registry.helper.declarator import DeclaratorOracleAdapter
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TokenOracle:
|
||||
|
||||
def __init__(self, conn, chain_spec, registry):
|
||||
self.tokens = []
|
||||
self.chain_spec = chain_spec
|
||||
self.registry = registry
|
||||
|
||||
token_registry_contract = CICRegistry.get_contract(chain_spec, 'TokenRegistry', 'Registry')
|
||||
self.getter = TokenUniqueSymbolIndex(conn, token_registry_contract.address())
|
||||
|
||||
|
||||
def get_tokens(self):
|
||||
token_count = self.getter.count()
|
||||
if token_count == len(self.tokens):
|
||||
return self.tokens
|
||||
|
||||
for i in range(len(self.tokens), token_count):
|
||||
token_address = self.getter.get_index(i)
|
||||
t = self.registry.get_address(self.chain_spec, token_address)
|
||||
token_symbol = t.symbol()
|
||||
self.tokens.append(t)
|
||||
|
||||
logg.debug('adding token idx {} symbol {} address {}'.format(i, token_symbol, token_address))
|
||||
|
||||
return copy.copy(self.tokens)
|
||||
|
||||
|
||||
class AccountsOracle:
|
||||
|
||||
def __init__(self, conn, chain_spec, registry):
|
||||
self.accounts = []
|
||||
self.chain_spec = chain_spec
|
||||
self.registry = registry
|
||||
|
||||
accounts_registry_contract = CICRegistry.get_contract(chain_spec, 'AccountRegistry', 'Registry')
|
||||
self.getter = AccountRegistry(conn, accounts_registry_contract.address())
|
||||
|
||||
|
||||
def get_accounts(self):
|
||||
accounts_count = self.getter.count()
|
||||
if accounts_count == len(self.accounts):
|
||||
return self.accounts
|
||||
|
||||
for i in range(len(self.accounts), accounts_count):
|
||||
account = self.getter.get_index(i)
|
||||
self.accounts.append(account)
|
||||
logg.debug('adding account {}'.format(account))
|
||||
|
||||
return copy.copy(self.accounts)
|
||||
|
||||
|
||||
def init_registry(config, w3):
|
||||
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
|
||||
CICRegistry.init(w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec)
|
||||
CICRegistry.add_path(config.get('ETH_ABI_DIR'))
|
||||
|
||||
chain_registry = ChainRegistry(chain_spec)
|
||||
CICRegistry.add_chain_registry(chain_registry, True)
|
||||
|
||||
declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', interface='Declarator')
|
||||
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
|
||||
if trusted_addresses_src == None:
|
||||
raise ValueError('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
|
||||
trusted_addresses = trusted_addresses_src.split(',')
|
||||
for address in trusted_addresses:
|
||||
logg.info('using trusted address {}'.format(address))
|
||||
|
||||
oracle = DeclaratorOracleAdapter(declarator.contract, trusted_addresses)
|
||||
chain_registry.add_oracle(oracle, 'naive_erc20_oracle')
|
||||
|
||||
return CICRegistry
|
||||
@@ -76,8 +76,9 @@ def main():
|
||||
t = api.create_account(register=register)
|
||||
|
||||
ps.get_message()
|
||||
m = ps.get_message(timeout=args.timeout)
|
||||
print(json.loads(m['data']))
|
||||
o = ps.get_message(timeout=args.timeout)
|
||||
m = json.loads(o['data'])
|
||||
print(m['result'])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
@@ -21,14 +21,21 @@ import cic_eth
|
||||
from cic_eth.eth import RpcClient
|
||||
from cic_eth.db import SessionBase
|
||||
from cic_eth.db.enum import StatusEnum
|
||||
from cic_eth.db.enum import StatusBits
|
||||
from cic_eth.db.enum import LockEnum
|
||||
from cic_eth.db import dsn_from_config
|
||||
from cic_eth.queue.tx import get_upcoming_tx
|
||||
from cic_eth.queue.tx import (
|
||||
get_upcoming_tx,
|
||||
set_dequeue,
|
||||
)
|
||||
from cic_eth.admin.ctrl import lock_send
|
||||
from cic_eth.sync.error import LoopDone
|
||||
from cic_eth.eth.tx import send as task_tx_send
|
||||
from cic_eth.error import PermanentTxError
|
||||
from cic_eth.error import TemporaryTxError
|
||||
from cic_eth.error import (
|
||||
PermanentTxError,
|
||||
TemporaryTxError,
|
||||
NotLocalTxError,
|
||||
)
|
||||
from cic_eth.eth.util import unpack_signed_raw_tx_hex
|
||||
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
@@ -91,6 +98,8 @@ run = True
|
||||
|
||||
class DispatchSyncer:
|
||||
|
||||
yield_delay = 0.005
|
||||
|
||||
def __init__(self, chain_spec):
|
||||
self.chain_spec = chain_spec
|
||||
self.chain_id = chain_spec.chain_id()
|
||||
@@ -107,6 +116,11 @@ class DispatchSyncer:
|
||||
for k in txs.keys():
|
||||
tx_raw = txs[k]
|
||||
tx = unpack_signed_raw_tx_hex(tx_raw, self.chain_spec.chain_id())
|
||||
|
||||
try:
|
||||
set_dequeue(tx['hash'])
|
||||
except NotLocalTxError as e:
|
||||
logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash']))
|
||||
|
||||
s_check = celery.signature(
|
||||
'cic_eth.admin.ctrl.check_lock',
|
||||
@@ -127,18 +141,22 @@ class DispatchSyncer:
|
||||
)
|
||||
s_check.link(s_send)
|
||||
t = s_check.apply_async()
|
||||
logg.info('processed {}'.format(k))
|
||||
|
||||
|
||||
def loop(self, w3, interval):
|
||||
while run:
|
||||
txs = {}
|
||||
typ = StatusEnum.READYSEND
|
||||
typ = StatusBits.QUEUED
|
||||
utxs = get_upcoming_tx(typ, chain_id=self.chain_id)
|
||||
for k in utxs.keys():
|
||||
txs[k] = utxs[k]
|
||||
self.process(w3, txs)
|
||||
|
||||
time.sleep(interval)
|
||||
if len(utxs) > 0:
|
||||
time.sleep(self.yield_delay)
|
||||
else:
|
||||
time.sleep(interval)
|
||||
|
||||
|
||||
def main():
|
||||
@@ -0,0 +1,4 @@
|
||||
from .callback import CallbackFilter
|
||||
from .tx import TxFilter
|
||||
from .gas import GasFilter
|
||||
from .register import RegistrationFilter
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user