Compare commits

...

86 Commits

Author SHA1 Message Date
nolash
820accf553 Add tasks module to python package build 2021-03-12 18:47:35 +01:00
361f5f7f8b adding cic cache details 2021-03-12 09:27:27 -08:00
f6a7956fdf Update .cic-template.yml 2021-03-10 03:53:58 +00:00
562292bd01 Merge branch 'kaniko-builds' into 'master'
Update ci_templates/.cic-template.yml

See merge request grassrootseconomics/cic-internal-integration!61
2021-03-10 03:48:17 +00:00
3cdf7b9965 Update ci_templates/.cic-template.yml 2021-03-10 03:48:17 +00:00
16d88d389b move envlist to dockercontainer 2021-03-09 16:54:33 -08:00
15445b8d0f Merge branch 'cic-ussd-reqfix' into 'master'
cic types

See merge request grassrootseconomics/cic-internal-integration!59
2021-03-07 20:55:16 +00:00
bb90ceea0b cic types 2021-03-07 12:54:41 -08:00
8904e2abb1 Merge branch 'cic-ussd-req' into 'master'
revert the new requirements for ussd

See merge request grassrootseconomics/cic-internal-integration!58
2021-03-07 20:27:03 +00:00
94d3e61d0c revert the new requirements for ussd 2021-03-07 12:25:36 -08:00
nolash
fc08f3d17a bump cic base for cic ussd 2021-03-07 20:35:26 +01:00
Louis Holbrook
8da5219290 Merge branch 'lash/cli-rehabilitations' into 'master'
Rehabilitate CLI and API after nonce changes

See merge request grassrootseconomics/cic-internal-integration!57
2021-03-07 18:01:44 +00:00
Louis Holbrook
5f01135b04 Rehabilitate CLI and API after nonce changes 2021-03-07 18:01:44 +00:00
Louis Holbrook
543c6249b9 Merge branch 'lash/retry-on-signer-error' into 'master'
Retry on signer error

See merge request grassrootseconomics/cic-internal-integration!55
2021-03-07 13:51:59 +00:00
Louis Holbrook
db4f8f8955 Retry on signer error 2021-03-07 13:51:59 +00:00
Louis Holbrook
0c45e12ce1 Merge branch 'lash/ussd-cli' into 'master'
Add ussd cli

See merge request grassrootseconomics/cic-internal-integration!54
2021-03-06 22:27:40 +00:00
nolash
4f1014c5e1 Add ussd cli 2021-03-06 22:25:57 +01:00
Louis Holbrook
1bfc1434b4 Merge branch 'lash/cic-eth-upgrade' into 'master'
Upgrade cic-eth version

See merge request grassrootseconomics/cic-internal-integration!53
2021-03-06 20:00:10 +00:00
nolash
7b9cd2d4b8 Upgrade cic-eth version 2021-03-06 20:58:35 +01:00
Louis Holbrook
30febbd1e0 Merge branch 'lash/transfer-authorization' into 'master'
cic-eth: Make nonce separate task

See merge request grassrootseconomics/cic-internal-integration!52
2021-03-06 17:55:51 +00:00
Louis Holbrook
f0088f20de cic-eth: Make nonce separate task 2021-03-06 17:55:51 +00:00
Louis Holbrook
618769a0d2 Merge branch 'lash/cic-cache-biiig-num' into 'master'
cic-cache: Set value db fields in cic_cache to handle biiig numbers

See merge request grassrootseconomics/cic-internal-integration!51
2021-03-05 20:03:52 +00:00
nolash
e0a980363c Set value db fields in cic_cache to handle biiig numbers 2021-03-05 20:45:01 +01:00
9d36a5f92f Merge branch 'philip/dry-run-fixes' into 'master'
Get that "ussd_menu.json" out there.

See merge request grassrootseconomics/cic-internal-integration!50
2021-03-05 19:22:44 +00:00
2fe6f4125f Get that "ussd_menu.json" out there. 2021-03-05 22:09:22 +03:00
b5c50b348d Merge branch 'philip/dry-run-prep' into 'master'
Philip/dry run prep

See merge request grassrootseconomics/cic-internal-integration!49
2021-03-05 16:28:08 +00:00
ea336283dc Philip/dry run prep 2021-03-05 16:28:07 +00:00
aa99b16ad2 Merge branch 'lash/fix-tx-list' into 'master'
Make tx listing tasks work properly

See merge request grassrootseconomics/cic-internal-integration!45
2021-03-04 16:47:13 +00:00
Louis Holbrook
1e7fff0133 Minor refactors:
- Renames s_assemble to s_brief
-  Link s_local to s_brief
2021-03-04 16:47:13 +00:00
21c9d95c4b Merge branch 'lash/transfer-authorization' into 'master'
cic-eth: Introduce transfer authorization contract

See merge request grassrootseconomics/cic-internal-integration!47
2021-03-04 15:06:15 +00:00
Louis Holbrook
8240e58c0a cic-eth: Introduce transfer authorization contract 2021-03-04 15:06:14 +00:00
1e9bf6b4d3 Update .cic-template.yml 2021-03-04 14:39:32 +00:00
7e089e1083 Merge branch 'bvander/cic-meta-entrypoint-update' into 'master'
meta isn't using the compose command its in the the Dockerfile

See merge request grassrootseconomics/cic-internal-integration!42
2021-03-03 16:26:20 +00:00
Louis Holbrook
32627aad27 Merge branch 'lash/session-nonce-queue' into 'master'
Atomic nonce queue db sessions

See merge request grassrootseconomics/cic-internal-integration!48
2021-03-03 07:37:27 +00:00
Louis Holbrook
d9a8c672de Atomic nonce queue db sessions 2021-03-03 07:37:26 +00:00
Louis Holbrook
f92efa28f9 Merge branch 'bvander/bug/cic-trust-var-default' into 'master'
cic trust address bug

See merge request grassrootseconomics/cic-internal-integration!46
2021-03-02 18:02:00 +00:00
73729d19b0 Update apps/cic-cache/cic_cache/runnable/tracker.py 2021-03-02 17:33:13 +00:00
Louis Holbrook
ae1502a651 Merge branch 'lash/refactor-syncer' into 'master'
Syncer refactor

See merge request grassrootseconomics/cic-internal-integration!40
2021-03-01 20:15:17 +00:00
Louis Holbrook
5001113267 Syncer refactor 2021-03-01 20:15:17 +00:00
451079d004 Update ci_templates/.cic-template.yml 2021-03-01 14:35:53 +00:00
ba8a0b1953 Update ci_templates/.cic-template.yml 2021-03-01 14:29:08 +00:00
bbc948757f Update ci_templates/.cic-template.yml 2021-03-01 14:24:38 +00:00
ca8c1b1f27 Update ci_templates/.cic-template.yml 2021-03-01 14:22:46 +00:00
854753f120 Merge branch 'revert-f37fa1db' into 'master'
Revert "Update ci_templates/.cic-template.yml"

See merge request grassrootseconomics/cic-internal-integration!44
2021-02-28 18:32:41 +00:00
daadbc27e9 Revert "Update ci_templates/.cic-template.yml"
This reverts commit f37fa1dbcf
2021-02-28 18:23:01 +00:00
f37fa1dbcf Update ci_templates/.cic-template.yml 2021-02-28 17:54:13 +00:00
Spencer Ofwiti
ac264314c0 Merge branch 'spencer/cic-meta-exports' into 'master'
Add exports to interface with CICADA.

See merge request grassrootseconomics/cic-internal-integration!43
2021-02-25 09:58:43 +00:00
Spencer Ofwiti
84c1d11b48 Add exports to interface with CICADA. 2021-02-25 09:58:43 +00:00
f940d4a961 meta isn't using this. The entrypoin is in the Dockerfile 2021-02-24 06:47:34 -08:00
Spencer Ofwiti
6fe87652ce Merge branch 'spencer/update-cic-meta' into 'master'
Add changes from stand-alone cic-meta repo.

See merge request grassrootseconomics/cic-internal-integration!41
2021-02-24 12:29:41 +00:00
Spencer Ofwiti
8f65be16b1 Add changes from stand-alone cic-meta repo. 2021-02-24 15:14:17 +03:00
Louis Holbrook
34a48a6c6c Merge branch 'lash/fix-imports-again' into 'master'
Rehabilitate import scripts after leak fixes

See merge request grassrootseconomics/cic-internal-integration!37
2021-02-22 20:00:18 +00:00
Louis Holbrook
c68d9d8404 Rehabilitate import scripts after leak fixes 2021-02-22 20:00:18 +00:00
dd8d4b01e2 Merge branch 'bvander/cic-ussd-workdir' into 'master'
change workdir so ussd can run tasker

See merge request grassrootseconomics/cic-internal-integration!39
2021-02-22 18:07:33 +00:00
4d3cc85573 change workdir so it can run tasker 2021-02-22 09:51:57 -08:00
10717df7d1 Merge branch 'bvander/fixes-to-contract-migration-deps' into 'master'
run reset.sh outside docker and fix deps

See merge request grassrootseconomics/cic-internal-integration!38
2021-02-22 15:14:36 +00:00
3f2e0f5b2e run reset.sh outside docker and fix deps 2021-02-22 06:27:59 -08:00
Louis Holbrook
42ae8e5ed3 Merge branch 'lash/import-scripts-refactor' into 'master'
Refactor import scripts

See merge request grassrootseconomics/cic-internal-integration!28
2021-02-21 15:41:37 +00:00
Louis Holbrook
96b4ad4a72 Refactor import scripts 2021-02-21 15:41:37 +00:00
1274958493 rename docker latest tag 2021-02-20 04:12:18 +00:00
Louis Holbrook
b38ff7629d Merge branch 'lash/audit-postgres-sessions' into 'master'
Improve reuse of db sessions

See merge request grassrootseconomics/cic-internal-integration!36
2021-02-19 07:06:05 +00:00
Louis Holbrook
fe499de1e4 Fix last(?) leak in syncer
Signed-off-by: nolash <dev@holbrook.no>
2021-02-19 07:06:05 +00:00
2657ed58d3 Merge branch 'bvander/cic-cache-build-2' into 'master'
cic cache build

See merge request grassrootseconomics/cic-internal-integration!35
2021-02-18 05:04:30 +00:00
b26a14e8ca cic cache build 2021-02-18 05:04:30 +00:00
Louis Holbrook
725ef54cf5 Merge branch 'lash/replace-faucet' into 'master'
Replace faucet

See merge request grassrootseconomics/cic-internal-integration!32
2021-02-17 10:29:44 +00:00
Louis Holbrook
3ef39fb393 Replace faucet 2021-02-17 10:29:44 +00:00
Louis Holbrook
6c1382aac6 Merge branch 'lash/complex-balance' into 'master'
cic-eth: Complex balance

See merge request grassrootseconomics/cic-internal-integration!29
2021-02-17 10:04:21 +00:00
Louis Holbrook
ab7b5fbeb9 cic-eth: Complex balance 2021-02-17 10:04:21 +00:00
Louis Holbrook
c6bcda8832 Merge branch 'lash/cic-eth-cic-cache-txs' into 'master'
cic-eth: Integrate transaction list queries

See merge request grassrootseconomics/cic-internal-integration!30
2021-02-17 09:33:18 +00:00
Louis Holbrook
be3c59a780 cic-eth: Integrate transaction list queries 2021-02-17 09:33:18 +00:00
Louis Holbrook
fcde3d0bb2 Merge branch 'lash/cic-eth-revert-hex-values-db' into 'master'
cic-eth: Revert number as hex in tx-cache

See merge request grassrootseconomics/cic-internal-integration!31
2021-02-17 08:30:10 +00:00
Louis Holbrook
500f0c3a41 cic-eth: Revert number as hex in tx-cache 2021-02-17 08:30:10 +00:00
Louis Holbrook
fa83c50ab5 Merge branch 'lash/cic-eth-address-translator' into 'master'
cic-eth: Address translator task

See merge request grassrootseconomics/cic-internal-integration!34
2021-02-17 08:19:42 +00:00
Louis Holbrook
f154136dd3 cic-eth: Address translator task 2021-02-17 08:19:42 +00:00
Louis Holbrook
d493cebc7c Merge branch 'lash/admin-api-account-check' into 'master'
Add admin api account check method

See merge request grassrootseconomics/cic-internal-integration!27
2021-02-15 11:06:28 +00:00
Louis Holbrook
d798f78d7f Add admin api account check method 2021-02-15 11:06:28 +00:00
Louis Holbrook
d872e78e39 Merge branch 'lash/cic-eth-status-enum-bitfield' into 'master'
Translate StatusEnum to flags instead of number ranges

See merge request grassrootseconomics/cic-internal-integration!14
2021-02-13 17:01:48 +00:00
Louis Holbrook
14f29c4c32 Translate StatusEnum to flags instead of number ranges 2021-02-13 17:01:48 +00:00
d042ce0dcd Merge branch 'lash/correct-db-names' into 'master'
Correct db names and ports

See merge request grassrootseconomics/cic-internal-integration!25
2021-02-12 02:27:29 +00:00
Louis Holbrook
22586a04bf Correct db names and ports 2021-02-12 02:27:29 +00:00
449cd5830e keys in the wrong place 2021-02-11 07:33:13 -08:00
df3954fda5 Merge branch 'bvandernotify-build' into 'master'
cic-notify build

See merge request grassrootseconomics/cic-internal-integration!23
2021-02-11 15:24:34 +00:00
Louis Holbrook
6038d63501 Merge branch 'bvander/fix-mono-repo' into 'master'
Bvander/fix mono repo

See merge request grassrootseconomics/cic-internal-integration!21
2021-02-11 10:14:30 +00:00
d409306e5e Bvander/fix mono repo 2021-02-11 10:14:30 +00:00
Louis Holbrook
b1c3629549 Merge branch 'lash/rename-chainlib' into 'master'
Rename cic-tools to chainlib

See merge request grassrootseconomics/cic-internal-integration!24
2021-02-11 08:30:21 +00:00
ed063f0522 builds up 2021-02-10 20:44:15 -08:00
382 changed files with 19129 additions and 4447 deletions

View File

@@ -1,4 +0,0 @@
.git
.cache
.dot
**/doc

View File

@@ -5,6 +5,7 @@ include:
- local: 'apps/cic-ussd/.gitlab-ci.yml'
- local: 'apps/cic-notify/.gitlab-ci.yml'
- local: 'apps/cic-meta/.gitlab-ci.yml'
- local: 'apps/cic-cache/.gitlab-ci.yml'
stages:
- build

6
.gitmodules vendored
View File

@@ -1,6 +0,0 @@
[submodule "apps/cic-cache"]
path = apps/cic-cache
url = git@gitlab.com:grassrootseconomics/cic-cache.git
[submodule "apps/cic-meta"]
path = apps/cic-meta
url = git@gitlab.com:grassrootseconomics/cic-meta.git

View File

@@ -2,6 +2,13 @@
## Getting started
## Make some keys
```
docker build -t bloxie . && docker run -v "$(pwd)/keys:/root/keys" --rm -it -t bloxie account new --chain /root/bloxberg.json --keys-path /root/keys
```
### Prepare the repo
This is stuff we need to put in makefile but for now...

View File

@@ -1,3 +1,6 @@
/validator/bloxbergData
/validator/bloxberg.log
keys/**/*
keys/*
!keys/Bloxberg
keys/Bloxberg/*
!keys/Bloxberg/UTC--2021-02-10T16-57-35Z--03512a62-5334-20cc-4e44-71156f33cff6

View File

@@ -17,7 +17,7 @@ COPY ./validator/bloxberg.json \
./validator/validator.toml \
/root/
COPY ./keys/ /root/keys/
COPY keys/ /root/keys/
# RUN chown -R parity:parity $HOME/ && \
# chmod -R 775 $HOME/ && \
@@ -25,4 +25,4 @@ COPY ./keys/ /root/keys/
# USER parity
ENTRYPOINT [ "parity" ]
CMD [ "--config", "/root/validator.toml", "--keys-path", "/root/keys/" ]
CMD [ "--config", "/root/validator.toml", "--keys-path", "/root/keys/", "--password", "/root/validator.pwd" ]

View File

@@ -4,7 +4,7 @@
The original bloxberg node config was kind of annoying so I am running it more like vanilla parity. This way you can pass command flags directly to parity.
## Make some keys
```
docker build -t bloxie . && docker run -v ${PWD}/keys:/root/keys --rm -it -t bloxie account new --chain /root/bloxberg.json --keys-path /root/keys
docker build -t bloxie . && docker run -v ${PWD}/keys:/root/keys --rm -it -t bloxie account new --chain /root/bloxberg.json --keys-path /root/keys --password /root/validator.pwd
```
## Enter the signer address and passwords in the config files

View File

@@ -0,0 +1 @@
{"id":"03512a62-5334-20cc-4e44-71156f33cff6","version":3,"crypto":{"cipher":"aes-128-ctr","cipherparams":{"iv":"dc388338c4d4e3203604aeb3d1c6bbfa"},"ciphertext":"8a945775b87089ce94537e011799f3abc1577c5dd1f3fbaebe1cd96dfdfc8b5a","kdf":"pbkdf2","kdfparams":{"c":10240,"dklen":32,"prf":"hmac-sha256","salt":"e8585836540caca01282381f5c1fe128e53b15b40f9d152fbc5a4f82a7967398"},"mac":"a7c7815e84a632ecf6d8f18c981bea73d50cd2e2a855a3e90477fc84ed14f906"},"address":"4f2a5902158c3969b245247f4154971d393301f2","name":"","meta":"{}"}

View File

@@ -7,7 +7,7 @@
"maximumUncleCount": 0,
"stepDuration": "5",
"validators" : {
"list": ["0x6bd4e51b3730576ddc4049654ef60ed7f7436cb5"]
"list": ["0x4f2a5902158c3969b245247f4154971d393301f2"]
}
}
}

View File

@@ -26,7 +26,7 @@ password = ["/root/validator.pwd"]
[mining]
#CHANGE ENGINE SIGNER TO VALIDATOR ADDRESS
engine_signer = "0x6bd4e51b3730576ddc4049654ef60ed7f7436cb5"
engine_signer = "0x4f2a5902158c3969b245247f4154971d393301f2"
reseal_on_txs = "none"
force_sealing = true
min_gas_price = 1000000

Submodule apps/cic-cache deleted from 06c5f0fb0d

View File

@@ -0,0 +1,2 @@
[bancor]
dir =

View File

@@ -0,0 +1,2 @@
[cic]
registry_address =

View File

@@ -0,0 +1,8 @@
[database]
NAME=cic-eth
USER=postgres
PASSWORD=
HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2

View File

@@ -0,0 +1,6 @@
[eth]
provider = ws://localhost:8545
#ttp_provider = http://localhost:8545
#provider = http://localhost:8545
gas_provider_address =
#chain_id =

View File

@@ -0,0 +1,2 @@
[bancor]
dir =

View File

@@ -0,0 +1,2 @@
[cic]
registry_address =

View File

@@ -0,0 +1,8 @@
[database]
NAME=cic-cache-test
USER=postgres
PASSWORD=
HOST=localhost
PORT=5432
ENGINE=sqlite
DRIVER=pysqlite

View File

@@ -0,0 +1,5 @@
[eth]
#ws_provider = ws://localhost:8546
#ttp_provider = http://localhost:8545
provider = http://localhost:8545
#chain_id =

View File

@@ -0,0 +1,5 @@
[report]
omit =
.venv/*
scripts/*
cic_cache/db/postgres/*

View File

@@ -0,0 +1,7 @@
set -a
CICTEST_DATABASE_ENGINE=postgresql
CICTEST_DATABASE_DRIVER=psycopg2
CICTEST_DATABASE_HOST=localhost
CICTEST_DATABASE_PORT=5432
CICTEST_DATABASE_NAME=cic-eth-test
set +a

8
apps/cic-cache/.gitignore vendored Normal file
View File

@@ -0,0 +1,8 @@
.envrc
.envrc_dev
.venv
__pycache__
*.pyc
_build
doc/**/*.png
doc/**/html

View File

@@ -0,0 +1,22 @@
.cic_cache_variables:
variables:
APP_NAME: cic-cache
DOCKERFILE_PATH: $APP_NAME/docker/Dockerfile
.cic_cache_changes_target:
rules:
- changes:
- $CONTEXT/$APP_NAME/*
build-mr-cic-cache:
extends:
- .cic_cache_changes_target
- .py_build_merge_request
- .cic_cache_variables
build-push-cic-cache:
extends:
- .py_build_push
- .cic_cache_variables

13
apps/cic-cache/CHANGELOG Normal file
View File

@@ -0,0 +1,13 @@
- 0.1.2
* Revert to alembic migrations
- 0.1.1
* Add missing modules to setup
- 0.1.0
* Remove old APIs
* Add bloom filter output APIs for all txs and per-account txs
- 0.0.2
* UWSGI server endpoint example
* OpenAPI spec
* stored procedures, test fixture for database schema
- 0.0.1
* Add json translators of transaction_list and balances stored procedure queries

0
apps/cic-cache/README.md Normal file
View File

View File

@@ -0,0 +1 @@
from .cache import BloomCache

View File

@@ -0,0 +1,73 @@
"""API for cic-cache celery tasks
.. moduleauthor:: Louis Holbrook <dev@holbrook.no>
"""
# standard imports
import logging
# third-party imports
import celery
app = celery.current_app
logg = logging.getLogger(__name__)
class Api:
"""Creates task chains to perform well-known CIC operations.
Each method that sends tasks returns details about the root task. The root task uuid can be provided in the callback, to enable to caller to correlate the result with individual calls. It can also be used to independently poll the completion of a task chain.
:param callback_param: Static value to pass to callback
:type callback_param: str
:param callback_task: Callback task that executes callback_param call. (Must be included by the celery worker)
:type callback_task: string
:param queue: Name of worker queue to submit tasks to
:type queue: str
"""
def __init__(self, queue='cic-cache', callback_param=None, callback_task='cic_cache.callbacks.noop.noop', callback_queue=None):
self.callback_param = callback_param
self.callback_task = callback_task
self.queue = queue
logg.info('api using queue {}'.format(self.queue))
self.callback_success = None
self.callback_error = None
if callback_queue == None:
callback_queue=self.queue
if callback_param != None:
self.callback_success = celery.signature(
callback_task,
[
callback_param,
0,
],
queue=callback_queue,
)
self.callback_error = celery.signature(
callback_task,
[
callback_param,
1,
],
queue=callback_queue,
)
def list(self, offset, limit, address=None):
s = celery.signature(
'cic_cache.tasks.tx.tx_filter',
[
0,
100,
address,
],
queue=None
)
if self.callback_param != None:
s.link(self.callback_success).on_error(self.callback_error)
t = s.apply_async()
return t

View File

@@ -0,0 +1,89 @@
# standard imports
import logging
# third-party imports
import moolb
# local imports
from cic_cache.db import list_transactions_mined
from cic_cache.db import list_transactions_account_mined
logg = logging.getLogger()
class BloomCache:
def __init__(self, session):
self.session = session
@staticmethod
def __get_filter_size(n):
n = 8192 * 8
logg.warning('filter size hardcoded to {}'.format(n))
return n
def load_transactions(self, offset, limit):
"""Retrieves a list of transactions from cache and creates a bloom filter pointing to blocks and transactions.
Block and transaction numbers are serialized as 32-bit big-endian numbers. The input to the second bloom filter is the concatenation of the serialized block number and transaction index.
For example, if the block number is 13 and the transaction index is 42, the input are:
block filter: 0x0d000000
block+tx filter: 0x0d0000002a0000000
:param offset: Offset in data set to return transactions from
:type offset: int
:param limit: Max number of transactions to retrieve
:type limit: int
:return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx
:rtype: tuple
"""
rows = list_transactions_mined(self.session, offset, limit)
f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
highest_block = -1
lowest_block = -1
for r in rows:
if highest_block == -1:
highest_block = r[0]
lowest_block = r[0]
block = r[0].to_bytes(4, byteorder='big')
tx = r[1].to_bytes(4, byteorder='big')
f_block.add(block)
f_blocktx.add(block + tx)
logg.debug('added block {} tx {} lo {} hi {}'.format(r[0], r[1], lowest_block, highest_block))
return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),)
def load_transactions_account(self, address, offset, limit):
"""Same as load_transactions(...), but only retrieves transactions where the specified account address is sender or recipient.
:param address: Address to retrieve transactions for.
:type address: str, 0x-hex
:param offset: Offset in data set to return transactions from
:type offset: int
:param limit: Max number of transactions to retrieve
:type limit: int
:return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx
:rtype: tuple
"""
rows = list_transactions_account_mined(self.session, address, offset, limit)
f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
highest_block = -1;
lowest_block = -1;
for r in rows:
if highest_block == -1:
highest_block = r[0]
lowest_block = r[0]
block = r[0].to_bytes(4, byteorder='big')
tx = r[1].to_bytes(4, byteorder='big')
f_block.add(block)
f_blocktx.add(block + tx)
logg.debug('added block {} tx {} lo {} hi {}'.format(r[0], r[1], lowest_block, highest_block))
return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),)

View File

@@ -0,0 +1,35 @@
# standard imports
import logging
# local imports
from .list import list_transactions_mined
from .list import list_transactions_account_mined
from .list import add_transaction
logg = logging.getLogger()
def dsn_from_config(config):
scheme = config.get('DATABASE_ENGINE')
if config.get('DATABASE_DRIVER') != None:
scheme += '+{}'.format(config.get('DATABASE_DRIVER'))
dsn = ''
if config.get('DATABASE_ENGINE') == 'sqlite':
dsn = '{}:///{}'.format(
scheme,
config.get('DATABASE_NAME'),
)
else:
dsn = '{}://{}:{}@{}:{}/{}'.format(
scheme,
config.get('DATABASE_USER'),
config.get('DATABASE_PASSWORD'),
config.get('DATABASE_HOST'),
config.get('DATABASE_PORT'),
config.get('DATABASE_NAME'),
)
logg.debug('parsed dsn from config: {}'.format(dsn))
return dsn

View File

@@ -0,0 +1,79 @@
# standard imports
import logging
import datetime
# third-party imports
from cic_cache.db.models.base import SessionBase
logg = logging.getLogger()
def list_transactions_mined(
session,
offset,
limit,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
:param offset: Offset in data set to return transactions from
:type offset: int
:param limit: Max number of transactions to retrieve
:type limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(limit, offset)
r = session.execute(s)
return r
def list_transactions_account_mined(
session,
address,
offset,
limit,
):
"""Same as list_transactions_mined(...), but only retrieves transaction where the specified account address is sender or recipient.
:param address: Address to retrieve transactions for.
:type address: str, 0x-hex
:param offset: Offset in data set to return transactions from
:type offset: int
:param limit: Max number of transactions to retrieve
:type limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
s = "SELECT block_number, tx_index FROM tx WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(address, address, limit, offset)
r = session.execute(s)
return r
def add_transaction(
session, tx_hash,
block_number,
tx_index,
sender,
receiver,
source_token,
destination_token,
from_value,
to_value,
success,
timestamp,
):
date_block = datetime.datetime.fromtimestamp(timestamp)
s = "INSERT INTO tx (tx_hash, block_number, tx_index, sender, recipient, source_token, destination_token, from_value, to_value, success, date_block) VALUES ('{}', {}, {}, '{}', '{}', '{}', '{}', {}, {}, {}, '{}')".format(
tx_hash,
block_number,
tx_index,
sender,
receiver,
source_token,
destination_token,
from_value,
to_value,
success,
date_block,
)
session.execute(s)

View File

@@ -0,0 +1 @@
Generic single-database configuration.

View File

@@ -0,0 +1,86 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = .
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to ./versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat ./versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
#sqlalchemy.url = driver://user:pass@localhost/dbname
sqlalchemy.url = postgresql+psycopg2://postgres@localhost:5432/cic-cache
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks=black
# black.type=console_scripts
# black.entrypoint=black
# black.options=-l 79
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@@ -0,0 +1,77 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1,52 @@
"""Base tables
Revision ID: 63b629f14a85
Revises:
Create Date: 2020-12-04 08:16:00.412189
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '63b629f14a85'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'tx',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('date_registered', sa.DateTime, nullable=False, server_default=sa.func.current_timestamp()),
sa.Column('block_number', sa.Integer, nullable=False),
sa.Column('tx_index', sa.Integer, nullable=False),
sa.Column('tx_hash', sa.String(66), nullable=False),
sa.Column('sender', sa.String(42), nullable=False),
sa.Column('recipient', sa.String(42), nullable=False),
sa.Column('source_token', sa.String(42), nullable=False),
sa.Column('destination_token', sa.String(42), nullable=False),
sa.Column('success', sa.Boolean, nullable=False),
sa.Column('from_value', sa.NUMERIC(), nullable=False),
sa.Column('to_value', sa.NUMERIC(), nullable=False),
sa.Column('date_block', sa.DateTime, nullable=False),
)
op.create_table(
'tx_sync',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tx', sa.String(66), nullable=False),
)
op.execute("INSERT INTO tx_sync (tx) VALUES('0x0000000000000000000000000000000000000000000000000000000000000000');")
op.create_index('sender_token_idx', 'tx', ['sender', 'source_token'])
op.create_index('recipient_token_idx', 'tx', ['recipient', 'destination_token'])
def downgrade():
op.drop_index('recipient_token_idx')
op.drop_index('sender_token_idx')
op.drop_table('tx_sync')
op.drop_table('tx')

View File

@@ -0,0 +1,102 @@
# stanard imports
import logging
# third-party imports
from sqlalchemy import Column, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
logg = logging.getLogger()
Model = declarative_base(name='Model')
class SessionBase(Model):
"""The base object for all SQLAlchemy enabled models. All other models must extend this.
"""
__abstract__ = True
id = Column(Integer, primary_key=True)
engine = None
"""Database connection engine of the running aplication"""
sessionmaker = None
"""Factory object responsible for creating sessions from the connection pool"""
transactional = True
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
poolable = True
"""Whether the database backend supports connection pools. Should be explicitly set by initialization code"""
procedural = True
"""Whether the database backend supports stored procedures"""
localsessions = {}
"""Contains dictionary of sessions initiated by db model components"""
@staticmethod
def create_session():
"""Creates a new database session.
"""
return SessionBase.sessionmaker()
@staticmethod
def _set_engine(engine):
"""Sets the database engine static property
"""
SessionBase.engine = engine
SessionBase.sessionmaker = sessionmaker(bind=SessionBase.engine)
@staticmethod
def connect(dsn, debug=False):
"""Create new database connection engine and connect to database backend.
:param dsn: DSN string defining connection.
:type dsn: str
"""
e = None
if SessionBase.poolable:
e = create_engine(
dsn,
max_overflow=50,
pool_pre_ping=True,
pool_size=20,
pool_recycle=10,
echo=debug,
)
else:
e = create_engine(
dsn,
echo=debug,
)
SessionBase._set_engine(e)
@staticmethod
def disconnect():
"""Disconnect from database and free resources.
"""
SessionBase.engine.dispose()
SessionBase.engine = None
@staticmethod
def bind_session(session=None):
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
localsession_key = str(id(localsession))
logg.debug('creating new session {}'.format(localsession_key))
SessionBase.localsessions[localsession_key] = localsession
return localsession
@staticmethod
def release_session(session=None):
session_key = str(id(session))
if SessionBase.localsessions.get(session_key) != None:
logg.debug('destroying session {}'.format(session_key))
session.commit()
session.close()

View File

@@ -0,0 +1,141 @@
# standard imports
import os
import re
import logging
import argparse
import json
import base64
# third-party imports
import confini
# local imports
from cic_cache import BloomCache
from cic_cache.db import dsn_from_config
from cic_cache.db.models.base import SessionBase
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
rootdir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
dbdir = os.path.join(rootdir, 'cic_cache', 'db')
migrationsdir = os.path.join(dbdir, 'migrations')
config_dir = os.path.join('/usr/local/etc/cic-cache')
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config = confini.Config(args.c, args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config:\n{}'.format(config))
dsn = dsn_from_config(config)
SessionBase.connect(dsn, config.true('DATABASE_DEBUG'))
re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?'
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)/?(\d+)?/?(\d+)/?'
DEFAULT_LIMIT = 100
def process_transactions_account_bloom(session, env):
r = re.match(re_transactions_account_bloom, env.get('PATH_INFO'))
if not r:
return None
address = r[1]
if r[2] == None:
address = '0x' + address
offset = DEFAULT_LIMIT
if r.lastindex > 2:
offset = r[3]
limit = 0
if r.lastindex > 3:
limit = r[4]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
def process_transactions_all_bloom(session, env):
r = re.match(re_transactions_all_bloom, env.get('PATH_INFO'))
if not r:
return None
offset = DEFAULT_LIMIT
if r.lastindex > 0:
offset = r[1]
limit = 0
if r.lastindex > 1:
limit = r[2]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
# uwsgi application
def application(env, start_response):
headers = []
content = b''
session = SessionBase.create_session()
for handler in [
process_transactions_all_bloom,
process_transactions_account_bloom,
]:
r = handler(session, env)
if r != None:
(mime_type, content) = r
break
session.close()
headers.append(('Content-Length', str(len(content))),)
headers.append(('Access-Control-Allow-Origin', '*',));
if len(content) == 0:
headers.append(('Content-Type', 'text/plain, charset=UTF-8',))
start_response('404 Looked everywhere, sorry', headers)
else:
headers.append(('Content-Type', mime_type,))
start_response('200 OK', headers)
return [content]

View File

@@ -0,0 +1,98 @@
# standard imports
import logging
import os
import sys
import argparse
# third-party imports
import celery
import confini
# local imports
from cic_cache.db import dsn_from_config
from cic_cache.db.models.base import SessionBase
from cic_cache.tasks.tx import *
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = os.path.join('/usr/local/etc/cic-cache')
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('-q', type=str, default='cic-cache', help='queue name for worker tasks')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config = confini.Config(args.c, args.env_prefix)
config.process()
# connect to database
dsn = dsn_from_config(config)
SessionBase.connect(dsn)
# verify database connection with minimal sanity query
#session = SessionBase.create_session()
#session.execute('select version_num from alembic_version')
#session.close()
# set up celery
current_app = celery.Celery(__name__)
broker = config.get('CELERY_BROKER_URL')
if broker[:4] == 'file':
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
current_app.conf.update({
'broker_url': broker,
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
},
)
logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
else:
current_app.conf.update({
'broker_url': broker,
})
result = config.get('CELERY_RESULT_URL')
if result[:4] == 'file':
rq = tempfile.mkdtemp()
current_app.conf.update({
'result_backend': 'file://{}'.format(rq),
})
logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
else:
current_app.conf.update({
'result_backend': result,
})
def main():
argv = ['worker']
if args.vv:
argv.append('--loglevel=DEBUG')
elif args.v:
argv.append('--loglevel=INFO')
argv.append('-Q')
argv.append(args.q)
argv.append('-n')
argv.append(args.q)
current_app.worker_main(argv)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,339 @@
# standard imports
import sys
import os
import argparse
import logging
import time
import enum
import re
# third-party imports
import confini
from cic_registry import CICRegistry
from cic_registry.chain import (
ChainRegistry,
ChainSpec,
)
#from cic_registry.bancor import BancorRegistryClient
from cic_registry.token import Token
from cic_registry.error import (
UnknownContractError,
UnknownDeclarationError,
)
from cic_registry.declaration import to_token_declaration
from web3.exceptions import BlockNotFound, TransactionNotFound
from websockets.exceptions import ConnectionClosedError
from requests.exceptions import ConnectionError
import web3
from web3 import HTTPProvider, WebsocketProvider
# local imports
from cic_cache import db
from cic_cache.db.models.base import SessionBase
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL)
logging.getLogger('urllib3').setLevel(logging.CRITICAL)
logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL)
log_topics = {
'transfer': '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef',
'convert': '0x7154b38b5dd31bb3122436a96d4e09aba5b323ae1fd580025fab55074334c095',
'accountregistry_add': '0a3b0a4f4c6e53dce3dbcad5614cb2ba3a0fa7326d03c5d64b4fa2d565492737',
}
config_dir = os.path.join('/usr/local/etc/cic-cache')
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('--trust-address', default=[], type=str, dest='trust_address', action='append', help='Set address as trust')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
args = argparser.parse_args(sys.argv[1:])
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
config = confini.Config(config_dir, args.env_prefix)
config.process()
args_override = {
'ETH_ABI_DIR': getattr(args, 'abi_dir'),
'CIC_TRUST_ADDRESS': ",".join(getattr(args, 'trust_address', [])),
}
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
# connect to database
dsn = db.dsn_from_config(config)
SessionBase.connect(dsn)
re_websocket = re.compile('^wss?://')
re_http = re.compile('^https?://')
blockchain_provider = config.get('ETH_PROVIDER')
if re.match(re_websocket, blockchain_provider) != None:
blockchain_provider = WebsocketProvider(blockchain_provider)
elif re.match(re_http, blockchain_provider) != None:
blockchain_provider = HTTPProvider(blockchain_provider)
else:
raise ValueError('unknown provider url {}'.format(blockchain_provider))
def web3_constructor():
w3 = web3.Web3(blockchain_provider)
return (blockchain_provider, w3)
class RunStateEnum(enum.IntEnum):
INIT = 0
RUN = 1
TERMINATE = 9
def rubberstamp(src):
return True
class Tracker:
def __init__(self, chain_spec, trusts=[]):
self.block_height = 0
self.tx_height = 0
self.state = RunStateEnum.INIT
self.declarator_cache = {}
self.convert_enabled = False
self.trusts = trusts
self.chain_spec = chain_spec
self.declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', 'Declarator')
def __process_tx(self, w3, session, t, r, l, b):
token_value = int(l.data, 16)
token_sender = l.topics[1][-20:].hex()
token_recipient = l.topics[2][-20:].hex()
#ts = ContractRegistry.get_address(t.address)
ts = CICRegistry.get_address(self.chain_spec, t.address())
logg.info('add token transfer {} value {} from {} to {}'.format(
ts.symbol(),
token_value,
token_sender,
token_recipient,
)
)
db.add_transaction(
session,
r.transactionHash.hex(),
r.blockNumber,
r.transactionIndex,
w3.toChecksumAddress(token_sender),
w3.toChecksumAddress(token_recipient),
t.address(),
t.address(),
token_value,
token_value,
r.status == 1,
b.timestamp,
)
session.flush()
# TODO: simplify/ split up and/or comment, function is too long
def __process_convert(self, w3, session, t, r, l, b):
logg.warning('conversions are deactivated')
return
# token_source = l.topics[2][-20:].hex()
# token_source = w3.toChecksumAddress(token_source)
# token_destination = l.topics[3][-20:].hex()
# token_destination = w3.toChecksumAddress(token_destination)
# data_noox = l.data[2:]
# d = data_noox[:64]
# token_from_value = int(d, 16)
# d = data_noox[64:128]
# token_to_value = int(d, 16)
# token_trader = '0x' + data_noox[192-40:]
#
# #ts = ContractRegistry.get_address(token_source)
# ts = CICRegistry.get_address(CICRegistry.bancor_chain_spec, t.address())
# #if ts == None:
# # ts = ContractRegistry.reserves[token_source]
# td = ContractRegistry.get_address(token_destination)
# #if td == None:
# # td = ContractRegistry.reserves[token_source]
# logg.info('add token convert {} -> {} value {} -> {} trader {}'.format(
# ts.symbol(),
# td.symbol(),
# token_from_value,
# token_to_value,
# token_trader,
# )
# )
#
# db.add_transaction(
# session,
# r.transactionHash.hex(),
# r.blockNumber,
# r.transactionIndex,
# w3.toChecksumAddress(token_trader),
# w3.toChecksumAddress(token_trader),
# token_source,
# token_destination,
# r.status == 1,
# b.timestamp,
# )
# session.flush()
def check_token(self, address):
t = None
try:
t = CICRegistry.get_address(CICRegistry.default_chain_spec, address)
return t
except UnknownContractError:
logg.debug('contract {} not in registry'.format(address))
# If nothing was returned, we look up the token in the declarator
for trust in self.trusts:
logg.debug('look up declaration for contract {} with trust {}'.format(address, trust))
fn = self.declarator.function('declaration')
# TODO: cache trust in LRUcache
declaration_array = fn(trust, address).call()
try:
declaration = to_token_declaration(trust, address, declaration_array, [rubberstamp])
logg.debug('found declaration for token {} from trust address {}'.format(address, trust))
except UnknownDeclarationError:
continue
try:
c = w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=address)
t = CICRegistry.add_token(self.chain_spec, c)
break
except ValueError:
logg.error('declaration for {} validates as token, but location is not ERC20 compatible'.format(address))
return t
# TODO use input data instead of logs
def process(self, w3, session, block):
#self.refresh_registry(w3)
tx_count = w3.eth.getBlockTransactionCount(block.hash)
b = w3.eth.getBlock(block.hash)
for i in range(self.tx_height, tx_count):
tx = w3.eth.getTransactionByBlock(block.hash, i)
if tx.to == None:
logg.debug('block {} tx {} is contract creation tx, skipping'.format(block.number, i))
continue
if len(w3.eth.getCode(tx.to)) == 0:
logg.debug('block {} tx {} not a contract tx, skipping'.format(block.number, i))
continue
t = self.check_token(tx.to)
if t != None and isinstance(t, Token):
r = w3.eth.getTransactionReceipt(tx.hash)
for l in r.logs:
logg.debug('block {} tx {} {} token log {} {}'.format(block.number, i, tx.hash.hex(), l.logIndex, l.topics[0].hex()))
if l.topics[0].hex() == log_topics['transfer']:
self.__process_tx(w3, session, t, r, l, b)
# TODO: cache contracts in LRUcache
elif self.convert_enabled and tx.to == CICRegistry.get_contract(CICRegistry.default_chain_spec, 'Converter').address:
r = w3.eth.getTransactionReceipt(tx.hash)
for l in r.logs:
logg.info('block {} tx {} {} bancornetwork log {} {}'.format(block.number, i, tx.hash.hex(), l.logIndex, l.topics[0].hex()))
if l.topics[0].hex() == log_topics['convert']:
self.__process_convert(w3, session, t, r, l, b)
session.execute("UPDATE tx_sync SET tx = '{}'".format(tx.hash.hex()))
session.commit()
self.tx_height += 1
def __get_next_retry(self, backoff=False):
return 1
def loop(self):
logg.info('starting at block {} tx index {}'.format(self.block_height, self.tx_height))
self.state = RunStateEnum.RUN
while self.state == RunStateEnum.RUN:
(provider, w3) = web3_constructor()
session = SessionBase.create_session()
try:
block = w3.eth.getBlock(self.block_height)
self.process(w3, session, block)
self.block_height += 1
self.tx_height = 0
except BlockNotFound as e:
logg.debug('no block {} yet, zZzZ...'.format(self.block_height))
time.sleep(self.__get_next_retry())
except ConnectionClosedError as e:
logg.info('connection gone, retrying')
time.sleep(self.__get_next_retry(True))
except OSError as e:
logg.error('cannot connect {}'.format(e))
time.sleep(self.__get_next_retry(True))
except Exception as e:
session.close()
raise(e)
session.close()
def load(self, w3):
session = SessionBase.create_session()
r = session.execute('SELECT tx FROM tx_sync').first()
if r != None:
if r[0] == '0x{0:0{1}X}'.format(0, 64):
logg.debug('last tx was zero-address, starting from scratch')
return
t = w3.eth.getTransaction(r[0])
self.block_height = t.blockNumber
self.tx_height = t.transactionIndex+1
c = w3.eth.getBlockTransactionCount(t.blockHash.hex())
logg.debug('last tx processed {} index {} (max index {})'.format(t.blockNumber, t.transactionIndex, c-1))
if c == self.tx_height:
self.block_height += 1
self.tx_height = 0
session.close()
(provider, w3) = web3_constructor()
trust = config.get('CIC_TRUST_ADDRESS', "").split(",")
chain_spec = args.i
try:
w3.eth.chainId
except Exception as e:
logg.exception(e)
sys.stderr.write('cannot connect to evm node\n')
sys.exit(1)
def main():
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
CICRegistry.init(w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec)
CICRegistry.add_path(config.get('ETH_ABI_DIR'))
chain_registry = ChainRegistry(chain_spec)
CICRegistry.add_chain_registry(chain_registry)
t = Tracker(chain_spec, trust)
t.load(w3)
t.loop()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,38 @@
# third-party imports
import celery
# local imports
from cic_cache.cache import BloomCache
from cic_cache.db.models.base import SessionBase
celery_app = celery.current_app
@celery_app.task(bind=True)
def tx_filter(self, offset, limit, address=None, encoding='hex'):
queue = self.request.delivery_info.get('routing_key')
session = SessionBase.create_session()
c = BloomCache(session)
b = None
if address == None:
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
else:
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
session.close()
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': bloom_filter_block.hex(),
'blocktx_filter': bloom_filter_tx.hex(),
'filter_rounds': 3,
}
return o

View File

@@ -0,0 +1,18 @@
import os
import semver
version = (
0,
2,
0,
'alpha.1',
)
version_object = semver.VersionInfo(
major=version[0],
minor=version[1],
patch=version[2],
prerelease=version[3],
)
version_string = str(version_object)

View File

@@ -0,0 +1,2 @@
[bancor]
dir =

View File

@@ -0,0 +1,3 @@
[celery]
broker_url = redis:///
result_url = redis:///

View File

@@ -0,0 +1,4 @@
[cic]
registry_address =
chain_spec =
trust_address =

View File

@@ -0,0 +1,9 @@
[database]
NAME=cic_cache
USER=postgres
PASSWORD=
HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=

View File

@@ -0,0 +1,3 @@
[bancor]
registry_address =
dir = /usr/local/share/bancor

View File

@@ -0,0 +1,3 @@
[celery]
broker_url = redis://localhost:63379
result_url = redis://localhost:63379

View File

@@ -0,0 +1,9 @@
[database]
NAME=cic_cache
USER=grassroots
PASSWORD=
HOST=localhost
PORT=63432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=1

View File

@@ -0,0 +1,3 @@
[eth]
provider = ws://localhost:63546
chain_id = 8996

View File

@@ -0,0 +1,7 @@
[eth]
provider = ws://localhost:8545
#ttp_provider = http://localhost:8545
#provider = http://localhost:8545
gas_provider_address =
#chain_id =
abi_dir = /usr/local/share/cic/solidity/abi

View File

@@ -0,0 +1,2 @@
[bancor]
dir =

View File

@@ -0,0 +1,2 @@
[cic]
registry_address =

View File

@@ -0,0 +1,9 @@
[database]
NAME=cic-cache-test
USER=postgres
PASSWORD=
HOST=localhost
PORT=5432
ENGINE=sqlite
DRIVER=pysqlite
DEBUG=

View File

@@ -0,0 +1,5 @@
[eth]
#ws_provider = ws://localhost:8546
#ttp_provider = http://localhost:8545
provider = http://localhost:8545
#chain_id =

View File

@@ -0,0 +1,5 @@
CREATE DATABASE "cic-cache";
CREATE DATABASE "cic-eth";
CREATE DATABASE "cic-notify";
CREATE DATABASE "cic-meta";
CREATE DATABASE "cic-signer";

View File

@@ -0,0 +1,22 @@
CREATE TABLE tx (
id SERIAL PRIMARY KEY,
date_registered TIMESTAMP NOT NULL default CURRENT_TIMESTAMP,
block_number INTEGER NOT NULL,
tx_index INTEGER NOT NULL,
tx_hash VARCHAR(66) NOT NULL,
sender VARCHAR(42) NOT NULL,
recipient VARCHAR(42) NOT NULL,
source_token VARCHAR(42) NOT NULL,
destination_token VARCHAR(42) NOT NULL,
from_value BIGINT NOT NULL,
to_value BIGINT NOT NULL,
success BOOLEAN NOT NULL,
date_block TIMESTAMP NOT NULL
);
CREATE TABLE tx_sync (
id SERIAL PRIMARY KEY,
tx VARCHAR(66) NOT NULL
);
INSERT INTO tx_sync (tx) VALUES('0x0000000000000000000000000000000000000000000000000000000000000000');

View File

@@ -0,0 +1,23 @@
CREATE TABLE tx (
id SERIAL PRIMARY KEY,
date_registered DATETIME NOT NULL default CURRENT_DATE,
block_number INTEGER NOT NULL,
tx_index INTEGER NOT NULL,
tx_hash VARCHAR(66) NOT NULL,
sender VARCHAR(42) NOT NULL,
recipient VARCHAR(42) NOT NULL,
source_token VARCHAR(42) NOT NULL,
destination_token VARCHAR(42) NOT NULL,
from_value INTEGER NOT NULL,
to_value INTEGER NOT NULL,
success BOOLEAN NOT NULL,
date_block DATETIME NOT NULL,
CHECK (success IN (0, 1))
);
CREATE TABLE tx_sync (
id SERIAL PRIMARY_KEY,
tx VARCHAR(66) NOT NULL
);
INSERT INTO tx_sync (tx) VALUES('0x0000000000000000000000000000000000000000000000000000000000000000');

View File

@@ -0,0 +1,102 @@
openapi: "3.0.3"
info:
title: Grassroots Economics CIC Cache
description: Cache of processed transaction data from Ethereum blockchain and worker queues
termsOfService: bzz://grassrootseconomics.eth/terms
contact:
name: Grassroots Economics
url: https://www.grassrootseconomics.org
email: will@grassecon.org
license:
name: GPLv3
version: 0.1.0
paths:
/tx/{offset}/{limit}:
description: Bloom filter for batch of latest transactions
get:
tags:
- transactions
description:
Retrieve transactions
operationId: tx.get
responses:
200:
description: Transaction query successful.
content:
application/json:
schema:
$ref: "#/components/schemas/BlocksBloom"
parameters:
- name: offset
in: path
schema:
type: integer
format: int32
- name: limit
in: path
schema:
type: integer
format: int32
/tx/{address}/{offset}/{limit}:
description: Bloom filter for batch of latest transactions by account
get:
tags:
- transactions
description:
Retrieve transactions
operationId: tx.get
responses:
200:
description: Transaction query successful.
content:
application/json:
schema:
$ref: "#/components/schemas/BlocksBloom"
parameters:
- name: address
in: path
required: true
schema:
type: string
- name: offset
in: path
schema:
type: integer
format: int32
- name: limit
in: path
schema:
type: integer
format: int32
components:
schemas:
BlocksBloom:
type: object
properties:
low:
type: int
format: int32
description: The lowest block number included in the filter
block_filter:
type: string
format: byte
description: Block number filter
blocktx_filter:
type: string
format: byte
description: Block and tx index filter
alg:
type: string
description: Hashing algorithm (currently only using sha256)
filter_rounds:
type: int
format: int32
description: Number of hash rounds used to create the filter

View File

@@ -0,0 +1,54 @@
FROM python:3.8.6-slim-buster
#COPY --from=0 /usr/local/share/cic/solidity/ /usr/local/share/cic/solidity/
WORKDIR /usr/src/cic-cache
ARG pip_extra_index_url_flag='--index https://pypi.org/simple --extra-index-url https://pip.grassrootseconomics.net:8433'
ARG root_requirement_file='requirements.txt'
#RUN apk update && \
# apk add gcc musl-dev gnupg libpq
#RUN apk add postgresql-dev
#RUN apk add linux-headers
#RUN apk add libffi-dev
RUN apt-get update && \
apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps git
# Copy shared requirements from top of mono-repo
RUN echo "copying root req file ${root_requirement_file}"
COPY $root_requirement_file .
RUN pip install -r $root_requirement_file $pip_extra_index_url_flag
COPY cic-cache/requirements.txt ./
COPY cic-cache/setup.cfg \
cic-cache/setup.py \
./
COPY cic-cache/cic_cache/ ./cic_cache/
COPY cic-cache/scripts/ ./scripts/
COPY cic-cache/test_requirements.txt ./
RUN pip install $pip_extra_index_url_flag -r test_requirements.txt
RUN pip install $pip_extra_index_url_flag .
RUN pip install .[server]
COPY cic-cache/tests/ ./tests/
#COPY db/ cic-cache/db
#RUN apk add postgresql-client
# ini files in config directory defines the configurable parameters for the application
# they can all be overridden by environment variables
# to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package)
COPY cic-cache/config/ /usr/local/etc/cic-cache/
# for db migrations
RUN git clone https://github.com/vishnubob/wait-for-it.git /usr/local/bin/wait-for-it/
COPY cic-cache/cic_cache/db/migrations/ /usr/local/share/cic-cache/alembic/
RUN git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \
mkdir -p /usr/local/share/cic/solidity && \
cp -R cic-contracts/abis /usr/local/share/cic/solidity/abi
# Tracker
# ENTRYPOINT ["/usr/local/bin/cic-cache-tracker", "-vv"]
# Server
# ENTRYPOINT [ "/usr/local/bin/uwsgi", "--wsgi-file", "/usr/local/lib/python3.8/site-packages/cic_cache/runnable/server.py", "--http", ":80", "--pyargv", "-vv" ]

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,40 @@
let xmlhttprequest = require('xhr2');
let moolb = require('moolb');
let xhr = new xmlhttprequest();
xhr.responseType = 'json';
xhr.open('GET', 'http://localhost:5555/tx/0/100');
xhr.addEventListener('load', (e) => {
d = xhr.response;
b_one = Buffer.from(d.block_filter, 'base64');
b_two = Buffer.from(d.blocktx_filter, 'base64');
for (let i = 0; i < 8192; i++) {
if (b_two[i] > 0) {
console.debug('value on', i, b_two[i]);
}
}
console.log(b_one, b_two);
let f_block = moolb.fromBytes(b_one, d.filter_rounds);
let f_blocktx = moolb.fromBytes(b_two, d.filter_rounds);
let a = new ArrayBuffer(8);
let w = new DataView(a);
for (let i = 410000; i < 430000; i++) {
w.setInt32(0, i);
let r = new Uint8Array(a.slice(0, 4));
if (f_block.check(r)) {
for (let j = 0; j < 200; j++) {
w = new DataView(a);
w.setInt32(4, j);
r = new Uint8Array(a);
if (f_blocktx.check(r)) {
console.log('true', i, j);
}
}
}
}
});
let r = xhr.send();

View File

@@ -0,0 +1,10 @@
alembic==1.4.2
confini~=0.3.6b2
uwsgi==2.0.19.1
moolb~=0.1.0
cic-registry~=0.5.3a4
SQLAlchemy==1.3.20
semver==2.13.0
psycopg2==2.8.6
celery==4.4.7
redis==3.5.3

View File

@@ -0,0 +1,56 @@
#!/usr/bin/python
import os
import argparse
import logging
import alembic
from alembic.config import Config as AlembicConfig
import confini
from cic_cache.db import dsn_from_config
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
# BUG: the dbdir doesn't work after script install
rootdir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
dbdir = os.path.join(rootdir, 'cic_cache', 'db')
migrationsdir = os.path.join(dbdir, 'migrations')
config_dir = os.path.join('/usr/local/etc/cic-cache')
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--migrations-dir', dest='migrations_dir', default=migrationsdir, type=str, help='path to alembic migrations directory')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config = confini.Config(args.c, args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config:\n{}'.format(config))
migrations_dir = os.path.join(args.migrations_dir, config.get('DATABASE_ENGINE'))
if not os.path.isdir(migrations_dir):
logg.debug('migrations dir for engine {} not found, reverting to default'.format(config.get('DATABASE_ENGINE')))
migrations_dir = os.path.join(args.migrations_dir, 'default')
# connect to database
dsn = dsn_from_config(config)
logg.info('using migrations dir {}'.format(migrations_dir))
logg.info('using db {}'.format(dsn))
ac = AlembicConfig(os.path.join(migrations_dir, 'alembic.ini'))
ac.set_main_option('sqlalchemy.url', dsn)
ac.set_main_option('script_location', migrations_dir)
alembic.command.upgrade(ac, 'head')

39
apps/cic-cache/setup.cfg Normal file
View File

@@ -0,0 +1,39 @@
[metadata]
name = cic-cache
description = CIC Cache API and server
author = Louis Holbrook
author_email = dev@holbrook.no
url = https://gitlab.com/grassrootseconomics/cic-eth
keywords =
cic
cryptocurrency
ethereum
classifiers =
Programming Language :: Python :: 3
Operating System :: OS Independent
Development Status :: 3 - Alpha
Environment :: No Input/Output (Daemon)
Intended Audience :: Developers
License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)
Topic :: Internet
# Topic :: Blockchain :: EVM
license = GPL3
licence_files =
LICENSE.txt
[options]
python_requires = >= 3.6
packages =
cic_cache
cic_cache.tasks
cic_cache.db
cic_cache.db.models
cic_cache.runnable
scripts =
./scripts/migrate.py
[options.entry_points]
console_scripts =
cic-cache-trackerd = cic_cache.runnable.tracker:main
cic-cache-serverd = cic_cache.runnable.server:main
cic-cache-taskerd = cic_cache.runnable.tasker:main

60
apps/cic-cache/setup.py Normal file
View File

@@ -0,0 +1,60 @@
from setuptools import setup
import configparser
import os
import time
from cic_cache.version import (
version_object,
version_string
)
class PleaseCommitFirstError(Exception):
pass
def git_hash():
import subprocess
git_diff = subprocess.run(['git', 'diff'], capture_output=True)
if len(git_diff.stdout) > 0:
raise PleaseCommitFirstError()
git_hash = subprocess.run(['git', 'rev-parse', 'HEAD'], capture_output=True)
git_hash_brief = git_hash.stdout.decode('utf-8')[:8]
return git_hash_brief
version_string = str(version_object)
try:
version_git = git_hash()
version_string += '+build.{}'.format(version_git)
except FileNotFoundError:
time_string_pair = str(time.time()).split('.')
version_string += '+build.{}{:<09d}'.format(
time_string_pair[0],
int(time_string_pair[1]),
)
print('final version string will be {}'.format(version_string))
requirements = []
f = open('requirements.txt', 'r')
while True:
l = f.readline()
if l == '':
break
requirements.append(l.rstrip())
f.close()
test_requirements = []
f = open('test_requirements.txt', 'r')
while True:
l = f.readline()
if l == '':
break
test_requirements.append(l.rstrip())
f.close()
setup(
version=version_string,
install_requires=requirements,
tests_require=test_requirements,
)

View File

@@ -0,0 +1,6 @@
pytest==6.0.1
pytest-cov==2.10.1
pytest-mock==3.3.1
pysqlite3==0.4.3
sqlparse==0.4.1
pytest-celery==0.0.0a1

View File

@@ -0,0 +1,86 @@
# standard imports
import os
import sys
import datetime
# third-party imports
import pytest
# local imports
from cic_cache import db
script_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.dirname(script_dir)
sys.path.insert(0, root_dir)
# fixtures
from tests.fixtures_config import *
from tests.fixtures_database import *
from tests.fixtures_celery import *
@pytest.fixture(scope='session')
def balances_dict_fields():
return {
'out_pending': 0,
'out_synced': 1,
'out_confirmed': 2,
'in_pending': 3,
'in_synced': 4,
'in_confirmed': 5,
}
@pytest.fixture(scope='function')
def txs(
init_database,
list_defaults,
list_actors,
list_tokens,
):
session = init_database
tx_number = 13
tx_hash_first = '0x' + os.urandom(32).hex()
val = 15000
nonce = 1
dt = datetime.datetime.utcnow()
db.add_transaction(
session,
tx_hash_first,
list_defaults['block'],
tx_number,
list_actors['alice'],
list_actors['bob'],
list_tokens['foo'],
list_tokens['foo'],
1024,
2048,
True,
dt.timestamp(),
)
tx_number = 42
tx_hash_second = '0x' + os.urandom(32).hex()
tx_signed_second = '0x' + os.urandom(128).hex()
nonce = 1
dt -= datetime.timedelta(hours=1)
db.add_transaction(
session,
tx_hash_second,
list_defaults['block']-1,
tx_number,
list_actors['diane'],
list_actors['alice'],
list_tokens['foo'],
list_tokens['foo'],
1024,
2048,
False,
dt.timestamp(),
)
session.commit()

View File

@@ -0,0 +1,48 @@
# third-party imports
import pytest
import tempfile
import logging
import shutil
logg = logging.getLogger(__name__)
# celery fixtures
@pytest.fixture(scope='session')
def celery_includes():
return [
'cic_cache.tasks.tx',
]
@pytest.fixture(scope='session')
def celery_config():
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
rq = tempfile.mkdtemp()
logg.debug('celery broker queue {} processed {}'.format(bq, bp))
logg.debug('celery backend store {}'.format(rq))
yield {
'broker_url': 'filesystem://',
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
'result_backend': 'file://{}'.format(rq),
}
logg.debug('cleaning up celery filesystem backend files {} {} {}'.format(bq, bp, rq))
shutil.rmtree(bq)
shutil.rmtree(bp)
shutil.rmtree(rq)
@pytest.fixture(scope='session')
def celery_worker_parameters():
return {
# 'queues': ('cic-cache'),
}
@pytest.fixture(scope='session')
def celery_enable_logging():
return True

View File

@@ -0,0 +1,20 @@
# standard imports
import os
import logging
# third-party imports
import pytest
import confini
script_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.dirname(script_dir)
logg = logging.getLogger(__file__)
@pytest.fixture(scope='session')
def load_config():
config_dir = os.path.join(root_dir, '.config/test')
conf = confini.Config(config_dir, 'CICTEST')
conf.process()
logg.debug('config {}'.format(conf))
return conf

View File

@@ -0,0 +1,118 @@
# standard imports
import os
import logging
import re
# third-party imports
import pytest
import sqlparse
# local imports
from cic_cache.db.models.base import SessionBase
from cic_cache.db import dsn_from_config
logg = logging.getLogger(__file__)
@pytest.fixture(scope='function')
def database_engine(
load_config,
):
if load_config.get('DATABASE_ENGINE') == 'sqlite':
SessionBase.transactional = False
SessionBase.poolable = False
try:
os.unlink(load_config.get('DATABASE_NAME'))
except FileNotFoundError:
pass
dsn = dsn_from_config(load_config)
SessionBase.connect(dsn)
return dsn
# TODO: use alembic instead to migrate db, here we have to keep separate schema than migration script in script/migrate.py
@pytest.fixture(scope='function')
def init_database(
load_config,
database_engine,
):
rootdir = os.path.dirname(os.path.dirname(__file__))
schemadir = os.path.join(rootdir, 'db', load_config.get('DATABASE_DRIVER'))
if load_config.get('DATABASE_ENGINE') == 'sqlite':
rconn = SessionBase.engine.raw_connection()
f = open(os.path.join(schemadir, 'db.sql'))
s = f.read()
f.close()
rconn.executescript(s)
else:
rconn = SessionBase.engine.raw_connection()
rcursor = rconn.cursor()
#rcursor.execute('DROP FUNCTION IF EXISTS public.transaction_list')
#rcursor.execute('DROP FUNCTION IF EXISTS public.balances')
f = open(os.path.join(schemadir, 'db.sql'))
s = f.read()
f.close()
r = re.compile(r'^[A-Z]', re.MULTILINE)
for l in sqlparse.parse(s):
strl = str(l)
# we need to check for empty query lines, as sqlparse doesn't do that on its own (and psycopg complains when it gets them)
if not re.search(r, strl):
logg.warning('skipping parsed query line {}'.format(strl))
continue
rcursor.execute(strl)
rconn.commit()
rcursor.execute('SET search_path TO public')
# this doesn't work when run separately, no idea why
# functions have been manually added to original schema from cic-eth
# f = open(os.path.join(schemadir, 'proc_transaction_list.sql'))
# s = f.read()
# f.close()
# rcursor.execute(s)
#
# f = open(os.path.join(schemadir, 'proc_balances.sql'))
# s = f.read()
# f.close()
# rcursor.execute(s)
rcursor.close()
session = SessionBase.create_session()
yield session
session.commit()
session.close()
@pytest.fixture(scope='function')
def list_tokens(
):
return {
'foo': '0x' + os.urandom(20).hex(),
'bar': '0x' + os.urandom(20).hex(),
}
@pytest.fixture(scope='function')
def list_actors(
):
return {
'alice': '0x' + os.urandom(20).hex(),
'bob': '0x' + os.urandom(20).hex(),
'charlie': '0x' + os.urandom(20).hex(),
'diane': '0x' + os.urandom(20).hex(),
}
@pytest.fixture(scope='function')
def list_defaults(
):
return {
'block': 420000,
}

View File

@@ -0,0 +1,35 @@
# standard imports
import os
import datetime
import logging
import json
# third-party imports
import pytest
# local imports
from cic_cache import BloomCache
logg = logging.getLogger()
def test_cache(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
):
session = init_database
c = BloomCache(session)
b = c.load_transactions(0, 100)
assert b[0] == list_defaults['block'] - 1
c = BloomCache(session)
c.load_transactions_account(list_actors['alice'],0, 100)
assert b[0] == list_defaults['block'] - 1

View File

@@ -0,0 +1,27 @@
# standard imports
import logging
# third-party imports
import celery
# local imports
from cic_cache.api import Api
logg = logging.getLogger()
def test_task(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
celery_session_worker,
):
api = Api(queue=None)
t = api.list(0, 100)
r = t.get()
logg.debug('r {}'.format(r))
assert r['low'] == list_defaults['block'] - 1

View File

@@ -8,13 +8,17 @@ from cic_registry import zero_address
# local imports
from cic_eth.db.enum import LockEnum
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.lock import Lock
from cic_eth.task import (
CriticalSQLAlchemyTask,
)
from cic_eth.error import LockedError
celery_app = celery.current_app
logg = logging.getLogger()
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL, tx_hash=None):
"""Task wrapper to set arbitrary locks
@@ -32,7 +36,7 @@ def lock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL, tx_
return chained_input
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL):
"""Task wrapper to reset arbitrary locks
@@ -50,7 +54,7 @@ def unlock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL):
return chained_input
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock_send(chained_input, chain_str, address=zero_address, tx_hash=None):
"""Task wrapper to set send lock
@@ -66,7 +70,7 @@ def lock_send(chained_input, chain_str, address=zero_address, tx_hash=None):
return chained_input
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock_send(chained_input, chain_str, address=zero_address):
"""Task wrapper to reset send lock
@@ -82,7 +86,7 @@ def unlock_send(chained_input, chain_str, address=zero_address):
return chained_input
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock_queue(chained_input, chain_str, address=zero_address, tx_hash=None):
"""Task wrapper to set queue direct lock
@@ -98,7 +102,7 @@ def lock_queue(chained_input, chain_str, address=zero_address, tx_hash=None):
return chained_input
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock_queue(chained_input, chain_str, address=zero_address):
"""Task wrapper to reset queue direct lock
@@ -114,11 +118,12 @@ def unlock_queue(chained_input, chain_str, address=zero_address):
return chained_input
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def check_lock(chained_input, chain_str, lock_flags, address=None):
r = Lock.check(chain_str, lock_flags, address=zero_address)
session = SessionBase.create_session()
r = Lock.check(chain_str, lock_flags, address=zero_address, session=session)
if address != None:
r |= Lock.check(chain_str, lock_flags, address=address)
r |= Lock.check(chain_str, lock_flags, address=address, session=session)
if r > 0:
logg.debug('lock check {} has match {} for {}'.format(lock_flags, r, address))
raise LockedError(r)

View File

@@ -1,12 +1,25 @@
# standard imports
import datetime
# external imports
import celery
# local imports
from cic_eth.db.models.debug import Debug
from cic_eth.db.models.base import SessionBase
from cic_eth.task import CriticalSQLAlchemyTask
celery_app = celery.current_app
@celery_app.task()
def out_tmp(tag, txt):
f = open('/tmp/err.{}.txt'.format(tag), "w")
f.write(txt)
f.close()
@celery_app.task(base=CriticalSQLAlchemyTask)
def alert(chained_input, tag, txt):
session = SessionBase.create_session()
o = Debug(tag, txt)
session.add(o)
session.commit()
session.close()
return chained_input

View File

@@ -1,5 +1,6 @@
# standard imports
import logging
import sys
# third-party imports
import celery
@@ -16,7 +17,10 @@ from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import (
StatusEnum,
is_alive,
)
from cic_eth.error import InitializationError
from cic_eth.db.error import TxStateChangeError
from cic_eth.eth.rpc import RpcClient
@@ -98,6 +102,19 @@ class AdminApi:
session.close()
def have_account(self, address_hex, chain_str):
s_have = celery.signature(
'cic_eth.eth.account.have',
[
address_hex,
chain_str,
],
queue=self.queue,
)
t = s_have.apply_async()
return t.get()
def resend(self, tx_hash_hex, chain_str, in_place=True, unlock=False):
logg.debug('resend {}'.format(tx_hash_hex))
s_get_tx_cache = celery.signature(
@@ -110,24 +127,32 @@ class AdminApi:
# TODO: This check should most likely be in resend task itself
tx_dict = s_get_tx_cache.apply_async().get()
if tx_dict['status'] in [StatusEnum.REVERTED, StatusEnum.SUCCESS, StatusEnum.CANCELLED, StatusEnum.OBSOLETED]:
#if tx_dict['status'] in [StatusEnum.REVERTED, StatusEnum.SUCCESS, StatusEnum.CANCELLED, StatusEnum.OBSOLETED]:
if not is_alive(getattr(StatusEnum, tx_dict['status']).value):
raise TxStateChangeError('Cannot resend mined or obsoleted transaction'.format(txold_hash_hex))
s = None
if in_place:
s = celery.signature(
'cic_eth.eth.tx.resend_with_higher_gas',
[
tx_hash_hex,
chain_str,
None,
1.01,
],
queue=self.queue,
)
else:
if not in_place:
raise NotImplementedError('resend as new not yet implemented')
s = celery.signature(
'cic_eth.eth.tx.resend_with_higher_gas',
[
chain_str,
None,
1.01,
],
queue=self.queue,
)
s_manual = celery.signature(
'cic_eth.queue.tx.set_manual',
[
tx_hash_hex,
],
queue=self.queue,
)
s_manual.link(s)
if unlock:
s_gas = celery.signature(
'cic_eth.admin.ctrl.unlock_send',
@@ -139,7 +164,7 @@ class AdminApi:
)
s.link(s_gas)
return s.apply_async()
return s_manual.apply_async()
def check_nonce(self, address):
s = celery.signature(
@@ -243,7 +268,9 @@ class AdminApi:
"""
s = celery.signature(
'cic_eth.queue.tx.get_account_tx',
[address],
[
address,
],
queue=self.queue,
)
txs = s.apply_async().get()
@@ -291,6 +318,8 @@ class AdminApi:
:return: Transaction details
:rtype: dict
"""
problems = []
if tx_hash != None and tx_raw != None:
ValueError('Specify only one of hash or raw tx')
@@ -418,10 +447,12 @@ class AdminApi:
r = c.w3.eth.getTransactionReceipt(tx_hash)
if r.status == 1:
tx['network_status'] = 'Confirmed'
tx['block'] = r.blockNumber
tx['tx_index'] = r.transactionIndex
else:
tx['network_status'] = 'Reverted'
tx['network_block_number'] = r.blockNumber
tx['network_tx_index'] = r.transactionIndex
if tx['block_number'] == None:
problems.append('Queue is missing block number {} for mined tx'.format(r.blockNumber))
except web3.exceptions.TransactionNotFound:
pass
@@ -431,6 +462,7 @@ class AdminApi:
tx_unpacked = unpack_signed_raw_tx(bytes.fromhex(tx['signed_tx'][2:]), chain_spec.chain_id())
tx['gas_price'] = tx_unpacked['gasPrice']
tx['gas_limit'] = tx_unpacked['gas']
tx['data'] = tx_unpacked['data']
s = celery.signature(
'cic_eth.queue.tx.get_state_log',
@@ -442,4 +474,9 @@ class AdminApi:
t = s.apply_async()
tx['status_log'] = t.get()
if len(problems) > 0:
sys.stderr.write('\n')
for p in problems:
sys.stderr.write('!!!{}\n'.format(p))
return tx

View File

@@ -6,10 +6,13 @@
# standard imports
import logging
# third-party imports
# external imports
import celery
from cic_registry.chain import ChainSpec
#from cic_registry.chain import ChainSpec
from cic_registry import CICRegistry
from chainlib.chain import ChainSpec
# local imports
from cic_eth.eth.factory import TxFactory
from cic_eth.db.enum import LockEnum
@@ -30,7 +33,7 @@ class Api:
:param queue: Name of worker queue to submit tasks to
:type queue: str
"""
def __init__(self, chain_str, queue='cic-eth', callback_param=None, callback_task='cic_eth.callbacks.noop', callback_queue=None):
def __init__(self, chain_str, queue='cic-eth', callback_param=None, callback_task='cic_eth.callbacks.noop.noop', callback_queue=None):
self.chain_str = chain_str
self.chain_spec = ChainSpec.from_chain_str(chain_str)
self.callback_param = callback_param
@@ -79,6 +82,7 @@ class Api:
:returns: uuid of root task
:rtype: celery.Task
"""
raise NotImplementedError('out of service until new DEX migration is done')
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
@@ -89,6 +93,11 @@ class Api:
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
@@ -107,7 +116,8 @@ class Api:
],
queue=self.queue,
)
s_check.link(s_tokens)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None:
s_convert.link(self.callback_success)
s_tokens.link(s_convert).on_error(self.callback_error)
@@ -134,6 +144,7 @@ class Api:
:returns: uuid of root task
:rtype: celery.Task
"""
raise NotImplementedError('out of service until new DEX migration is done')
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
@@ -144,6 +155,11 @@ class Api:
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
@@ -162,7 +178,8 @@ class Api:
],
queue=self.queue,
)
s_check.link(s_tokens)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None:
s_convert.link(self.callback_success)
s_tokens.link(s_convert).on_error(self.callback_error)
@@ -197,6 +214,13 @@ class Api:
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
from_address,
],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
@@ -214,7 +238,8 @@ class Api:
],
queue=self.queue,
)
s_check.link(s_tokens)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None:
s_transfer.link(self.callback_success)
s_tokens.link(s_transfer).on_error(self.callback_error)
@@ -225,89 +250,15 @@ class Api:
return t
def transfer_request(self, from_address, to_address, spender_address, value, token_symbol):
"""Executes a chain of celery tasks that issues a transfer request of ERC20 tokens from one address to another.
:param from_address: Ethereum address of sender
:type from_address: str, 0x-hex
:param to_address: Ethereum address of recipient
:type to_address: str, 0x-hex
:param spender_address: Ethereum address that is executing transfer (typically an escrow contract)
:type spender_address: str, 0x-hex
:param value: Estimated return from conversion
:type value: int
:param token_symbol: ERC20 token symbol of token to send
:type token_symbol: str
:returns: uuid of root task
:rtype: celery.Task
"""
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
[token_symbol],
self.chain_str,
LockEnum.QUEUE,
from_address,
],
queue=self.queue,
)
s_tokens_transfer_approval = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
self.chain_str,
],
queue=self.queue,
)
s_tokens_approve = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
self.chain_str,
],
queue=self.queue,
)
s_approve = celery.signature(
'cic_eth.eth.token.approve',
[
from_address,
spender_address,
value,
self.chain_str,
],
queue=self.queue,
)
s_transfer_approval = celery.signature(
'cic_eth.eth.request.transfer_approval_request',
[
from_address,
to_address,
value,
self.chain_str,
],
queue=self.queue,
)
# TODO: make approve and transfer_approval chainable so callback can be part of the full chain
if self.callback_param != None:
s_transfer_approval.link(self.callback_success)
s_tokens_approve.link(s_approve)
s_tokens_transfer_approval.link(s_transfer_approval).on_error(self.callback_error)
else:
s_tokens_approve.link(s_approve)
s_tokens_transfer_approval.link(s_transfer_approval)
g = celery.group(s_tokens_approve, s_tokens_transfer_approval) #s_tokens.apply_async(queue=self.queue)
s_check.link(g)
t = s_check.apply_async()
#t = s_tokens.apply_async(queue=self.queue)
return t
def balance(self, address, token_symbol):
def balance(self, address, token_symbol, include_pending=True):
"""Calls the provided callback with the current token balance of the given address.
:param address: Ethereum address of holder
:type address: str, 0x-hex
:param token_symbol: ERC20 token symbol of token to send
:type token_symbol: str
:param include_pending: If set, will include transactions that have not yet been fully processed
:type include_pending: bool
:returns: uuid of root task
:rtype: celery.Task
"""
@@ -330,14 +281,45 @@ class Api:
],
queue=self.queue,
)
if self.callback_param != None:
s_balance.link(self.callback_success)
s_tokens.link(s_balance).on_error(self.callback_error)
else:
s_tokens.link(s_balance)
s_result = celery.signature(
'cic_eth.queue.balance.assemble_balances',
[],
queue=self.queue,
)
t = s_tokens.apply_async(queue=self.queue)
last_in_chain = s_balance
if include_pending:
s_balance_incoming = celery.signature(
'cic_eth.queue.balance.balance_incoming',
[
address,
self.chain_str,
],
queue=self.queue,
)
s_balance_outgoing = celery.signature(
'cic_eth.queue.balance.balance_outgoing',
[
address,
self.chain_str,
],
queue=self.queue,
)
s_balance.link(s_balance_incoming)
s_balance_incoming.link(s_balance_outgoing)
last_in_chain = s_balance_outgoing
one = celery.chain(s_tokens, s_balance)
two = celery.chain(s_tokens, s_balance_incoming)
three = celery.chain(s_tokens, s_balance_outgoing)
t = None
if self.callback_param != None:
s_result.link(self.callback_success).on_error(self.callback_error)
t = celery.chord([one, two, three])(s_result)
else:
t = celery.chord([one, two, three])(s_result)
return t
@@ -372,6 +354,13 @@ class Api:
s_account.link(self.callback_success)
if register:
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
'ACCOUNTS_INDEX_WRITER',
],
queue=self.queue,
)
s_register = celery.signature(
'cic_eth.eth.account.register',
[
@@ -379,7 +368,8 @@ class Api:
],
queue=self.queue,
)
s_account.link(s_register)
s_nonce.link(s_register)
s_account.link(s_nonce)
t = s_check.apply_async(queue=self.queue)
return t
@@ -402,6 +392,13 @@ class Api:
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
'GAS_GIFTER',
],
queue=self.queue,
)
s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
@@ -409,7 +406,8 @@ class Api:
],
queue=self.queue,
)
s_check.link(s_refill)
s_nonce.link(s_refill)
s_check.link(s_nonce)
if self.callback_param != None:
s_refill.link(self.callback_success)
@@ -417,6 +415,73 @@ class Api:
return t
def list(self, address, limit=10, external_task=None, external_queue=None):
"""Retrieve an aggregate list of latest transactions of internal and (optionally) external origin in reverse chronological order.
The array of transactions returned have the same dict layout as those passed by the callback filter in cic_eth/runnable/manager
If the external task is defined, this task will be used to query external transactions. If this is not defined, no external transactions will be included. The task must accept (offset, limit, address) as input parameters, and return a bloom filter that will be used to retrieve transaction data for the matching transactions. See cic_eth.ext.tx.list_tx_by_bloom for details on the bloom filter dat format.
:param address: Ethereum address to list transactions for
:type address: str, 0x-hex
:param limit: Amount of results to return
:type limit: number
:param external_task: Celery task providing external transactions
:type external_task: str
:param external_queue: Celery task queue providing exernal transactions task
:type external_queue: str
:returns: List of transactions
:rtype: list of dict
"""
offset = 0
s_local = celery.signature(
'cic_eth.queue.tx.get_account_tx',
[
address,
],
queue=self.queue,
)
s_brief = celery.signature(
'cic_eth.ext.tx.tx_collate',
[
self.chain_str,
offset,
limit
],
queue=self.queue,
)
s_local.link(s_brief)
if self.callback_param != None:
s_brief.link(self.callback_success).on_error(self.callback_error)
t = None
if external_task != None:
s_external_get = celery.signature(
external_task,
[
address,
offset,
limit,
],
queue=external_queue,
)
s_external_process = celery.signature(
'cic_eth.ext.tx.list_tx_by_bloom',
[
address,
self.chain_str,
],
queue=self.queue,
)
c = celery.chain(s_external_get, s_external_process)
t = celery.chord([s_local, c])(s_brief)
else:
t = s_local.apply_async(queue=self.queue)
return t
def ping(self, r):
"""A noop callback ping for testing purposes.

View File

@@ -18,4 +18,4 @@ def noop(self, result, param, status_code):
:rtype: bool
"""
logg.info('noop callback {} {} {}'.format(result, param, status_code))
return True
return result

View File

@@ -18,7 +18,11 @@ logg = celery_app.log.get_default_logger()
def redis(self, result, destination, status_code):
(host, port, db, channel) = destination.split(':')
r = redis_interface.Redis(host=host, port=port, db=db)
s = json.dumps(result)
data = {
'root_id': self.request.root_id,
'status': status_code,
'result': result,
}
logg.debug('redis callback on host {} port {} db {} channel {}'.format(host, port, db, channel))
r.publish(channel, s)
r.publish(channel, json.dumps(data))
r.close()

View File

@@ -20,5 +20,10 @@ def tcp(self, result, destination, status_code):
(host, port) = destination.split(':')
logg.debug('tcp callback to {} {}'.format(host, port))
s.connect((host, int(port)))
s.send(json.dumps(result).encode('utf-8'))
data = {
'root_id': self.request.root_id,
'status': status_code,
'result': result,
}
s.send(json.dumps(data).encode('utf-8'))
s.close()

View File

@@ -1,6 +1,29 @@
# standard imports
import enum
@enum.unique
class StatusBits(enum.IntEnum):
"""Individual bit flags that are combined to define the state and legacy of a queued transaction
"""
QUEUED = 0x01 # transaction should be sent to network
IN_NETWORK = 0x08 # transaction is in network
DEFERRED = 0x10 # an attempt to send the transaction to network has failed
GAS_ISSUES = 0x20 # transaction is pending sender account gas funding
LOCAL_ERROR = 0x100 # errors that originate internally from the component
NODE_ERROR = 0x200 # errors originating in the node (invalid RLP input...)
NETWORK_ERROR = 0x400 # errors that originate from the network (REVERT)
UNKNOWN_ERROR = 0x800 # unclassified errors (the should not occur)
FINAL = 0x1000 # transaction processing has completed
OBSOLETE = 0x2000 # transaction has been replaced by a different transaction with higher fee
MANUAL = 0x8000 # transaction processing has been manually overridden
@enum.unique
class StatusEnum(enum.IntEnum):
"""
@@ -22,21 +45,27 @@ class StatusEnum(enum.IntEnum):
* SUCCESS: THe transaction was successfully mined. (Block number will be set)
"""
PENDING=-9
SENDFAIL=-8
RETRY=-7
READYSEND=-6
OBSOLETED=-2
WAITFORGAS=-1
SENT=0
FUBAR=1
CANCELLED=2
OVERRIDDEN=3
REJECTED=7
REVERTED=8
SUCCESS=9
PENDING = 0
SENDFAIL = StatusBits.DEFERRED | StatusBits.LOCAL_ERROR
RETRY = StatusBits.QUEUED | StatusBits.DEFERRED
READYSEND = StatusBits.QUEUED
OBSOLETED = StatusBits.OBSOLETE | StatusBits.IN_NETWORK
WAITFORGAS = StatusBits.GAS_ISSUES
SENT = StatusBits.IN_NETWORK
FUBAR = StatusBits.FINAL | StatusBits.UNKNOWN_ERROR
CANCELLED = StatusBits.IN_NETWORK | StatusBits.FINAL | StatusBits.OBSOLETE
OVERRIDDEN = StatusBits.FINAL | StatusBits.OBSOLETE | StatusBits.MANUAL
REJECTED = StatusBits.NODE_ERROR | StatusBits.FINAL
REVERTED = StatusBits.IN_NETWORK | StatusBits.FINAL | StatusBits.NETWORK_ERROR
SUCCESS = StatusBits.IN_NETWORK | StatusBits.FINAL
@enum.unique
class LockEnum(enum.IntEnum):
"""
STICKY: When set, reset is not possible
@@ -48,4 +77,81 @@ class LockEnum(enum.IntEnum):
CREATE=2
SEND=4
QUEUE=8
QUERY=16
ALL=int(0xfffffffffffffffe)
def status_str(v, bits_only=False):
"""Render a human-readable string describing the status
If the bit field exactly matches a StatusEnum value, the StatusEnum label will be returned.
If a StatusEnum cannot be matched, the string will be postfixed with "*", unless explicitly instructed to return bit field labels only.
:param v: Status bit field
:type v: number
:param bits_only: Only render individual bit labels.
:type bits_only: bool
:returns: Status string
:rtype: str
"""
s = ''
if not bits_only:
try:
s = StatusEnum(v).name
return s
except ValueError:
pass
if v == 0:
return 'NONE'
for i in range(16):
b = (1 << i)
if (b & 0xffff) & v:
n = StatusBits(b).name
if len(s) > 0:
s += ','
s += n
if not bits_only:
s += '*'
return s
def all_errors():
"""Bit mask of all error states
:returns: Error flags
:rtype: number
"""
return StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
def is_error_status(v):
"""Check if value is an error state
:param v: Status bit field
:type v: number
:returns: True if error
:rtype: bool
"""
return bool(v & all_errors())
def dead():
"""Bit mask defining whether a transaction is still likely to be processed on the network.
:returns: Bit mask
:rtype: number
"""
return StatusBits.FINAL | StatusBits.OBSOLETE
def is_alive(v):
"""Check if transaction is still likely to be processed on the network.
The contingency of "likely" refers to the case a transaction has been obsoleted after sent to the network, but the network still confirms the obsoleted transaction. The return value of this method will not change as a result of this, BUT the state itself will (as the FINAL bit will be set).
:returns:
"""
return bool(v & dead() == 0)

View File

@@ -0,0 +1,30 @@
"""Nonce reservation
Revision ID: 3b693afd526a
Revises: f738d9962fdf
Create Date: 2021-03-05 07:09:50.898728
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3b693afd526a'
down_revision = 'f738d9962fdf'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('key', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
def downgrade():
op.drop_table('nonce_task_reservation')

View File

@@ -27,8 +27,8 @@ def upgrade():
sa.Column('destination_token_address', sa.String(42), nullable=False),
sa.Column('sender', sa.String(42), nullable=False),
sa.Column('recipient', sa.String(42), nullable=False),
sa.Column('from_value', sa.String(), nullable=False),
sa.Column('to_value', sa.String(), nullable=True),
sa.Column('from_value', sa.NUMERIC(), nullable=False),
sa.Column('to_value', sa.NUMERIC(), nullable=True),
sa.Column('block_number', sa.BIGINT(), nullable=True),
sa.Column('tx_index', sa.Integer, nullable=True),
)

View File

@@ -0,0 +1,28 @@
"""Add chain syncer
Revision ID: ec40ac0974c1
Revises: 6ac7a1dadc46
Create Date: 2021-02-23 06:10:19.246304
"""
from alembic import op
import sqlalchemy as sa
from chainsyncer.db.migrations.sqlalchemy import (
chainsyncer_upgrade,
chainsyncer_downgrade,
)
# revision identifiers, used by Alembic.
revision = 'ec40ac0974c1'
down_revision = '6ac7a1dadc46'
branch_labels = None
depends_on = None
def upgrade():
chainsyncer_upgrade(0, 0, 1)
def downgrade():
chainsyncer_downgrade(0, 0, 1)

View File

@@ -0,0 +1,32 @@
"""debug output
Revision ID: f738d9962fdf
Revises: ec40ac0974c1
Create Date: 2021-03-04 08:32:43.281214
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'f738d9962fdf'
down_revision = 'ec40ac0974c1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'debug',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.String, nullable=False),
sa.Column('description', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('debug')
pass

View File

@@ -0,0 +1,30 @@
"""Nonce reservation
Revision ID: 3b693afd526a
Revises: f738d9962fdf
Create Date: 2021-03-05 07:09:50.898728
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3b693afd526a'
down_revision = 'f738d9962fdf'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('key', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
def downgrade():
op.drop_table('nonce_task_reservation')

View File

@@ -19,7 +19,6 @@ def upgrade():
op.create_table(
'tx_cache',
sa.Column('id', sa.Integer, primary_key=True),
# sa.Column('tx_id', sa.Integer, sa.ForeignKey('tx.id'), nullable=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime, nullable=False),
@@ -27,8 +26,8 @@ def upgrade():
sa.Column('destination_token_address', sa.String(42), nullable=False),
sa.Column('sender', sa.String(42), nullable=False),
sa.Column('recipient', sa.String(42), nullable=False),
sa.Column('from_value', sa.BIGINT(), nullable=False),
sa.Column('to_value', sa.BIGINT(), nullable=True),
sa.Column('from_value', sa.NUMERIC(), nullable=False),
sa.Column('to_value', sa.NUMERIC(), nullable=True),
sa.Column('block_number', sa.BIGINT(), nullable=True),
sa.Column('tx_index', sa.Integer, nullable=True),
)

View File

@@ -0,0 +1,28 @@
"""Add chain syncer
Revision ID: ec40ac0974c1
Revises: 6ac7a1dadc46
Create Date: 2021-02-23 06:10:19.246304
"""
from alembic import op
import sqlalchemy as sa
from chainsyncer.db.migrations.sqlalchemy import (
chainsyncer_upgrade,
chainsyncer_downgrade,
)
# revision identifiers, used by Alembic.
revision = 'ec40ac0974c1'
down_revision = '6ac7a1dadc46'
branch_labels = None
depends_on = None
def upgrade():
chainsyncer_upgrade(0, 0, 1)
def downgrade():
chainsyncer_downgrade(0, 0, 1)

View File

@@ -0,0 +1,32 @@
"""debug output
Revision ID: f738d9962fdf
Revises: ec40ac0974c1
Create Date: 2021-03-04 08:32:43.281214
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'f738d9962fdf'
down_revision = 'ec40ac0974c1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'debug',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.String, nullable=False),
sa.Column('description', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('debug')
pass

View File

@@ -1,8 +1,18 @@
# stanard imports
import logging
# third-party imports
from sqlalchemy import Column, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import (
StaticPool,
QueuePool,
AssertionPool,
)
logg = logging.getLogger()
Model = declarative_base(name='Model')
@@ -21,7 +31,11 @@ class SessionBase(Model):
transactional = True
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
poolable = True
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
"""Whether the database backend supports connection pools. Should be explicitly set by initialization code"""
procedural = True
"""Whether the database backend supports stored procedures"""
localsessions = {}
"""Contains dictionary of sessions initiated by db model components"""
@staticmethod
@@ -40,7 +54,7 @@ class SessionBase(Model):
@staticmethod
def connect(dsn, debug=False):
def connect(dsn, pool_size=16, debug=False):
"""Create new database connection engine and connect to database backend.
:param dsn: DSN string defining connection.
@@ -48,14 +62,28 @@ class SessionBase(Model):
"""
e = None
if SessionBase.poolable:
e = create_engine(
dsn,
max_overflow=50,
pool_pre_ping=True,
pool_size=20,
pool_recycle=10,
echo=debug,
)
poolclass = QueuePool
if pool_size > 1:
e = create_engine(
dsn,
max_overflow=pool_size*3,
pool_pre_ping=True,
pool_size=pool_size,
pool_recycle=60,
poolclass=poolclass,
echo=debug,
)
else:
if debug:
poolclass = AssertionPool
else:
poolclass = StaticPool
e = create_engine(
dsn,
poolclass=poolclass,
echo=debug,
)
else:
e = create_engine(
dsn,
@@ -71,3 +99,23 @@ class SessionBase(Model):
"""
SessionBase.engine.dispose()
SessionBase.engine = None
@staticmethod
def bind_session(session=None):
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
localsession_key = str(id(localsession))
logg.debug('creating new session {}'.format(localsession_key))
SessionBase.localsessions[localsession_key] = localsession
return localsession
@staticmethod
def release_session(session=None):
session_key = str(id(session))
if SessionBase.localsessions.get(session_key) != None:
logg.debug('destroying session {}'.format(session_key))
session.commit()
session.close()

View File

@@ -0,0 +1,23 @@
# standard imports
import datetime
import logging
# external imports
from sqlalchemy import Column, String, DateTime
# local imports
from .base import SessionBase
class Debug(SessionBase):
__tablename__ = 'debug'
date_created = Column(DateTime, default=datetime.datetime.utcnow)
tag = Column(String)
description = Column(String)
def __init__(self, tag, description):
self.tag = tag
self.description = description

View File

@@ -55,11 +55,9 @@ class Lock(SessionBase):
:returns: New flag state of entry
:rtype: number
"""
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
session = SessionBase.bind_session(session)
q = localsession.query(Lock)
q = session.query(Lock)
#q = q.join(TxCache, isouter=True)
q = q.filter(Lock.address==address)
q = q.filter(Lock.blockchain==chain_str)
@@ -71,7 +69,8 @@ class Lock(SessionBase):
lock.address = address
lock.blockchain = chain_str
if tx_hash != None:
q = localsession.query(Otx)
session.flush()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
otx = q.first()
if otx != None:
@@ -80,12 +79,11 @@ class Lock(SessionBase):
lock.flags |= flags
r = lock.flags
localsession.add(lock)
localsession.commit()
session.add(lock)
session.commit()
SessionBase.release_session(session)
if session == None:
localsession.close()
return r
@@ -110,11 +108,9 @@ class Lock(SessionBase):
:returns: New flag state of entry
:rtype: number
"""
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
session = SessionBase.bind_session(session)
q = localsession.query(Lock)
q = session.query(Lock)
#q = q.join(TxCache, isouter=True)
q = q.filter(Lock.address==address)
q = q.filter(Lock.blockchain==chain_str)
@@ -124,14 +120,13 @@ class Lock(SessionBase):
if lock != None:
lock.flags &= ~flags
if lock.flags == 0:
localsession.delete(lock)
session.delete(lock)
else:
localsession.add(lock)
session.add(lock)
r = lock.flags
localsession.commit()
session.commit()
if session == None:
localsession.close()
SessionBase.release_session(session)
return r
@@ -156,22 +151,20 @@ class Lock(SessionBase):
:rtype: number
"""
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
session = SessionBase.bind_session(session)
q = localsession.query(Lock)
q = session.query(Lock)
#q = q.join(TxCache, isouter=True)
q = q.filter(Lock.address==address)
q = q.filter(Lock.blockchain==chain_str)
q = q.filter(Lock.flags.op('&')(flags)==flags)
lock = q.first()
if session == None:
localsession.close()
r = 0
if lock != None:
r = lock.flags & flags
SessionBase.release_session(session)
return r

View File

@@ -1,11 +1,16 @@
# standard imports
import logging
import datetime
# third-party imports
from sqlalchemy import Column, String, Integer
from sqlalchemy import Column, String, Integer, DateTime
# local imports
from .base import SessionBase
from cic_eth.error import (
InitializationError,
IntegrityError,
)
logg = logging.getLogger()
@@ -21,12 +26,9 @@ class Nonce(SessionBase):
@staticmethod
def get(address, session=None):
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
session = SessionBase.bind_session(session)
q = localsession.query(Nonce)
q = session.query(Nonce)
q = q.filter(Nonce.address_hex==address)
nonce = q.first()
@@ -34,8 +36,7 @@ class Nonce(SessionBase):
if nonce != None:
nonce_value = nonce.nonce;
if session == None:
localsession.close()
SessionBase.release_session(session)
return nonce_value
@@ -54,6 +55,28 @@ class Nonce(SessionBase):
conn.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address))
@staticmethod
def __init(conn, address, nonce):
conn.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
@staticmethod
def init(address, nonce=0, session=None):
session = SessionBase.bind_session(session)
q = session.query(Nonce)
q = q.filter(Nonce.address_hex==address)
o = q.first()
if o != None:
session.flush()
raise InitializationError('nonce on {} already exists ({})'.format(address, o.nonce))
session.flush()
Nonce.__init(session, address, nonce)
SessionBase.release_session(session)
# TODO: Incrementing nonce MUST be done by separate tasks.
@staticmethod
def next(address, initial_if_not_exists=0):
"""Generate next nonce for the given address.
@@ -67,20 +90,96 @@ class Nonce(SessionBase):
:returns: Nonce
:rtype: number
"""
#session = SessionBase.bind_session(session)
#session.begin_nested()
conn = Nonce.engine.connect()
if Nonce.transactional:
conn.execute('BEGIN')
conn.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
logg.debug('locking nonce table for address {}'.format(address))
nonce = Nonce.__get(conn, address)
logg.debug('get nonce {} for address {}'.format(nonce, address))
if nonce == None:
nonce = initial_if_not_exists
conn.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
logg.debug('setting default nonce to {} for address {}'.format(nonce, address))
Nonce.__init(conn, address, nonce)
Nonce.__set(conn, address, nonce+1)
if Nonce.transactional:
conn.execute('COMMIT')
logg.debug('unlocking nonce table for address {}'.format(address))
conn.close()
#session.commit()
#SessionBase.release_session(session)
return nonce
class NonceReservation(SessionBase):
__tablename__ = 'nonce_task_reservation'
nonce = Column(Integer)
key = Column(String)
date_created = Column(DateTime, default=datetime.datetime.utcnow)
@staticmethod
def peek(key, session=None):
session = SessionBase.bind_session(session)
q = session.query(NonceReservation)
q = q.filter(NonceReservation.key==key)
o = q.first()
nonce = None
if o != None:
nonce = o.nonce
session.flush()
SessionBase.release_session(session)
return nonce
@staticmethod
def release(key, session=None):
session = SessionBase.bind_session(session)
nonce = NonceReservation.peek(key, session=session)
q = session.query(NonceReservation)
q = q.filter(NonceReservation.key==key)
o = q.first()
if o == None:
raise IntegrityError('nonce for key {}'.format(nonce))
SessionBase.release_session(session)
session.delete(o)
session.flush()
SessionBase.release_session(session)
return nonce
@staticmethod
def next(address, key, session=None):
session = SessionBase.bind_session(session)
if NonceReservation.peek(key, session) != None:
raise IntegrityError('nonce for key {}'.format(key))
nonce = Nonce.next(address)
o = NonceReservation()
o.nonce = nonce
o.key = key
session.add(o)
SessionBase.release_session(session)
return nonce

View File

@@ -2,13 +2,18 @@
import datetime
import logging
# third-party imports
# external imports
from sqlalchemy import Column, Enum, String, Integer, DateTime, Text, or_, ForeignKey
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
# local imports
from .base import SessionBase
from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
status_str,
is_error_status,
)
from cic_eth.db.error import TxStateChangeError
#from cic_eth.eth.util import address_hex_from_signed_tx
@@ -54,21 +59,31 @@ class Otx(SessionBase):
block = Column(Integer)
def __set_status(self, status, session=None):
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
def __set_status(self, status, session):
self.status |= status
session.add(self)
session.flush()
self.status = status
localsession.add(self)
localsession.flush()
if self.tracing:
self.__state_log(session=localsession)
def __reset_status(self, status, session):
status_edit = ~status & self.status
self.status &= status_edit
session.add(self)
session.flush()
if session==None:
localsession.commit()
localsession.close()
def __status_already_set(self, status):
r = bool(self.status & status)
if r:
logg.warning('status bit {} already set on {}'.format(status.name, self.tx_hash))
return r
def __status_not_set(self, status):
r = not(self.status & status)
if r:
logg.warning('status bit {} not set on {}'.format(status.name, self.tx_hash))
return r
def set_block(self, block, session=None):
@@ -102,9 +117,23 @@ class Otx(SessionBase):
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.status >= StatusEnum.SENT.value:
raise TxStateChangeError('WAITFORGAS cannot succeed final state, had {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.WAITFORGAS, session)
if self.__status_already_set(StatusBits.GAS_ISSUES):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.GAS_ISSUES, session)
self.__reset_status(StatusBits.QUEUED | StatusBits.DEFERRED, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def fubar(self, session=None):
@@ -112,28 +141,89 @@ class Otx(SessionBase):
Only manipulates object, does not transaction or commit to backend.
"""
self.__set_status(StatusEnum.FUBAR, session)
if self.__status_already_set(StatusBits.UNKNOWN_ERROR):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('FUBAR cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if is_error_status(self.status):
raise TxStateChangeError('FUBAR cannot be set on an entry with an error state already set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.UNKNOWN_ERROR | StatusBits.FINAL, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def reject(self, session=None):
"""Marks transaction as "rejected," which means the node rejected sending the transaction to the network. The nonce has not been spent, and the transaction should be replaced.
Only manipulates object, does not transaction or commit to backend.
"""
if self.status >= StatusEnum.SENT.value:
raise TxStateChangeError('REJECTED cannot succeed SENT or final state, had {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.REJECTED, session)
if self.__status_already_set(StatusBits.NODE_ERROR):
return
def override(self, session=None):
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('REJECTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('REJECTED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status)))
if is_error_status(self.status):
raise TxStateChangeError('REJECTED cannot be set on an entry with an error state already set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.NODE_ERROR | StatusBits.FINAL, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def override(self, manual=False, session=None):
"""Marks transaction as manually overridden.
Only manipulates object, does not transaction or commit to backend.
"""
if self.status >= StatusEnum.SENT.value:
raise TxStateChangeError('OVERRIDDEN cannot succeed SENT or final state, had {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.OVERRIDDEN, session)
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status)))
if self.status & StatusBits.OBSOLETE:
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already OBSOLETE ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.OBSOLETE, session)
#if manual:
# self.__set_status(StatusBits.MANUAL, session)
self.__reset_status(StatusBits.QUEUED | StatusBits.IN_NETWORK, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def manual(self, session=None):
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.MANUAL, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def retry(self, session=None):
"""Marks transaction as ready to retry after a timeout following a sendfail or a completed gas funding.
@@ -142,9 +232,23 @@ class Otx(SessionBase):
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.status != StatusEnum.SENT.value and self.status != StatusEnum.SENDFAIL.value:
raise TxStateChangeError('RETRY must follow SENT or SENDFAIL, but had {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.RETRY, session)
if self.__status_already_set(StatusBits.QUEUED):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('RETRY cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if not is_error_status(self.status) and not StatusBits.IN_NETWORK & self.status > 0:
raise TxStateChangeError('RETRY cannot be set on an entry that has no error ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.QUEUED, session)
self.__reset_status(StatusBits.GAS_ISSUES, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def readysend(self, session=None):
@@ -154,9 +258,23 @@ class Otx(SessionBase):
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.status != StatusEnum.PENDING.value and self.status != StatusEnum.WAITFORGAS.value:
raise TxStateChangeError('READYSEND must follow PENDING or WAITFORGAS, but had {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.READYSEND, session)
if self.__status_already_set(StatusBits.QUEUED):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('READYSEND cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if is_error_status(self.status):
raise TxStateChangeError('READYSEND cannot be set on an errored state ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.QUEUED, session)
self.__reset_status(StatusBits.GAS_ISSUES, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def sent(self, session=None):
@@ -166,9 +284,21 @@ class Otx(SessionBase):
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.status > StatusEnum.SENT:
raise TxStateChangeError('SENT after {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.SENT, session)
if self.__status_already_set(StatusBits.IN_NETWORK):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('SENT cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.IN_NETWORK, session)
self.__reset_status(StatusBits.DEFERRED | StatusBits.QUEUED | StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def sendfail(self, session=None):
@@ -178,9 +308,49 @@ class Otx(SessionBase):
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.status not in [StatusEnum.PENDING, StatusEnum.SENT, StatusEnum.WAITFORGAS]:
raise TxStateChangeError('SENDFAIL must follow SENT or PENDING, but had {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.SENDFAIL, session)
if self.__status_already_set(StatusBits.NODE_ERROR):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.LOCAL_ERROR | StatusBits.DEFERRED, session)
self.__reset_status(StatusBits.QUEUED | StatusBits.GAS_ISSUES, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def dequeue(self, session=None):
"""Marks that a process to execute send attempt is underway
Only manipulates object, does not transaction or commit to backend.
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_not_set(StatusBits.QUEUED):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
self.__reset_status(StatusBits.QUEUED, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def minefail(self, block, session=None):
@@ -192,14 +362,25 @@ class Otx(SessionBase):
:type block: number
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.NETWORK_ERROR):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('REVERTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if not self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('REVERTED cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status)))
if block != None:
self.block = block
if self.status != StatusEnum.SENT:
logg.warning('REVERTED should follow SENT, but had {}'.format(StatusEnum(self.status).name))
#if self.status != StatusEnum.PENDING and self.status != StatusEnum.OBSOLETED and self.status != StatusEnum.SENT:
#if self.status > StatusEnum.SENT:
# raise TxStateChangeError('REVERTED must follow OBSOLETED, PENDING or SENT, but had {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.REVERTED, session)
self.__set_status(StatusBits.NETWORK_ERROR | StatusBits.FINAL, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def cancel(self, confirmed=False, session=None):
@@ -213,17 +394,23 @@ class Otx(SessionBase):
:type confirmed: bool
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('CANCEL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if confirmed:
if self.status != StatusEnum.OBSOLETED:
logg.warning('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
#raise TxStateChangeError('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
if self.status > 0 and not self.status & StatusBits.OBSOLETE:
raise TxStateChangeError('CANCEL can only be set on an entry marked OBSOLETE ({})'.format(status_str(self.status)))
self.__set_status(StatusEnum.CANCELLED, session)
elif self.status != StatusEnum.OBSOLETED:
if self.status > StatusEnum.SENT:
logg.warning('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
#raise TxStateChangeError('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
else:
self.__set_status(StatusEnum.OBSOLETED, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def success(self, block, session=None):
"""Marks that transaction was successfully mined.
@@ -235,16 +422,27 @@ class Otx(SessionBase):
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('SUCCESS cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if not self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('SUCCESS cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status)))
if is_error_status(self.status):
raise TxStateChangeError('SUCCESS cannot be set on an entry with error state set ({})'.format(status_str(self.status)))
if block != None:
self.block = block
if self.status != StatusEnum.SENT:
logg.error('SUCCESS should follow SENT, but had {}'.format(StatusEnum(self.status).name))
#raise TxStateChangeError('SUCCESS must follow SENT, but had {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.SUCCESS, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
@staticmethod
def get(status=0, limit=4096, status_exact=True):
def get(status=0, limit=4096, status_exact=True, session=None):
"""Returns outgoing transaction lists by status.
Status may either be matched exactly, or be an upper bound of the integer value of the status enum.
@@ -259,26 +457,32 @@ class Otx(SessionBase):
:rtype: tuple, where first element is transaction hash
"""
e = None
session = Otx.create_session()
session = SessionBase.bind_session(session)
if status_exact:
e = session.query(Otx.tx_hash).filter(Otx.status==status).order_by(Otx.date_created.asc()).limit(limit).all()
else:
e = session.query(Otx.tx_hash).filter(Otx.status<=status).order_by(Otx.date_created.asc()).limit(limit).all()
session.close()
SessionBase.release_session(session)
return e
@staticmethod
def load(tx_hash):
def load(tx_hash, session=None):
"""Retrieves the outgoing transaction record by transaction hash.
:param tx_hash: Transaction hash
:type tx_hash: str, 0x-hex
"""
session = Otx.create_session()
session = SessionBase.bind_session(session)
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
session.close()
SessionBase.release_session(session)
return q.first()
@@ -450,6 +654,3 @@ class OtxSync(SessionBase):
self.tx_height_session = 0
self.block_height_backlog = 0
self.tx_height_backlog = 0

View File

@@ -24,9 +24,10 @@ class AccountRole(SessionBase):
tag = Column(Text)
address_hex = Column(String(42))
# TODO:
@staticmethod
def get_address(tag):
def get_address(tag, session):
"""Get Ethereum address matching the given tag
:param tag: Tag
@@ -34,14 +35,26 @@ class AccountRole(SessionBase):
:returns: Ethereum address, or zero-address if tag does not exist
:rtype: str, 0x-hex
"""
role = AccountRole.get_role(tag)
if role == None:
return zero_address
return role.address_hex
if session == None:
raise ValueError('nested bind session calls will not succeed as the first call to release_session in the stack will leave the db object detached further down the stack. We will need additional reference count.')
session = SessionBase.bind_session(session)
role = AccountRole.__get_role(tag, session)
r = zero_address
if role != None:
r = role.address_hex
session.flush()
SessionBase.release_session(session)
return r
@staticmethod
def get_role(tag):
def get_role(tag, session=None):
"""Get AccountRole model object matching the given tag
:param tag: Tag
@@ -49,20 +62,27 @@ class AccountRole(SessionBase):
:returns: Role object, if found
:rtype: cic_eth.db.models.role.AccountRole
"""
session = AccountRole.create_session()
role = AccountRole.__get_role(session, tag)
session.close()
#return role.address_hex
session = SessionBase.bind_session(session)
role = AccountRole.__get_role(tag, session)
session.flush()
SessionBase.release_session(session)
return role
@staticmethod
def __get_role(session, tag):
return session.query(AccountRole).filter(AccountRole.tag==tag).first()
def __get_role(tag, session):
q = session.query(AccountRole)
q = q.filter(AccountRole.tag==tag)
r = q.first()
return r
@staticmethod
def set(tag, address_hex):
def set(tag, address_hex, session=None):
"""Persist a tag to Ethereum address association.
This will silently overwrite the existing value.
@@ -74,16 +94,18 @@ class AccountRole(SessionBase):
:returns: Role object
:rtype: cic_eth.db.models.role.AccountRole
"""
#session = AccountRole.create_session()
#role = AccountRole.__get(session, tag)
role = AccountRole.get_role(tag) #session, tag)
session = SessionBase.bind_session(session)
role = AccountRole.__get_role(tag, session)
if role == None:
role = AccountRole(tag)
role.address_hex = address_hex
#session.add(role)
#session.commit()
#session.close()
return role #address_hex
session.flush()
SessionBase.release_session(session)
return role
@staticmethod
@@ -95,20 +117,17 @@ class AccountRole(SessionBase):
:returns: Role tag, or None if no match
:rtype: str or None
"""
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
session = SessionBase.bind_session(session)
q = localsession.query(AccountRole)
q = session.query(AccountRole)
q = q.filter(AccountRole.address_hex==address)
role = q.first()
tag = None
if role != None:
tag = role.tag
if session == None:
localsession.close()
SessionBase.release_session(session)
return tag

View File

@@ -2,7 +2,7 @@
import datetime
# third-party imports
from sqlalchemy import Column, String, Integer, DateTime, Enum, ForeignKey, Boolean
from sqlalchemy import Column, String, Integer, DateTime, Enum, ForeignKey, Boolean, NUMERIC
from sqlalchemy.ext.hybrid import hybrid_method, hybrid_property
#from sqlalchemy.orm import relationship, backref
#from sqlalchemy.ext.declarative import declarative_base
@@ -55,8 +55,8 @@ class TxCache(SessionBase):
destination_token_address = Column(String(42))
sender = Column(String(42))
recipient = Column(String(42))
from_value = Column(String())
to_value = Column(String())
from_value = Column(NUMERIC())
to_value = Column(NUMERIC())
block_number = Column(Integer())
tx_index = Column(Integer())
date_created = Column(DateTime, default=datetime.datetime.utcnow)
@@ -64,16 +64,6 @@ class TxCache(SessionBase):
date_checked = Column(DateTime, default=datetime.datetime.utcnow)
def values(self):
from_value_hex = bytes.fromhex(self.from_value)
from_value = int.from_bytes(from_value_hex, 'big')
to_value_hex = bytes.fromhex(self.to_value)
to_value = int.from_bytes(to_value_hex, 'big')
return (from_value, to_value)
def check(self):
"""Update the "checked" timestamp to current time.
@@ -95,68 +85,67 @@ class TxCache(SessionBase):
:param tx_hash_new: tx hash to associate the copied entry with
:type tx_hash_new: str, 0x-hex
"""
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
q = localsession.query(TxCache)
session = SessionBase.bind_session(session)
q = session.query(TxCache)
q = q.join(Otx)
q = q.filter(Otx.tx_hash==tx_hash_original)
txc = q.first()
if txc == None:
SessionBase.release_session(session)
raise NotLocalTxError('original {}'.format(tx_hash_original))
if txc.block_number != None:
SessionBase.release_session(session)
raise TxStateChangeError('cannot clone tx cache of confirmed tx {}'.format(tx_hash_original))
q = localsession.query(Otx)
session.flush()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash_new)
otx = q.first()
if otx == None:
SessionBase.release_session(session)
raise NotLocalTxError('new {}'.format(tx_hash_new))
values = txc.values()
txc_new = TxCache(
otx.tx_hash,
txc.sender,
txc.recipient,
txc.source_token_address,
txc.destination_token_address,
values[0],
values[1],
int(txc.from_value),
int(txc.to_value),
session=session,
)
localsession.add(txc_new)
localsession.commit()
session.add(txc_new)
session.commit()
if session == None:
localsession.close()
SessionBase.release_session(session)
def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None):
session = SessionBase.create_session()
tx = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None, session=None):
session = SessionBase.bind_session(session)
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
tx = q.first()
if tx == None:
session.close()
SessionBase.release_session(session)
raise FileNotFoundError('outgoing transaction record unknown {} (add a Tx first)'.format(tx_hash))
self.otx_id = tx.id
# if tx == None:
# session.close()
# raise ValueError('tx hash {} (outgoing: {}) not found'.format(tx_hash, outgoing))
# session.close()
self.sender = sender
self.recipient = recipient
self.source_token_address = source_token_address
self.destination_token_address = destination_token_address
self.from_value = num_serialize(from_value).hex()
self.to_value = num_serialize(to_value).hex()
self.from_value = from_value
self.to_value = to_value
self.block_number = block_number
self.tx_index = tx_index
# not automatically set in sqlite, it seems:
self.date_created = datetime.datetime.now()
self.date_created = datetime.datetime.utcnow()
self.date_updated = self.date_created
self.date_checked = self.date_created
SessionBase.release_session(session)

View File

@@ -54,8 +54,29 @@ class RoleMissingError(Exception):
pass
class IntegrityError(Exception):
"""Exception raised to signal irregularities with deduplication and ordering of tasks
"""
pass
class LockedError(Exception):
"""Exception raised when attempt is made to execute action that is deactivated by lock
"""
pass
class SignerError(Exception):
"""Exception raised when signer is unavailable or generates an error
"""
pass
class EthError(Exception):
"""Exception raised when unspecified error from evm node is encountered
"""
pass

View File

@@ -8,6 +8,7 @@ from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
from erc20_single_shot_faucet import Faucet
from cic_registry import zero_address
from hexathon import strip_0x
# local import
from cic_eth.eth import RpcClient
@@ -20,7 +21,14 @@ from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.tx import TxCache
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.error import RoleMissingError
from cic_eth.error import (
RoleMissingError,
SignerError,
)
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalSQLAlchemyAndSignerTask,
)
#logg = logging.getLogger(__name__)
logg = logging.getLogger()
@@ -34,6 +42,8 @@ class AccountTxFactory(TxFactory):
self,
address,
chain_spec,
uuid,
session=None,
):
"""Register an Ethereum account address with the on-chain account registry
@@ -56,7 +66,7 @@ class AccountTxFactory(TxFactory):
'gas': gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(),
'nonce': self.next_nonce(uuid, session=session),
'value': 0,
})
return tx_add
@@ -66,6 +76,8 @@ class AccountTxFactory(TxFactory):
self,
address,
chain_spec,
uuid,
session=None,
):
"""Trigger the on-chain faucet to disburse tokens to the provided Ethereum account
@@ -86,7 +98,7 @@ class AccountTxFactory(TxFactory):
'gas': gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(),
'nonce': self.next_nonce(uuid, session=session),
'value': 0,
})
return tx_add
@@ -101,11 +113,12 @@ def unpack_register(data):
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
data = strip_0x(data)
f = data[:8]
if f != '0a3b0a4f':
raise ValueError('Invalid account index register data ({})'.format(f))
d = data[10:]
d = data[8:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
}
@@ -120,17 +133,19 @@ def unpack_gift(data):
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
data = strip_0x(data)
f = data[:8]
if f != '63e4bff4':
raise ValueError('Invalid account index register data ({})'.format(f))
raise ValueError('Invalid gift data ({})'.format(f))
d = data[10:]
d = data[8:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
}
@celery_app.task()
# TODO: Separate out nonce initialization task
@celery_app.task(base=CriticalSQLAlchemyAndSignerTask)
def create(password, chain_str):
"""Creates and stores a new ethereum account in the keystore.
@@ -145,24 +160,24 @@ def create(password, chain_str):
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
a = c.w3.eth.personal.new_account(password)
a = None
try:
a = c.w3.eth.personal.new_account(password)
except FileNotFoundError:
pass
if a == None:
raise SignerError('create account')
logg.debug('created account {}'.format(a))
# Initialize nonce provider record for account
n = c.w3.eth.getTransactionCount(a, 'pending')
session = SessionBase.create_session()
o = session.query(Nonce).filter(Nonce.address_hex==a).first()
if o == None:
o = Nonce()
o.address_hex = a
o.nonce = n
session.add(o)
session.commit()
Nonce.init(a, session=session)
session.commit()
session.close()
return a
@celery_app.task(bind=True, throws=(RoleMissingError,))
@celery_app.task(bind=True, throws=(RoleMissingError,), base=CriticalSQLAlchemyAndSignerTask)
def register(self, account_address, chain_str, writer_address=None):
"""Creates a transaction to add the given address to the accounts index.
@@ -178,21 +193,24 @@ def register(self, account_address, chain_str, writer_address=None):
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
session = SessionBase.create_session()
if writer_address == None:
writer_address = AccountRole.get_address('ACCOUNTS_INDEX_WRITER')
writer_address = AccountRole.get_address('ACCOUNTS_INDEX_WRITER', session=session)
if writer_address == zero_address:
session.close()
raise RoleMissingError(account_address)
logg.debug('adding account address {} to index; writer {}'.format(account_address, writer_address))
queue = self.request.delivery_info['routing_key']
c = RpcClient(chain_spec, holder_address=writer_address)
txf = AccountTxFactory(writer_address, c)
tx_add = txf.add(account_address, chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data')
tx_add = txf.add(account_address, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data', session=session)
session.close()
gas_budget = tx_add['gas'] * tx_add['gasPrice']
@@ -209,7 +227,7 @@ def register(self, account_address, chain_str, writer_address=None):
return account_address
@celery_app.task(bind=True)
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def gift(self, account_address, chain_str):
"""Creates a transaction to invoke the faucet contract for the given address.
@@ -228,12 +246,14 @@ def gift(self, account_address, chain_str):
c = RpcClient(chain_spec, holder_address=account_address)
txf = AccountTxFactory(account_address, c)
tx_add = txf.gift(account_address, chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_gift_data')
session = SessionBase.create_session()
tx_add = txf.gift(account_address, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_gift_data', session=session)
session.close()
gas_budget = tx_add['gas'] * tx_add['gasPrice']
logg.debug('register user tx {}'.format(tx_hash_hex))
logg.debug('gift user tx {}'.format(tx_hash_hex))
s = create_check_gas_and_send_task(
[tx_signed_raw_hex],
chain_str,
@@ -304,6 +324,8 @@ def cache_gift_data(
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
tx_data = unpack_gift(tx['data'])
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
@@ -312,9 +334,9 @@ def cache_gift_data(
zero_address,
0,
0,
session=session,
)
session = SessionBase.create_session()
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
@@ -322,7 +344,7 @@ def cache_gift_data(
return (tx_hash_hex, cache_id)
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_account_data(
tx_hash_hex,
tx_signed_raw_hex,
@@ -347,6 +369,7 @@ def cache_account_data(
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
tx_data = unpack_register(tx['data'])
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
@@ -355,9 +378,8 @@ def cache_account_data(
zero_address,
0,
0,
session=session,
)
session = SessionBase.create_session()
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id

View File

@@ -32,10 +32,10 @@ class TxFactory:
logg.debug('txfactory instance address {} gas price'.format(self.address, self.gas_price))
def next_nonce(self):
"""Returns the current cached nonce value, and increments it for next transaction.
def next_nonce(self, uuid, session=None):
"""Returns the current reserved nonce value, and increments it for next transaction.
:returns: Nonce
:rtype: number
"""
return self.nonce_oracle.next()
return self.nonce_oracle.next_by_task_uuid(uuid, session=session)

Some files were not shown because too many files have changed in this diff Show More