Compare commits

..

122 Commits

Author SHA1 Message Date
nolash
ed016ca35c Bump ussd cic-eth requirement 2021-04-24 10:38:41 +02:00
nolash
355840f316 Bump version 2021-04-24 10:36:52 +02:00
nolash
0319e0672e Merge remote-tracking branch 'origin/master' into lash/default-token 2021-04-24 08:18:49 +02:00
Louis Holbrook
cd0e702e3a Merge branch 'lash/custom-meta' into 'master'
Add custom meta tags

See merge request grassrootseconomics/cic-internal-integration!114
2021-04-24 06:14:25 +00:00
Louis Holbrook
cfab16f4a9 Add custom meta tags 2021-04-24 06:14:24 +00:00
nolash
3ed263a879 Add test for default token task 2021-04-23 23:31:41 +02:00
nolash
5d96ed5e24 Remove eth health modules config entries 2021-04-23 23:07:04 +02:00
nolash
489796abd8 Merge remote-tracking branch 'origin/master' into lash/default-token 2021-04-23 23:05:32 +02:00
Louis Holbrook
60fdb06034 Merge branch 'lash/emergency-shutdown' into 'master'
cic-eth: Add sanity checks for emergency shutdown / liveness tests

See merge request grassrootseconomics/cic-internal-integration!110
2021-04-23 21:02:52 +00:00
Louis Holbrook
3129a78e06 cic-eth: Add sanity checks for emergency shutdown / liveness tests 2021-04-23 21:02:51 +00:00
nolash
626b1c9a03 Add omitted files 2021-04-23 22:51:51 +02:00
nolash
41a3b2e060 Add default token setting to cic-eth with api 2021-04-23 11:22:12 +02:00
Louis Holbrook
6b6ec8659b Merge branch 'lash/simpler-token-selector' into 'master'
Simplify token selector

See merge request grassrootseconomics/cic-internal-integration!112
2021-04-23 08:17:59 +00:00
nolash
96e755b54d Simplify token selector 2021-04-22 11:58:39 +02:00
nolash
f38458ff4c Merge branch 'master' of gitlab.com:grassrootseconomics/cic-internal-integration 2021-04-22 11:57:15 +02:00
Louis Holbrook
660d524401 Merge branch 'lash/health-util' into 'master'
K8s health utilities for cic containers

See merge request grassrootseconomics/cic-internal-integration!108
2021-04-21 17:34:13 +00:00
Louis Holbrook
1bc7cde1f0 K8s health utilities for cic containers 2021-04-21 17:34:13 +00:00
Louis Holbrook
9c22ffca38 Merge branch 'lash/ussd-final-steps' into 'master'
USSD final steps

See merge request grassrootseconomics/cic-internal-integration!111
2021-04-21 17:25:57 +00:00
Louis Holbrook
39fe4a14ec USSD final steps 2021-04-21 17:25:57 +00:00
nolash
65250196cc cic-eth versionbump 2021-04-21 19:03:14 +02:00
Louis Holbrook
0123ce13ea Merge branch 'lash/settable-gas-price' into 'master'
Adapt deployment to Bloxberg

See merge request grassrootseconomics/cic-internal-integration!99
2021-04-21 05:46:42 +00:00
Louis Holbrook
03b3e8cd3f Adapt deployment to Bloxberg 2021-04-21 05:46:42 +00:00
Louis Holbrook
3ee84f780e Merge branch 'lash/cic-cache-syncer-backend-mixup' into 'master'
CIC-cache backend syncer mixup

See merge request grassrootseconomics/cic-internal-integration!106
2021-04-20 13:25:03 +00:00
Louis Holbrook
95269f69ed CIC-cache backend syncer mixup 2021-04-20 13:25:02 +00:00
621780e9b6 Update .cic-template.yml 2021-04-19 17:56:19 +00:00
eecdca1a55 Merge branch 'philip/ussd-db-fixes' into 'master'
Philip/ussd db fixes

See merge request grassrootseconomics/cic-internal-integration!107
2021-04-19 08:44:41 +00:00
6fef0ecec9 Philip/ussd db fixes 2021-04-19 08:44:40 +00:00
Louis Holbrook
6b89a2da89 Merge branch 'lash/chainlib-regression' into 'master'
Correct chainlib import paths

See merge request grassrootseconomics/cic-internal-integration!101
2021-04-16 21:44:14 +00:00
Louis Holbrook
254f2a266b Correct chainlib import paths 2021-04-16 21:44:14 +00:00
ba18914498 Merge branch 'philip/fix-filter-callbacks' into 'master'
Philip/fix filter callbacks

See merge request grassrootseconomics/cic-internal-integration!95
2021-04-16 20:24:07 +00:00
f410e8b7e3 Philip/fix filter callbacks 2021-04-16 20:24:07 +00:00
01454c9ac0 Merge branch 'add-chainsync-db' into 'master'
Add chainsync db

See merge request grassrootseconomics/cic-internal-integration!105
2021-04-15 21:22:07 +00:00
462d7046ed Add chainsync db 2021-04-15 21:22:07 +00:00
f91b491251 Merge branch 'ida/changes-to-args-commandline' into 'master'
Ida/changes to args commandline

See merge request grassrootseconomics/cic-internal-integration!104
2021-04-15 17:04:17 +00:00
0de79521dc Ida/changes to args commandline 2021-04-15 17:04:16 +00:00
nolash
22ec8e2e0e Update sql backend symbol name, deps 2021-04-15 18:04:47 +02:00
Louis Holbrook
a8529ae2ef Merge branch 'lash/improve-cic-cache-service' into 'master'
Iimprove cic cache service

See merge request grassrootseconomics/cic-internal-integration!100
2021-04-15 14:02:11 +00:00
Louis Holbrook
98ddf56a1d Iimprove cic cache service 2021-04-15 14:02:09 +00:00
bee602b16a Merge branch 'philip/leaner-metadata-handling' into 'master'
Philip/leaner metadata handling

See merge request grassrootseconomics/cic-internal-integration!94
2021-04-14 09:00:11 +00:00
c67274846f Philip/leaner metadata handling 2021-04-14 09:00:10 +00:00
Louis Holbrook
48570b2338 Merge branch 'lash/update-syncer-imports' into 'master'
Update syncer imports

See merge request grassrootseconomics/cic-internal-integration!97
2021-04-14 08:17:48 +00:00
Louis Holbrook
c80b8771b9 Update syncer imports 2021-04-14 08:17:47 +00:00
Louis Holbrook
6c6db7bc7b Merge branch 'lash/cache-tracker-history' into 'master'
Fix missing history syncer in cic-cache-tracker

See merge request grassrootseconomics/cic-internal-integration!96
2021-04-13 14:48:25 +00:00
nolash
bb941acd7e Fix missing history syncer in cic-cache-tracker 2021-04-13 15:31:40 +02:00
Louis Holbrook
7dee7de26e Merge branch 'lash/import-ussd' into 'master'
Implement migration script with ussd and notify

See merge request grassrootseconomics/cic-internal-integration!87
2021-04-09 13:00:15 +00:00
Louis Holbrook
7b16a36a62 Implement migration script with ussd and notify 2021-04-09 13:00:15 +00:00
Louis Holbrook
5a4e0b8eba Merge branch 'lash/external-notify-tasks' into 'master'
Discover queue for external tasks

See merge request grassrootseconomics/cic-internal-integration!89
2021-04-07 06:31:26 +00:00
Louis Holbrook
226699568f Discover queue for external tasks 2021-04-07 06:31:26 +00:00
Louis Holbrook
ec2b0e56e5 Merge branch 'lash/add-missing-state-file' into 'master'
Add stat file

See merge request grassrootseconomics/cic-internal-integration!92
2021-04-07 06:26:44 +00:00
nolash
6ffaca5207 Add stat file 2021-04-07 08:24:10 +02:00
Louis Holbrook
5c6375c9ec Merge branch 'lash/calculate-block-time' into 'master'
Add block time calc to retry, tracker

See merge request grassrootseconomics/cic-internal-integration!90
2021-04-06 19:11:43 +00:00
Louis Holbrook
99f55f01ed Add block time calc to retry, tracker 2021-04-06 19:11:42 +00:00
Louis Holbrook
086308fdb8 Merge branch 'lash/remove-stray-stat' into 'master'
Remove stray chainlib stat

See merge request grassrootseconomics/cic-internal-integration!88
2021-04-06 18:11:31 +00:00
nolash
f8f74a17f6 Remove stray chainlib stat 2021-04-06 20:08:18 +02:00
fd629cdc51 Merge branch 'philip/ussd-bug-fixes' into 'master'
Philip/ussd bug fixes

See merge request grassrootseconomics/cic-internal-integration!72
2021-04-06 17:53:38 +00:00
e9fb80ab78 Philip/ussd bug fixes 2021-04-06 17:53:38 +00:00
Louis Holbrook
7728f38f14 Merge branch 'lash/4k' into 'master'
Make 40k import pass

See merge request grassrootseconomics/cic-internal-integration!86
2021-04-06 15:14:04 +00:00
Louis Holbrook
a305aafc86 Make 40k import pass 2021-04-06 15:14:04 +00:00
Louis Holbrook
9e6bb2acb2 Merge branch 'lash/external-chain-queue' into 'master'
cic-eth: Rehabilitate retrier with chainqueue

See merge request grassrootseconomics/cic-internal-integration!85
2021-04-05 15:07:09 +00:00
Louis Holbrook
a7ab2e3f3f cic-eth: Rehabilitate retrier with chainqueue 2021-04-05 15:07:09 +00:00
Louis Holbrook
1f2fc3e952 Merge branch 'lash/external-chain-queue' into 'master'
Use external chain queue engine

See merge request grassrootseconomics/cic-internal-integration!84
2021-04-04 12:41:00 +00:00
Louis Holbrook
a9258c3085 Use external chain queue engine 2021-04-04 12:40:59 +00:00
Louis Holbrook
1a97f1e97d Merge branch 'lash/rehabilitate-cic-cache' into 'master'
Rehabilitate cic cache

See merge request grassrootseconomics/cic-internal-integration!82
2021-04-02 13:20:05 +00:00
Louis Holbrook
fc59e24c80 Rehabilitate cic cache 2021-04-02 13:20:05 +00:00
Louis Holbrook
68bdadcdf1 Merge branch 'lash/rehabilitate-retrier' into 'master'
Rehabilitate retrier

See merge request grassrootseconomics/cic-internal-integration!77
2021-04-02 13:16:27 +00:00
Louis Holbrook
810f9fe994 Rehabilitate retrier 2021-04-02 13:16:27 +00:00
Louis Holbrook
7762020186 Merge branch 'lash/clean-sovereign-imports' into 'master'
Remove custodial dependencies from sovereign import scripts

See merge request grassrootseconomics/cic-internal-integration!83
2021-04-01 20:55:39 +00:00
Louis Holbrook
462933d8ae Remove custodial dependencies from sovereign import scripts 2021-04-01 20:55:39 +00:00
Louis Holbrook
3f8d7fc10a Merge branch 'lash/sovereign-import' into 'master'
Sovereign import part II

See merge request grassrootseconomics/cic-internal-integration!81
2021-03-31 11:04:05 +00:00
Louis Holbrook
426e46c791 Sovereign import part II 2021-03-31 11:04:05 +00:00
Louis Holbrook
050da4ae36 Merge branch 'lash/sovereign-import' into 'master'
Sovereign import script

See merge request grassrootseconomics/cic-internal-integration!80
2021-03-31 10:52:27 +00:00
Louis Holbrook
1886e6743b Sovereign import script 2021-03-31 10:52:27 +00:00
5fa93633da add gitkeep back 2021-03-30 16:58:45 -07:00
Louis Holbrook
d822470937 Merge branch 'lash/connect-registry' into 'master'
Add declarator to tasker registry instantiation

See merge request grassrootseconomics/cic-internal-integration!79
2021-03-30 16:33:10 +00:00
Louis Holbrook
206a585fd1 Add declarator to tasker registry instantiation 2021-03-30 16:33:09 +00:00
Louis Holbrook
80d0bfe234 Merge branch 'lash/migration-verify-faucet' into 'master'
Add faucet and gas gift check to migration script verify

See merge request grassrootseconomics/cic-internal-integration!76
2021-03-29 19:29:29 +00:00
Louis Holbrook
21fd034478 Add faucet and gas gift check to migration script verify 2021-03-29 19:29:29 +00:00
Louis Holbrook
74457790a4 Merge branch 'lash/cic-ussd-cic-eth-upgrade' into 'master'
Bump cic-ussd to apply bumped cic-eth dep

See merge request grassrootseconomics/cic-internal-integration!75
2021-03-29 19:20:07 +00:00
Louis Holbrook
fd5e208acd Bump cic-ussd to apply bumped cic-eth dep 2021-03-29 19:20:06 +00:00
Louis Holbrook
2cb1d6d9fb Merge branch 'lash/add-omitted-tests' into 'master'
Add omitted tests from previous MR

See merge request grassrootseconomics/cic-internal-integration!74
2021-03-29 13:42:40 +00:00
nolash
5efc336962 Add omitted tests from previous MR 2021-03-29 15:35:51 +02:00
Louis Holbrook
cf761b3b31 Merge branch 'lash/migration-last-gasp' into 'master'
Replace web3 with chainlib

See merge request grassrootseconomics/cic-internal-integration!73
2021-03-29 13:27:54 +00:00
Louis Holbrook
b65ab8a0ca Rehabilitate transfer, approve
Signed-off-by: nolash <dev@holbrook.no>
2021-03-29 13:27:53 +00:00
299385f320 recursive own 2021-03-21 17:18:47 -07:00
bdd7ca14c2 contract migration permissions 2021-03-21 11:32:53 -07:00
88ace15d6b Merge branch 'lash/new-contract-migration' into 'master'
Fix breaking seed script

See merge request grassrootseconomics/cic-internal-integration!71
2021-03-20 06:04:02 +00:00
Louis Holbrook
e3ab8bcd82 Fix breaking seed script 2021-03-20 06:04:02 +00:00
992c5f3c00 Merge branch 'bvander/contract-migration-refactor' into 'master'
contract migration refactor

See merge request grassrootseconomics/cic-internal-integration!70
2021-03-19 14:19:40 +00:00
e4d1f5b65c contract migration refactor 2021-03-19 14:19:40 +00:00
7fe5b6bea3 add libpq to runtime 2021-03-17 07:30:34 -07:00
fc27dd6826 Merge branch 'bvander/more-contract-migration-permissions' into 'master'
add permissions and move some pip installs

See merge request grassrootseconomics/cic-internal-integration!69
2021-03-16 05:34:20 +00:00
75262dae5d add permissions and move some pip installs 2021-03-16 05:34:19 +00:00
c59f9da0fc try the multiple dest flags... 2021-03-15 14:31:48 +00:00
20a26045eb Update .cic-template.yml 2021-03-15 14:21:39 +00:00
09a79e10d5 Merge branch 'bvander/fix-contract-migration-permissions' into 'master'
fix home dir permissions

See merge request grassrootseconomics/cic-internal-integration!68
2021-03-15 14:17:09 +00:00
c0dff41b3c fix home dir permissions 2021-03-15 07:10:08 -07:00
18be4c48a7 Merge branch 'fix-cache-tasker' into 'master'
adding cic cache details

See merge request grassrootseconomics/cic-internal-integration!65
2021-03-15 13:22:39 +00:00
432dbe9fee adding cic cache details 2021-03-15 13:22:39 +00:00
93338aebea Merge branch 'bvander/add-cache-dbinit' into 'master'
cic cache db fixes

See merge request grassrootseconomics/cic-internal-integration!64
2021-03-15 13:20:18 +00:00
097a80e8de cic cache db fixes 2021-03-15 13:20:18 +00:00
bd4e5b0a40 Merge branch 'contract-migration-file-issue' into 'master'
Contract migration file issue

See merge request grassrootseconomics/cic-internal-integration!67
2021-03-15 03:27:33 +00:00
4927d92215 seperate module install 2021-03-14 20:21:07 -07:00
650472252d Merge branch 'philip/ussd_session_resumption' into 'master'
Add barebone session resumption feature

See merge request grassrootseconomics/cic-internal-integration!60
2021-03-14 18:17:51 +00:00
746196d2b1 Merge branch 'drop-bancor-contracts' into 'master'
contract migration image build improvements

See merge request grassrootseconomics/cic-internal-integration!66
2021-03-13 18:50:36 +00:00
842cbadf00 contract migration image build improvements 2021-03-13 18:50:36 +00:00
3661e48fd1 Merge branch 'cic-ussd-image-normalize' into 'master'
use the ubuntu slim image

See merge request grassrootseconomics/cic-internal-integration!63
2021-03-12 16:48:40 +00:00
f136504988 use the ubuntu slim image 2021-03-12 16:48:39 +00:00
f6a7956fdf Update .cic-template.yml 2021-03-10 03:53:58 +00:00
562292bd01 Merge branch 'kaniko-builds' into 'master'
Update ci_templates/.cic-template.yml

See merge request grassrootseconomics/cic-internal-integration!61
2021-03-10 03:48:17 +00:00
3cdf7b9965 Update ci_templates/.cic-template.yml 2021-03-10 03:48:17 +00:00
16d88d389b move envlist to dockercontainer 2021-03-09 16:54:33 -08:00
29da44bb9f Add barebone session resumption feature 2021-03-09 19:05:01 +03:00
15445b8d0f Merge branch 'cic-ussd-reqfix' into 'master'
cic types

See merge request grassrootseconomics/cic-internal-integration!59
2021-03-07 20:55:16 +00:00
bb90ceea0b cic types 2021-03-07 12:54:41 -08:00
8904e2abb1 Merge branch 'cic-ussd-req' into 'master'
revert the new requirements for ussd

See merge request grassrootseconomics/cic-internal-integration!58
2021-03-07 20:27:03 +00:00
94d3e61d0c revert the new requirements for ussd 2021-03-07 12:25:36 -08:00
nolash
fc08f3d17a bump cic base for cic ussd 2021-03-07 20:35:26 +01:00
Louis Holbrook
8da5219290 Merge branch 'lash/cli-rehabilitations' into 'master'
Rehabilitate CLI and API after nonce changes

See merge request grassrootseconomics/cic-internal-integration!57
2021-03-07 18:01:44 +00:00
Louis Holbrook
5f01135b04 Rehabilitate CLI and API after nonce changes 2021-03-07 18:01:44 +00:00
Louis Holbrook
543c6249b9 Merge branch 'lash/retry-on-signer-error' into 'master'
Retry on signer error

See merge request grassrootseconomics/cic-internal-integration!55
2021-03-07 13:51:59 +00:00
Louis Holbrook
db4f8f8955 Retry on signer error 2021-03-07 13:51:59 +00:00
Louis Holbrook
0c45e12ce1 Merge branch 'lash/ussd-cli' into 'master'
Add ussd cli

See merge request grassrootseconomics/cic-internal-integration!54
2021-03-06 22:27:40 +00:00
443 changed files with 11230 additions and 816205 deletions

8
.gitignore vendored
View File

@@ -1,2 +1,10 @@
service-configs/*
!service-configs/.gitkeep
**/node_modules/
__pycache__
*.pyc
*.o
gmon.out
*.egg-info
dist/
build/

View File

@@ -1,5 +1,5 @@
[database]
NAME=cic-eth
NAME=cic_cache
USER=postgres
PASSWORD=
HOST=localhost

View File

@@ -36,7 +36,7 @@ script_location = .
# output_encoding = utf-8
#sqlalchemy.url = driver://user:pass@localhost/dbname
sqlalchemy.url = postgresql+psycopg2://postgres@localhost:5432/cic-cache
sqlalchemy.url = postgresql+psycopg2://postgres@localhost:5432/cic_cache
[post_write_hooks]

View File

@@ -1,8 +1,8 @@
"""Add chain syncer
Revision ID: ec40ac0974c1
Revises: 6ac7a1dadc46
Create Date: 2021-02-23 06:10:19.246304
Revision ID: 6604de4203e2
Revises: 63b629f14a85
Create Date: 2021-04-01 08:10:29.156243
"""
from alembic import op
@@ -14,15 +14,15 @@ from chainsyncer.db.migrations.sqlalchemy import (
# revision identifiers, used by Alembic.
revision = 'ec40ac0974c1'
down_revision = '6ac7a1dadc46'
revision = '6604de4203e2'
down_revision = '63b629f14a85'
branch_labels = None
depends_on = None
def upgrade():
chainsyncer_upgrade(0, 0, 1)
def downgrade():
chainsyncer_downgrade(0, 0, 1)

View File

@@ -0,0 +1 @@
from .erc20 import *

View File

@@ -0,0 +1,2 @@
class SyncFilter:
pass

View File

@@ -0,0 +1,73 @@
# standard imports
import logging
# external imports
from chainlib.eth.erc20 import ERC20
from chainlib.eth.address import (
to_checksum_address,
)
from chainlib.eth.error import RequestMismatchException
from chainlib.status import Status
from cic_eth_registry.erc20 import ERC20Token
from cic_eth_registry.error import (
NotAContractError,
ContractMismatchError,
)
# local imports
from .base import SyncFilter
from cic_cache import db as cic_cache_db
logg = logging.getLogger().getChild(__name__)
class ERC20TransferFilter(SyncFilter):
def __init__(self, chain_spec):
self.chain_spec = chain_spec
# TODO: Verify token in declarator / token index
def filter(self, conn, block, tx, db_session=None):
logg.debug('filter {} {}'.format(block, tx))
token = None
try:
token = ERC20Token(self.chain_spec, conn, tx.inputs[0])
except NotAContractError:
logg.debug('not a contract {}'.format(tx.inputs[0]))
return False
except ContractMismatchError:
logg.debug('not an erc20 token {}'.format(tx.inputs[0]))
return False
transfer_data = None
try:
transfer_data = ERC20.parse_transfer_request(tx.payload)
except RequestMismatchException:
logg.debug('erc20 match but not a transfer, skipping')
return False
token_sender = tx.outputs[0]
token_recipient = transfer_data[0]
token_value = transfer_data[1]
logg.debug('matched erc20 token transfer {} ({}) to {} value {}'.format(token.name, token.address, transfer_data[0], transfer_data[1]))
cic_cache_db.add_transaction(
db_session,
tx.hash,
block.number,
tx.index,
to_checksum_address(token_sender),
to_checksum_address(token_recipient),
token.address,
token.address,
token_value,
token_value,
tx.status == Status.SUCCESS,
block.timestamp,
)
#db_session.flush()
db_session.commit()
return True

View File

@@ -0,0 +1,112 @@
# standard imports
import os
import sys
import logging
import time
import argparse
import sys
import re
# third-party imports
import confini
import celery
import rlp
import cic_base.config
import cic_base.log
import cic_base.argparse
import cic_base.rpc
from cic_eth_registry import CICRegistry
from cic_eth_registry.error import UnknownContractError
from chainlib.chain import ChainSpec
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.connection import RPCConnection
from chainlib.eth.block import (
block_latest,
)
from hexathon import (
strip_0x,
)
from chainsyncer.backend.sql import SQLBackend
from chainsyncer.driver import (
HeadSyncer,
HistorySyncer,
)
from chainsyncer.db.models.base import SessionBase
# local imports
from cic_cache.db import dsn_from_config
from cic_cache.runnable.daemons.filters import (
ERC20TransferFilter,
)
script_dir = os.path.realpath(os.path.dirname(__file__))
logg = cic_base.log.create()
argparser = cic_base.argparse.create(script_dir, cic_base.argparse.full_template)
#argparser = cic_base.argparse.add(argparser, add_traffic_args, 'traffic')
args = cic_base.argparse.parse(argparser, logg)
config = cic_base.config.create(args.c, args, args.env_prefix)
cic_base.config.log(config)
dsn = dsn_from_config(config)
SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
#RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default')
cic_base.rpc.setup(chain_spec, config.get('ETH_PROVIDER'))
def main():
# Connect to blockchain with chainlib
rpc = RPCConnection.connect(chain_spec, 'default')
o = block_latest()
r = rpc.do(o)
block_offset = int(strip_0x(r), 16) + 1
logg.debug('starting at block {}'.format(block_offset))
syncers = []
#if SQLBackend.first(chain_spec):
# backend = SQLBackend.initial(chain_spec, block_offset)
syncer_backends = SQLBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0:
logg.info('found no backends to resume')
syncer_backends.append(SQLBackend.initial(chain_spec, block_offset))
else:
for syncer_backend in syncer_backends:
logg.info('resuming sync session {}'.format(syncer_backend))
for syncer_backend in syncer_backends:
syncers.append(HistorySyncer(syncer_backend))
syncer_backend = SQLBackend.live(chain_spec, block_offset+1)
syncers.append(HeadSyncer(syncer_backend))
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if trusted_addresses_src == None:
logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
sys.exit(1)
trusted_addresses = trusted_addresses_src.split(',')
for address in trusted_addresses:
logg.info('using trusted address {}'.format(address))
erc20_transfer_filter = ERC20TransferFilter(chain_spec)
i = 0
for syncer in syncers:
logg.debug('running syncer index {}'.format(i))
syncer.add_filter(erc20_transfer_filter)
r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
sys.stderr.write("sync {} done at block {}\n".format(syncer, r))
i += 1
if __name__ == '__main__':
main()

View File

@@ -1,339 +0,0 @@
# standard imports
import sys
import os
import argparse
import logging
import time
import enum
import re
# third-party imports
import confini
from cic_registry import CICRegistry
from cic_registry.chain import (
ChainRegistry,
ChainSpec,
)
#from cic_registry.bancor import BancorRegistryClient
from cic_registry.token import Token
from cic_registry.error import (
UnknownContractError,
UnknownDeclarationError,
)
from cic_registry.declaration import to_token_declaration
from web3.exceptions import BlockNotFound, TransactionNotFound
from websockets.exceptions import ConnectionClosedError
from requests.exceptions import ConnectionError
import web3
from web3 import HTTPProvider, WebsocketProvider
# local imports
from cic_cache import db
from cic_cache.db.models.base import SessionBase
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL)
logging.getLogger('urllib3').setLevel(logging.CRITICAL)
logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL)
log_topics = {
'transfer': '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef',
'convert': '0x7154b38b5dd31bb3122436a96d4e09aba5b323ae1fd580025fab55074334c095',
'accountregistry_add': '0a3b0a4f4c6e53dce3dbcad5614cb2ba3a0fa7326d03c5d64b4fa2d565492737',
}
config_dir = os.path.join('/usr/local/etc/cic-cache')
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('--trust-address', default=[], type=str, dest='trust_address', action='append', help='Set address as trust')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
args = argparser.parse_args(sys.argv[1:])
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
config = confini.Config(config_dir, args.env_prefix)
config.process()
args_override = {
'ETH_ABI_DIR': getattr(args, 'abi_dir'),
'CIC_TRUST_ADDRESS': ",".join(getattr(args, 'trust_address', [])),
}
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
# connect to database
dsn = db.dsn_from_config(config)
SessionBase.connect(dsn)
re_websocket = re.compile('^wss?://')
re_http = re.compile('^https?://')
blockchain_provider = config.get('ETH_PROVIDER')
if re.match(re_websocket, blockchain_provider) != None:
blockchain_provider = WebsocketProvider(blockchain_provider)
elif re.match(re_http, blockchain_provider) != None:
blockchain_provider = HTTPProvider(blockchain_provider)
else:
raise ValueError('unknown provider url {}'.format(blockchain_provider))
def web3_constructor():
w3 = web3.Web3(blockchain_provider)
return (blockchain_provider, w3)
class RunStateEnum(enum.IntEnum):
INIT = 0
RUN = 1
TERMINATE = 9
def rubberstamp(src):
return True
class Tracker:
def __init__(self, chain_spec, trusts=[]):
self.block_height = 0
self.tx_height = 0
self.state = RunStateEnum.INIT
self.declarator_cache = {}
self.convert_enabled = False
self.trusts = trusts
self.chain_spec = chain_spec
self.declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', 'Declarator')
def __process_tx(self, w3, session, t, r, l, b):
token_value = int(l.data, 16)
token_sender = l.topics[1][-20:].hex()
token_recipient = l.topics[2][-20:].hex()
#ts = ContractRegistry.get_address(t.address)
ts = CICRegistry.get_address(self.chain_spec, t.address())
logg.info('add token transfer {} value {} from {} to {}'.format(
ts.symbol(),
token_value,
token_sender,
token_recipient,
)
)
db.add_transaction(
session,
r.transactionHash.hex(),
r.blockNumber,
r.transactionIndex,
w3.toChecksumAddress(token_sender),
w3.toChecksumAddress(token_recipient),
t.address(),
t.address(),
token_value,
token_value,
r.status == 1,
b.timestamp,
)
session.flush()
# TODO: simplify/ split up and/or comment, function is too long
def __process_convert(self, w3, session, t, r, l, b):
logg.warning('conversions are deactivated')
return
# token_source = l.topics[2][-20:].hex()
# token_source = w3.toChecksumAddress(token_source)
# token_destination = l.topics[3][-20:].hex()
# token_destination = w3.toChecksumAddress(token_destination)
# data_noox = l.data[2:]
# d = data_noox[:64]
# token_from_value = int(d, 16)
# d = data_noox[64:128]
# token_to_value = int(d, 16)
# token_trader = '0x' + data_noox[192-40:]
#
# #ts = ContractRegistry.get_address(token_source)
# ts = CICRegistry.get_address(CICRegistry.bancor_chain_spec, t.address())
# #if ts == None:
# # ts = ContractRegistry.reserves[token_source]
# td = ContractRegistry.get_address(token_destination)
# #if td == None:
# # td = ContractRegistry.reserves[token_source]
# logg.info('add token convert {} -> {} value {} -> {} trader {}'.format(
# ts.symbol(),
# td.symbol(),
# token_from_value,
# token_to_value,
# token_trader,
# )
# )
#
# db.add_transaction(
# session,
# r.transactionHash.hex(),
# r.blockNumber,
# r.transactionIndex,
# w3.toChecksumAddress(token_trader),
# w3.toChecksumAddress(token_trader),
# token_source,
# token_destination,
# r.status == 1,
# b.timestamp,
# )
# session.flush()
def check_token(self, address):
t = None
try:
t = CICRegistry.get_address(CICRegistry.default_chain_spec, address)
return t
except UnknownContractError:
logg.debug('contract {} not in registry'.format(address))
# If nothing was returned, we look up the token in the declarator
for trust in self.trusts:
logg.debug('look up declaration for contract {} with trust {}'.format(address, trust))
fn = self.declarator.function('declaration')
# TODO: cache trust in LRUcache
declaration_array = fn(trust, address).call()
try:
declaration = to_token_declaration(trust, address, declaration_array, [rubberstamp])
logg.debug('found declaration for token {} from trust address {}'.format(address, trust))
except UnknownDeclarationError:
continue
try:
c = w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=address)
t = CICRegistry.add_token(self.chain_spec, c)
break
except ValueError:
logg.error('declaration for {} validates as token, but location is not ERC20 compatible'.format(address))
return t
# TODO use input data instead of logs
def process(self, w3, session, block):
#self.refresh_registry(w3)
tx_count = w3.eth.getBlockTransactionCount(block.hash)
b = w3.eth.getBlock(block.hash)
for i in range(self.tx_height, tx_count):
tx = w3.eth.getTransactionByBlock(block.hash, i)
if tx.to == None:
logg.debug('block {} tx {} is contract creation tx, skipping'.format(block.number, i))
continue
if len(w3.eth.getCode(tx.to)) == 0:
logg.debug('block {} tx {} not a contract tx, skipping'.format(block.number, i))
continue
t = self.check_token(tx.to)
if t != None and isinstance(t, Token):
r = w3.eth.getTransactionReceipt(tx.hash)
for l in r.logs:
logg.debug('block {} tx {} {} token log {} {}'.format(block.number, i, tx.hash.hex(), l.logIndex, l.topics[0].hex()))
if l.topics[0].hex() == log_topics['transfer']:
self.__process_tx(w3, session, t, r, l, b)
# TODO: cache contracts in LRUcache
elif self.convert_enabled and tx.to == CICRegistry.get_contract(CICRegistry.default_chain_spec, 'Converter').address:
r = w3.eth.getTransactionReceipt(tx.hash)
for l in r.logs:
logg.info('block {} tx {} {} bancornetwork log {} {}'.format(block.number, i, tx.hash.hex(), l.logIndex, l.topics[0].hex()))
if l.topics[0].hex() == log_topics['convert']:
self.__process_convert(w3, session, t, r, l, b)
session.execute("UPDATE tx_sync SET tx = '{}'".format(tx.hash.hex()))
session.commit()
self.tx_height += 1
def __get_next_retry(self, backoff=False):
return 1
def loop(self):
logg.info('starting at block {} tx index {}'.format(self.block_height, self.tx_height))
self.state = RunStateEnum.RUN
while self.state == RunStateEnum.RUN:
(provider, w3) = web3_constructor()
session = SessionBase.create_session()
try:
block = w3.eth.getBlock(self.block_height)
self.process(w3, session, block)
self.block_height += 1
self.tx_height = 0
except BlockNotFound as e:
logg.debug('no block {} yet, zZzZ...'.format(self.block_height))
time.sleep(self.__get_next_retry())
except ConnectionClosedError as e:
logg.info('connection gone, retrying')
time.sleep(self.__get_next_retry(True))
except OSError as e:
logg.error('cannot connect {}'.format(e))
time.sleep(self.__get_next_retry(True))
except Exception as e:
session.close()
raise(e)
session.close()
def load(self, w3):
session = SessionBase.create_session()
r = session.execute('SELECT tx FROM tx_sync').first()
if r != None:
if r[0] == '0x{0:0{1}X}'.format(0, 64):
logg.debug('last tx was zero-address, starting from scratch')
return
t = w3.eth.getTransaction(r[0])
self.block_height = t.blockNumber
self.tx_height = t.transactionIndex+1
c = w3.eth.getBlockTransactionCount(t.blockHash.hex())
logg.debug('last tx processed {} index {} (max index {})'.format(t.blockNumber, t.transactionIndex, c-1))
if c == self.tx_height:
self.block_height += 1
self.tx_height = 0
session.close()
(provider, w3) = web3_constructor()
trust = config.get('CIC_TRUST_ADDRESS', "").split(",")
chain_spec = args.i
try:
w3.eth.chainId
except Exception as e:
logg.exception(e)
sys.stderr.write('cannot connect to evm node\n')
sys.exit(1)
def main():
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
CICRegistry.init(w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec)
CICRegistry.add_path(config.get('ETH_ABI_DIR'))
chain_registry = ChainRegistry(chain_spec)
CICRegistry.add_chain_registry(chain_registry)
t = Tracker(chain_spec, trust)
t.load(w3)
t.loop()
if __name__ == '__main__':
main()

View File

@@ -5,7 +5,7 @@ version = (
0,
2,
0,
'alpha.1',
'alpha.2',
)
version_object = semver.VersionInfo(

View File

@@ -0,0 +1,4 @@
[cic]
chain_spec =
registry_address =
trust_address = 0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C

View File

@@ -6,4 +6,4 @@ HOST=localhost
PORT=63432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=1
DEBUG=0

View File

@@ -1,3 +1,2 @@
[eth]
provider = ws://localhost:63546
chain_id = 8996

View File

@@ -0,0 +1,2 @@
[syncer]
loop_interval = 1

View File

@@ -1,7 +1,2 @@
[eth]
provider = ws://localhost:8545
#ttp_provider = http://localhost:8545
#provider = http://localhost:8545
gas_provider_address =
#chain_id =
abi_dir = /usr/local/share/cic/solidity/abi

View File

@@ -0,0 +1,2 @@
[syncer]
loop_interval = 5

View File

@@ -1,5 +0,0 @@
CREATE DATABASE "cic-cache";
CREATE DATABASE "cic-eth";
CREATE DATABASE "cic-notify";
CREATE DATABASE "cic-meta";
CREATE DATABASE "cic-signer";

View File

@@ -17,8 +17,7 @@ RUN apt-get update && \
# Copy shared requirements from top of mono-repo
RUN echo "copying root req file ${root_requirement_file}"
COPY $root_requirement_file .
RUN pip install -r $root_requirement_file $pip_extra_index_url_flag
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a76
COPY cic-cache/requirements.txt ./
COPY cic-cache/setup.cfg \
@@ -48,6 +47,9 @@ RUN git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \
mkdir -p /usr/local/share/cic/solidity && \
cp -R cic-contracts/abis /usr/local/share/cic/solidity/abi
COPY cic-cache/docker/start_tracker.sh ./start_tracker.sh
COPY cic-cache/docker/db.sh ./db.sh
RUN chmod 755 ./*.sh
# Tracker
# ENTRYPOINT ["/usr/local/bin/cic-cache-tracker", "-vv"]
# Server

View File

@@ -0,0 +1,6 @@
#!/bin/bash
set -e
>&2 echo executing database migration
python scripts/migrate.py -c /usr/local/etc/cic-cache --migrations-dir /usr/local/share/cic-cache/alembic -vv
set +e

View File

@@ -0,0 +1,10 @@
#!/bin/bash
. ./db.sh
if [ $? -ne "0" ]; then
>&2 echo db migrate fail
exit 1
fi
/usr/local/bin/cic-cache-trackerd $@

View File

@@ -1,10 +1,12 @@
cic-base~=0.1.2a77
alembic==1.4.2
confini~=0.3.6b2
confini~=0.3.6rc3
uwsgi==2.0.19.1
moolb~=0.1.0
cic-registry~=0.5.3a4
cic-eth-registry~=0.5.4a16
SQLAlchemy==1.3.20
semver==2.13.0
psycopg2==2.8.6
celery==4.4.7
redis==3.5.3
chainsyncer[sql]~=0.0.2a2

View File

@@ -25,13 +25,17 @@ licence_files =
python_requires = >= 3.6
packages =
cic_cache
cic_cache.tasks
cic_cache.db
cic_cache.db.models
cic_cache.runnable
cic_cache.runnable.daemons
cic_cache.runnable.daemons.filters
scripts =
./scripts/migrate.py
[options.entry_points]
console_scripts =
cic-cache-tracker = cic_cache.runnable.tracker:main
cic-cache-server = cic_cache.runnable.server:main
cic-cache-trackerd = cic_cache.runnable.daemons.tracker:main
cic-cache-serverd = cic_cache.runnable.daemons.server:main
cic-cache-taskerd = cic_cache.runnable.daemons.tasker:main

View File

@@ -4,19 +4,23 @@ import logging
# third-party imports
import celery
from cic_registry import zero_address
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec
# local imports
from cic_eth.db.enum import LockEnum
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.lock import Lock
from cic_eth.task import (
CriticalSQLAlchemyTask,
)
from cic_eth.error import LockedError
celery_app = celery.current_app
logg = logging.getLogger()
@celery_app.task()
def lock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL, tx_hash=None):
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.ALL, tx_hash=None):
"""Task wrapper to set arbitrary locks
:param chain_str: Chain spec string representation
@@ -28,13 +32,16 @@ def lock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL, tx_
:returns: New lock state for address
:rtype: number
"""
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, flags, address=address, tx_hash=tx_hash)
logg.debug('Locked {} for {}, flag now {}'.format(flags, address, r))
return chained_input
@celery_app.task()
def unlock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL):
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.ALL):
"""Task wrapper to reset arbitrary locks
:param chain_str: Chain spec string representation
@@ -46,13 +53,16 @@ def unlock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL):
:returns: New lock state for address
:rtype: number
"""
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, flags, address=address)
logg.debug('Unlocked {} for {}, flag now {}'.format(flags, address, r))
return chained_input
@celery_app.task()
def lock_send(chained_input, chain_str, address=zero_address, tx_hash=None):
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS, tx_hash=None):
"""Task wrapper to set send lock
:param chain_str: Chain spec string representation
@@ -62,13 +72,14 @@ def lock_send(chained_input, chain_str, address=zero_address, tx_hash=None):
:returns: New lock state for address
:rtype: number
"""
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, LockEnum.SEND, address=address, tx_hash=tx_hash)
logg.debug('Send locked for {}, flag now {}'.format(address, r))
return chained_input
@celery_app.task()
def unlock_send(chained_input, chain_str, address=zero_address):
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
"""Task wrapper to reset send lock
:param chain_str: Chain spec string representation
@@ -78,13 +89,14 @@ def unlock_send(chained_input, chain_str, address=zero_address):
:returns: New lock state for address
:rtype: number
"""
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, LockEnum.SEND, address=address)
logg.debug('Send unlocked for {}, flag now {}'.format(address, r))
return chained_input
@celery_app.task()
def lock_queue(chained_input, chain_str, address=zero_address, tx_hash=None):
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS, tx_hash=None):
"""Task wrapper to set queue direct lock
:param chain_str: Chain spec string representation
@@ -94,13 +106,14 @@ def lock_queue(chained_input, chain_str, address=zero_address, tx_hash=None):
:returns: New lock state for address
:rtype: number
"""
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, LockEnum.QUEUE, address=address, tx_hash=tx_hash)
logg.debug('Queue direct locked for {}, flag now {}'.format(address, r))
return chained_input
@celery_app.task()
def unlock_queue(chained_input, chain_str, address=zero_address):
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
"""Task wrapper to reset queue direct lock
:param chain_str: Chain spec string representation
@@ -110,18 +123,25 @@ def unlock_queue(chained_input, chain_str, address=zero_address):
:returns: New lock state for address
:rtype: number
"""
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, LockEnum.QUEUE, address=address)
logg.debug('Queue direct unlocked for {}, flag now {}'.format(address, r))
return chained_input
@celery_app.task()
def check_lock(chained_input, chain_str, lock_flags, address=None):
@celery_app.task(base=CriticalSQLAlchemyTask)
def check_lock(chained_input, chain_spec_dict, lock_flags, address=None):
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
session = SessionBase.create_session()
r = Lock.check(chain_str, lock_flags, address=zero_address, session=session)
r = Lock.check(chain_str, lock_flags, address=ZERO_ADDRESS, session=session)
if address != None:
r |= Lock.check(chain_str, lock_flags, address=address, session=session)
if r > 0:
logg.debug('lock check {} has match {} for {}'.format(lock_flags, r, address))
session.close()
raise LockedError(r)
session.flush()
session.close()
return chained_input

View File

@@ -1,25 +1,26 @@
# standard imports
import logging
# third-party imports
# external imports
import celery
from cic_registry.chain import ChainSpec
from chainlib.chain import ChainSpec
from chainlib.eth.tx import unpack
from chainqueue.query import get_tx
from chainqueue.state import set_cancel
from chainqueue.db.models.otx import Otx
from chainqueue.db.models.tx import TxCache
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.nonce import Nonce
from cic_eth.admin.ctrl import lock_send
from cic_eth.admin.ctrl import unlock_send
from cic_eth.admin.ctrl import lock_queue
from cic_eth.admin.ctrl import unlock_queue
from cic_eth.queue.tx import get_tx
from cic_eth.queue.tx import set_cancel
from cic_eth.queue.tx import create as queue_create
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.eth.task import sign_tx
from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.admin.ctrl import (
lock_send,
unlock_send,
lock_queue,
unlock_queue,
)
from cic_eth.queue.tx import queue_create
from cic_eth.eth.gas import create_check_gas_task
celery_app = celery.current_app
logg = logging.getLogger()
@@ -45,8 +46,8 @@ def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1):
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_brief = get_tx(tx_hash_orig_hex)
tx_raw = bytes.fromhex(tx_brief['signed_tx'][2:])
tx = unpack_signed_raw_tx(tx_raw, chain_spec.chain_id())
tx_raw = bytes.fromhex(strip_0x(tx_brief['signed_tx'][2:]))
tx = unpack(tx_raw, chain_spec)
nonce = tx_brief['nonce']
address = tx['from']
@@ -66,8 +67,8 @@ def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1):
tx_hashes = []
txs = []
for otx in otxs:
tx_raw = bytes.fromhex(otx.signed_tx[2:])
tx_new = unpack_signed_raw_tx(tx_raw, chain_spec.chain_id())
tx_raw = bytes.fromhex(strip_0x(otx.signed_tx))
tx_new = unpack(tx_raw, chain_spec)
tx_previous_hash_hex = tx_new['hash']
tx_previous_nonce = tx_new['nonce']

View File

@@ -0,0 +1,19 @@
# standard imports
import logging
# external imports
import celery
# local imports
from cic_eth.task import BaseTask
celery_app = celery.current_app
logg = logging.getLogger()
@celery_app.task(bind=True, base=BaseTask)
def default_token(self):
return {
'symbol': self.default_token_symbol,
'address': self.default_token_address,
}

View File

@@ -2,36 +2,49 @@
import logging
import sys
# third-party imports
# external imports
import celery
import web3
from cic_registry import zero_address
from cic_registry import zero_content
from cic_registry import CICRegistry
from crypto_dev_signer.eth.web3ext import Web3 as Web3Ext
from cic_registry.error import UnknownContractError
from chainlib.eth.constant import (
ZERO_ADDRESS,
)
from cic_eth_registry import CICRegistry
from cic_eth_registry.error import UnknownContractError
from chainlib.eth.address import to_checksum_address
from chainlib.eth.contract import code
from chainlib.eth.tx import (
transaction,
receipt,
unpack,
)
from chainlib.hash import keccak256_hex_to_hex
from hexathon import (
strip_0x,
add_0x,
)
from chainlib.eth.gas import balance
from chainqueue.db.enum import (
StatusEnum,
StatusBits,
is_alive,
is_error_status,
status_str,
)
from chainqueue.error import TxStateChangeError
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.enum import (
StatusEnum,
is_alive,
)
from cic_eth.error import InitializationError
from cic_eth.db.error import TxStateChangeError
from cic_eth.eth.rpc import RpcClient
from cic_eth.queue.tx import get_tx
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.queue.query import get_tx
app = celery.current_app
#logg = logging.getLogger(__file__)
logg = logging.getLogger()
local_fail = StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR | StatusBits.UNKNOWN_ERROR
class AdminApi:
"""Provides an interface to view and manipulate existing transaction tasks and system runtime settings.
@@ -41,19 +54,20 @@ class AdminApi:
:param queue: Name of worker queue to submit tasks to
:type queue: str
"""
def __init__(self, rpc_client, queue='cic-eth'):
self.rpc_client = rpc_client
self.w3 = rpc_client.w3
def __init__(self, rpc, queue='cic-eth', call_address=ZERO_ADDRESS):
self.rpc = rpc
self.queue = queue
self.call_address = call_address
def unlock(self, chain_spec, address, flags=None):
s_unlock = celery.signature(
'cic_eth.admin.ctrl.unlock',
[
str(chain_spec),
flags,
None,
chain_spec.asdict(),
address,
flags,
],
queue=self.queue,
)
@@ -64,9 +78,10 @@ class AdminApi:
s_lock = celery.signature(
'cic_eth.admin.ctrl.lock',
[
str(chain_spec),
flags,
None,
chain_spec.asdict(),
address,
flags,
],
queue=self.queue,
)
@@ -75,14 +90,14 @@ class AdminApi:
def get_lock(self):
s_lock = celery.signature(
'cic_eth.queue.tx.get_lock',
'cic_eth.queue.lock.get_lock',
[],
queue=self.queue,
)
return s_lock.apply_async().get()
return s_lock.apply_async()
def tag_account(self, tag, address_hex):
def tag_account(self, tag, address_hex, chain_spec):
"""Persistently associate an address with a plaintext tag.
Some tags are known by the system and is used to resolve addresses to use for certain transactions.
@@ -93,33 +108,37 @@ class AdminApi:
:type address_hex: str, 0x-hex
:raises ValueError: Invalid checksum address
"""
if not web3.Web3.isChecksumAddress(address_hex):
raise ValueError('invalid address')
session = SessionBase.create_session()
role = AccountRole.set(tag, address_hex)
session.add(role)
session.commit()
session.close()
s_tag = celery.signature(
'cic_eth.eth.account.set_role',
[
tag,
address_hex,
chain_spec.asdict(),
],
queue=self.queue,
)
return s_tag.apply_async()
def have_account(self, address_hex, chain_str):
def have_account(self, address_hex, chain_spec):
s_have = celery.signature(
'cic_eth.eth.account.have',
[
address_hex,
chain_str,
chain_spec.asdict(),
],
queue=self.queue,
)
t = s_have.apply_async()
return t.get()
return s_have.apply_async()
def resend(self, tx_hash_hex, chain_str, in_place=True, unlock=False):
def resend(self, tx_hash_hex, chain_spec, in_place=True, unlock=False):
logg.debug('resend {}'.format(tx_hash_hex))
s_get_tx_cache = celery.signature(
'cic_eth.queue.tx.get_tx_cache',
'cic_eth.queue.query.get_tx_cache',
[
chain_spec.asdict(),
tx_hash_hex,
],
queue=self.queue,
@@ -135,9 +154,9 @@ class AdminApi:
raise NotImplementedError('resend as new not yet implemented')
s = celery.signature(
'cic_eth.eth.tx.resend_with_higher_gas',
'cic_eth.eth.gas.resend_with_higher_gas',
[
chain_str,
chain_spec.asdict(),
None,
1.01,
],
@@ -145,7 +164,7 @@ class AdminApi:
)
s_manual = celery.signature(
'cic_eth.queue.tx.set_manual',
'cic_eth.queue.state.set_manual',
[
tx_hash_hex,
],
@@ -157,7 +176,7 @@ class AdminApi:
s_gas = celery.signature(
'cic_eth.admin.ctrl.unlock_send',
[
chain_str,
chain_spec.asdict(),
tx_dict['sender'],
],
queue=self.queue,
@@ -168,8 +187,9 @@ class AdminApi:
def check_nonce(self, address):
s = celery.signature(
'cic_eth.queue.tx.get_account_tx',
'cic_eth.queue.query.get_account_tx',
[
chain_spec.asdict(),
address,
True,
False,
@@ -181,28 +201,37 @@ class AdminApi:
blocking_tx = None
blocking_nonce = None
nonce_otx = 0
last_nonce = -1
for k in txs.keys():
s_get_tx = celery.signature(
'cic_eth.queue.tx.get_tx',
'cic_eth.queue.query.get_tx',
[
chain_spec.asdict(),
k,
],
queue=self.queue,
)
tx = s_get_tx.apply_async().get()
#tx = get_tx(k)
logg.debug('checking nonce {}'.format(tx['nonce']))
if tx['status'] in [StatusEnum.REJECTED, StatusEnum.FUBAR]:
blocking_tx = k
blocking_nonce = tx['nonce']
logg.debug('checking nonce {} (previous {})'.format(tx['nonce'], last_nonce))
nonce_otx = tx['nonce']
if not is_alive(tx['status']) and tx['status'] & local_fail > 0:
logg.info('permanently errored {} nonce {} status {}'.format(k, nonce_otx, status_str(tx['status'])))
blocking_tx = k
blocking_nonce = nonce_otx
elif nonce_otx - last_nonce > 1:
logg.error('nonce gap; {} followed {} for account {}'.format(nonce_otx, last_nonce, tx['from']))
blocking_tx = k
blocking_nonce = nonce_otx
break
last_nonce = nonce_otx
#nonce_cache = Nonce.get(address)
nonce_w3 = self.w3.eth.getTransactionCount(address, 'pending')
#nonce_w3 = self.w3.eth.getTransactionCount(address, 'pending')
return {
'nonce': {
'network': nonce_w3,
#'network': nonce_cache,
'queue': nonce_otx,
#'cache': nonce_cache,
'blocking': blocking_nonce,
@@ -213,10 +242,11 @@ class AdminApi:
}
def fix_nonce(self, address, nonce):
def fix_nonce(self, address, nonce, chain_spec):
s = celery.signature(
'cic_eth.queue.tx.get_account_tx',
'cic_eth.queue.query.get_account_tx',
[
chain_spec.asdict(),
address,
True,
False,
@@ -234,7 +264,7 @@ class AdminApi:
s_nonce = celery.signature(
'cic_eth.admin.nonce.shift_nonce',
[
str(self.rpc_client.chain_spec),
self.rpc.chain_spec.asdict(),
tx_hash_hex,
],
queue=self.queue
@@ -242,33 +272,33 @@ class AdminApi:
return s_nonce.apply_async()
# TODO: this is a stub, complete all checks
def ready(self):
"""Checks whether all required initializations have been performed.
:raises cic_eth.error.InitializationError: At least one setting pre-requisite has not been met.
:raises KeyError: An address provided for initialization is not known by the keystore.
"""
addr = AccountRole.get_address('ETH_GAS_PROVIDER_ADDRESS')
if addr == zero_address:
raise InitializationError('missing account ETH_GAS_PROVIDER_ADDRESS')
self.w3.eth.sign(addr, text='666f6f')
# # TODO: this is a stub, complete all checks
# def ready(self):
# """Checks whether all required initializations have been performed.
#
# :raises cic_eth.error.InitializationError: At least one setting pre-requisite has not been met.
# :raises KeyError: An address provided for initialization is not known by the keystore.
# """
# addr = AccountRole.get_address('ETH_GAS_PROVIDER_ADDRESS')
# if addr == ZERO_ADDRESS:
# raise InitializationError('missing account ETH_GAS_PROVIDER_ADDRESS')
#
# self.w3.eth.sign(addr, text='666f6f')
def account(self, chain_spec, address, cols=['tx_hash', 'sender', 'recipient', 'nonce', 'block', 'tx_index', 'status', 'network_status', 'date_created'], include_sender=True, include_recipient=True):
def account(self, chain_spec, address, include_sender=True, include_recipient=True, renderer=None, w=sys.stdout):
"""Lists locally originated transactions for the given Ethereum address.
Performs a synchronous call to the Celery task responsible for performing the query.
:param address: Ethereum address to return transactions for
:type address: str, 0x-hex
:param cols: Data columns to include
:type cols: list of str
"""
last_nonce = -1
s = celery.signature(
'cic_eth.queue.tx.get_account_tx',
'cic_eth.queue.query.get_account_tx',
[
chain_spec.asdict(),
address,
],
queue=self.queue,
@@ -277,33 +307,48 @@ class AdminApi:
tx_dict_list = []
for tx_hash in txs.keys():
errors = []
s = celery.signature(
'cic_eth.queue.tx.get_tx_cache',
[tx_hash],
'cic_eth.queue.query.get_tx_cache',
[
chain_spec.asdict(),
tx_hash,
],
queue=self.queue,
)
tx_dict = s.apply_async().get()
if tx_dict['sender'] == address and not include_sender:
logg.debug('skipping sender tx {}'.format(tx_dict['tx_hash']))
continue
if tx_dict['sender'] == address:
if tx_dict['nonce'] - last_nonce > 1:
logg.error('nonce gap; {} followed {} for address {} tx {}'.format(tx_dict['nonce'], last_nonce, tx_dict['sender'], tx_hash))
errors.append('nonce')
elif tx_dict['nonce'] == last_nonce:
logg.info('nonce {} duplicate for address {} in tx {}'.format(tx_dict['nonce'], tx_dict['sender'], tx_hash))
last_nonce = tx_dict['nonce']
if not include_sender:
logg.debug('skipping sender tx {}'.format(tx_dict['tx_hash']))
continue
elif tx_dict['recipient'] == address and not include_recipient:
logg.debug('skipping recipient tx {}'.format(tx_dict['tx_hash']))
continue
logg.debug(tx_dict)
o = {
'nonce': tx_dict['nonce'],
'tx_hash': tx_dict['tx_hash'],
'status': tx_dict['status'],
'date_updated': tx_dict['date_updated'],
'errors': errors,
}
tx_dict_list.append(o)
if renderer != None:
r = renderer(o)
w.write(r + '\n')
else:
tx_dict_list.append(o)
return tx_dict_list
# TODO: Add exception upon non-existent tx aswell as invalid tx data to docstring
def tx(self, chain_spec, tx_hash=None, tx_raw=None):
def tx(self, chain_spec, tx_hash=None, tx_raw=None, registry=None, renderer=None, w=sys.stdout):
"""Output local and network details about a given transaction with local origin.
If the transaction hash is given, the raw trasnaction data will be retrieved from the local transaction queue backend. Otherwise the raw transaction data must be provided directly. Only one of transaction hash and transaction data can be passed.
@@ -324,42 +369,51 @@ class AdminApi:
ValueError('Specify only one of hash or raw tx')
if tx_raw != None:
tx_hash = self.w3.keccak(hexstr=tx_raw).hex()
tx_hash = add_0x(keccak256_hex_to_hex(tx_raw))
#tx_hash = self.w3.keccak(hexstr=tx_raw).hex()
s = celery.signature(
'cic_eth.queue.tx.get_tx_cache',
[tx_hash],
'cic_eth.queue.query.get_tx_cache',
[
chain_spec.asdict(),
tx_hash,
],
queue=self.queue,
)
tx = s.apply_async().get()
t = s.apply_async()
tx = t.get()
source_token = None
if tx['source_token'] != zero_address:
if tx['source_token'] != ZERO_ADDRESS:
try:
source_token = CICRegistry.get_address(chain_spec, tx['source_token']).contract
source_token = registry.by_address(tx['source_token'])
#source_token = CICRegistry.get_address(chain_spec, tx['source_token']).contract
except UnknownContractError:
source_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token'])
source_token = CICRegistry.add_token(chain_spec, source_token_contract)
#source_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token'])
#source_token = CICRegistry.add_token(chain_spec, source_token_contract)
logg.warning('unknown source token contract {}'.format(tx['source_token']))
destination_token = None
if tx['source_token'] != zero_address:
if tx['source_token'] != ZERO_ADDRESS:
try:
destination_token = CICRegistry.get_address(chain_spec, tx['destination_token'])
#destination_token = CICRegistry.get_address(chain_spec, tx['destination_token'])
destination_token = registry.by_address(tx['destination_token'])
except UnknownContractError:
destination_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token'])
destination_token = CICRegistry.add_token(chain_spec, destination_token_contract)
#destination_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token'])
#destination_token = CICRegistry.add_token(chain_spec, destination_token_contract)
logg.warning('unknown destination token contract {}'.format(tx['destination_token']))
tx['sender_description'] = 'Custodial account'
tx['recipient_description'] = 'Custodial account'
c = RpcClient(chain_spec)
if len(c.w3.eth.getCode(tx['sender'])) > 0:
o = code(tx['sender'])
r = self.rpc.do(o)
if len(strip_0x(r, allow_empty=True)) > 0:
try:
sender_contract = CICRegistry.get_address(chain_spec, tx['sender'])
tx['sender_description'] = 'Contract {}'.format(sender_contract.identifier())
#sender_contract = CICRegistry.get_address(chain_spec, tx['sender'])
sender_contract = registry.by_address(tx['sender'], sender_address=self.call_address)
tx['sender_description'] = 'Contract at {}'.format(tx['sender']) #sender_contract)
except UnknownContractError:
tx['sender_description'] = 'Unknown contract'
except KeyError as e:
@@ -369,7 +423,7 @@ class AdminApi:
'cic_eth.eth.account.have',
[
tx['sender'],
str(chain_spec),
chain_spec.asdict(),
],
queue=self.queue,
)
@@ -382,7 +436,7 @@ class AdminApi:
'cic_eth.eth.account.role',
[
tx['sender'],
str(chain_spec),
chain_spec.asdict(),
],
queue=self.queue,
)
@@ -391,11 +445,13 @@ class AdminApi:
if role != None:
tx['sender_description'] = role
if len(c.w3.eth.getCode(tx['recipient'])) > 0:
o = code(tx['recipient'])
r = self.rpc.do(o)
if len(strip_0x(r, allow_empty=True)) > 0:
try:
recipient_contract = CICRegistry.get_address(chain_spec, tx['recipient'])
tx['recipient_description'] = 'Contract {}'.format(recipient_contract.identifier())
#recipient_contract = CICRegistry.by_address(tx['recipient'])
recipient_contract = registry.by_address(tx['recipient'])
tx['recipient_description'] = 'Contract at {}'.format(tx['recipient']) #recipient_contract)
except UnknownContractError as e:
tx['recipient_description'] = 'Unknown contract'
except KeyError as e:
@@ -405,7 +461,7 @@ class AdminApi:
'cic_eth.eth.account.have',
[
tx['recipient'],
str(chain_spec),
chain_spec.asdict(),
],
queue=self.queue,
)
@@ -418,7 +474,7 @@ class AdminApi:
'cic_eth.eth.account.role',
[
tx['recipient'],
str(chain_spec),
chain_spec.asdict(),
],
queue=self.queue,
)
@@ -435,38 +491,52 @@ class AdminApi:
tx['destination_token_symbol'] = destination_token.symbol()
tx['recipient_token_balance'] = source_token.function('balanceOf')(tx['recipient']).call()
tx['network_status'] = 'Not submitted'
# TODO: this can mean either not subitted or culled, need to check other txs with same nonce to determine which
tx['network_status'] = 'Not in node'
r = None
try:
c.w3.eth.getTransaction(tx_hash)
tx['network_status'] = 'Mempool'
except web3.exceptions.TransactionNotFound:
pass
o = transaction(tx_hash)
r = self.rpc.do(o)
if r != None:
tx['network_status'] = 'Mempool'
except Exception as e:
logg.warning('(too permissive exception handler, please fix!) {}'.format(e))
try:
r = c.w3.eth.getTransactionReceipt(tx_hash)
if r.status == 1:
tx['network_status'] = 'Confirmed'
else:
tx['network_status'] = 'Reverted'
tx['network_block_number'] = r.blockNumber
tx['network_tx_index'] = r.transactionIndex
if tx['block_number'] == None:
problems.append('Queue is missing block number {} for mined tx'.format(r.blockNumber))
except web3.exceptions.TransactionNotFound:
pass
if r != None:
try:
o = receipt(tx_hash)
r = self.rpc.do(o)
logg.debug('h {} o {}'.format(tx_hash, o))
if int(strip_0x(r['status'])) == 1:
tx['network_status'] = 'Confirmed'
else:
tx['network_status'] = 'Reverted'
tx['network_block_number'] = r.blockNumber
tx['network_tx_index'] = r.transactionIndex
if tx['block_number'] == None:
problems.append('Queue is missing block number {} for mined tx'.format(r.blockNumber))
except Exception as e:
logg.warning('too permissive exception handler, please fix!')
pass
tx['sender_gas_balance'] = c.w3.eth.getBalance(tx['sender'])
tx['recipient_gas_balance'] = c.w3.eth.getBalance(tx['recipient'])
o = balance(tx['sender'])
r = self.rpc.do(o)
tx['sender_gas_balance'] = r
tx_unpacked = unpack_signed_raw_tx(bytes.fromhex(tx['signed_tx'][2:]), chain_spec.chain_id())
o = balance(tx['recipient'])
r = self.rpc.do(o)
tx['recipient_gas_balance'] = r
tx_unpacked = unpack(bytes.fromhex(strip_0x(tx['signed_tx'])), chain_spec)
tx['gas_price'] = tx_unpacked['gasPrice']
tx['gas_limit'] = tx_unpacked['gas']
tx['data'] = tx_unpacked['data']
s = celery.signature(
'cic_eth.queue.tx.get_state_log',
'cic_eth.queue.state.get_state_log',
[
chain_spec.asdict(),
tx_hash,
],
queue=self.queue,
@@ -479,4 +549,9 @@ class AdminApi:
for p in problems:
sys.stderr.write('!!!{}\n'.format(p))
return tx
if renderer == None:
return tx
r = renderer(tx)
w.write(r + '\n')
return None

View File

@@ -8,12 +8,10 @@ import logging
# external imports
import celery
#from cic_registry.chain import ChainSpec
from cic_registry import CICRegistry
from cic_eth_registry import CICRegistry
from chainlib.chain import ChainSpec
# local imports
from cic_eth.eth.factory import TxFactory
from cic_eth.db.enum import LockEnum
app = celery.current_app
@@ -39,7 +37,7 @@ class Api:
self.callback_param = callback_param
self.callback_task = callback_task
self.queue = queue
logg.info('api using queue {}'.format(self.queue))
logg.debug('api using queue {}'.format(self.queue))
self.callback_success = None
self.callback_error = None
if callback_queue == None:
@@ -64,6 +62,18 @@ class Api:
)
def default_token(self):
s_token = celery.signature(
'cic_eth.admin.token.default_token',
[],
queue=self.queue,
)
if self.callback_param != None:
s_token.link(self.callback_success)
return s_token.apply_async()
def convert_transfer(self, from_address, to_address, target_return, minimum_return, from_token_symbol, to_token_symbol):
"""Executes a chain of celery tasks that performs conversion between two ERC20 tokens, and transfers to a specified receipient after convert has completed.
@@ -82,23 +92,26 @@ class Api:
:returns: uuid of root task
:rtype: celery.Task
"""
raise NotImplementedError('out of service until new DEX migration is done')
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
[from_token_symbol, to_token_symbol],
self.chain_str,
self.chain_spec.asdict(),
LockEnum.QUEUE,
from_address,
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
'cic_eth.eth.nonce.reserve_nonce',
[
self.chain_spec.asdict(),
],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
'cic_eth.eth.erc20.resolve_tokens_by_symbol',
[
self.chain_str,
],
@@ -111,7 +124,7 @@ class Api:
target_return,
minimum_return,
to_address,
self.chain_str,
self.chain_spec.asdict(),
],
queue=self.queue,
)
@@ -143,25 +156,28 @@ class Api:
:returns: uuid of root task
:rtype: celery.Task
"""
raise NotImplementedError('out of service until new DEX migration is done')
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
[from_token_symbol, to_token_symbol],
self.chain_str,
self.chain_spec.asdict(),
LockEnum.QUEUE,
from_address,
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
'cic_eth.eth.nonce.reserve_nonce',
[
self.chain_spec.asdict(),
],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
'cic_eth.eth.erc20.resolve_tokens_by_symbol',
[
self.chain_str,
self.chain_spec.asdict(),
],
queue=self.queue,
)
@@ -172,7 +188,7 @@ class Api:
target_return,
minimum_return,
from_address,
self.chain_str,
self.chain_spec.asdict(),
],
queue=self.queue,
)
@@ -206,33 +222,34 @@ class Api:
'cic_eth.admin.ctrl.check_lock',
[
[token_symbol],
self.chain_str,
self.chain_spec.asdict(),
LockEnum.QUEUE,
from_address,
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
'cic_eth.eth.nonce.reserve_nonce',
[
self.chain_spec.asdict(),
from_address,
],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
'cic_eth.eth.erc20.resolve_tokens_by_symbol',
[
self.chain_str,
self.chain_spec.asdict(),
],
queue=self.queue,
)
s_transfer = celery.signature(
'cic_eth.eth.token.transfer',
'cic_eth.eth.erc20.transfer',
[
from_address,
to_address,
value,
self.chain_str,
self.chain_spec.asdict(),
],
queue=self.queue,
)
@@ -264,18 +281,18 @@ class Api:
logg.warning('balance pointlessly called with no callback url')
s_tokens = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
'cic_eth.eth.erc20.resolve_tokens_by_symbol',
[
[token_symbol],
self.chain_str,
self.chain_spec.asdict(),
],
queue=self.queue,
)
s_balance = celery.signature(
'cic_eth.eth.token.balance',
'cic_eth.eth.erc20.balance',
[
address,
self.chain_str,
self.chain_spec.asdict(),
],
queue=self.queue,
)
@@ -291,7 +308,7 @@ class Api:
'cic_eth.queue.balance.balance_incoming',
[
address,
self.chain_str,
self.chain_spec.asdict(),
],
queue=self.queue,
)
@@ -299,7 +316,7 @@ class Api:
'cic_eth.queue.balance.balance_outgoing',
[
address,
self.chain_str,
self.chain_spec.asdict(),
],
queue=self.queue,
)
@@ -307,16 +324,22 @@ class Api:
s_balance_incoming.link(s_balance_outgoing)
last_in_chain = s_balance_outgoing
one = celery.chain(s_tokens, s_balance)
two = celery.chain(s_tokens, s_balance_incoming)
three = celery.chain(s_tokens, s_balance_outgoing)
one = celery.chain(s_tokens, s_balance)
two = celery.chain(s_tokens, s_balance_incoming)
three = celery.chain(s_tokens, s_balance_outgoing)
t = None
if self.callback_param != None:
s_result.link(self.callback_success).on_error(self.callback_error)
t = celery.chord([one, two, three])(s_result)
t = None
if self.callback_param != None:
s_result.link(self.callback_success).on_error(self.callback_error)
t = celery.chord([one, two, three])(s_result)
else:
t = celery.chord([one, two, three])(s_result)
else:
t = celery.chord([one, two, three])(s_result)
# TODO: Chord is inefficient with only one chain, but assemble_balances must be able to handle different structures in order to avoid chord
one = celery.chain(s_tokens, s_balance)
if self.callback_param != None:
s_result.link(self.callback_success).on_error(self.callback_error)
t = celery.chord([one])(s_result)
return t
@@ -335,37 +358,40 @@ class Api:
'cic_eth.admin.ctrl.check_lock',
[
password,
self.chain_str,
self.chain_spec.asdict(),
LockEnum.CREATE,
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_account = celery.signature(
'cic_eth.eth.account.create',
[
self.chain_str,
self.chain_spec.asdict(),
],
queue=self.queue,
)
s_nonce.link(s_account)
s_check.link(s_nonce)
s_check.link(s_account)
if self.callback_param != None:
s_account.link(self.callback_success)
if register:
s_register = celery.signature(
'cic_eth.eth.account.register',
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
[
self.chain_str,
self.chain_spec.asdict(),
'ACCOUNT_REGISTRY_WRITER',
],
queue=self.queue,
)
s_account.link(s_register)
s_register = celery.signature(
'cic_eth.eth.account.register',
[
self.chain_spec.asdict(),
],
queue=self.queue,
)
s_nonce.link(s_register)
s_account.link(s_nonce)
t = s_check.apply_async(queue=self.queue)
return t
@@ -383,20 +409,23 @@ class Api:
'cic_eth.admin.ctrl.check_lock',
[
address,
self.chain_str,
self.chain_spec.asdict(),
LockEnum.QUEUE,
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
'cic_eth.eth.nonce.reserve_nonce',
[
self.chain_spec.asdict(),
'GAS_GIFTER',
],
queue=self.queue,
)
s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas',
'cic_eth.eth.gas.refill_gas',
[
self.chain_str,
self.chain_spec.asdict(),
],
queue=self.queue,
)
@@ -429,8 +458,9 @@ class Api:
"""
offset = 0
s_local = celery.signature(
'cic_eth.queue.tx.get_account_tx',
'cic_eth.queue.query.get_account_tx',
[
self.chain_spec.asdict(),
address,
],
queue=self.queue,
@@ -439,7 +469,7 @@ class Api:
s_brief = celery.signature(
'cic_eth.ext.tx.tx_collate',
[
self.chain_str,
self.chain_spec.asdict(),
offset,
limit
],
@@ -465,7 +495,7 @@ class Api:
'cic_eth.ext.tx.list_tx_by_bloom',
[
address,
self.chain_str,
self.chain_spec.asdict(),
],
queue=self.queue,
)

View File

View File

@@ -0,0 +1,8 @@
from cic_eth.db.models.base import SessionBase
def health(*args, **kwargs):
session = SessionBase.create_session()
session.execute('SELECT count(*) from alembic_version')
session.close()
return True

View File

@@ -0,0 +1,48 @@
# standard imports
import logging
# external imports
from chainlib.connection import RPCConnection
from chainlib.chain import ChainSpec
from chainlib.eth.gas import balance
# local imports
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.base import SessionBase
from cic_eth.db.enum import LockEnum
from cic_eth.error import LockedError
from cic_eth.admin.ctrl import check_lock
logg = logging.getLogger().getChild(__name__)
def health(*args, **kwargs):
session = SessionBase.create_session()
config = kwargs['config']
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
logg.debug('check gas balance of gas gifter for chain {}'.format(chain_spec))
try:
check_lock(None, None, LockEnum.INIT)
except LockedError:
logg.warning('INIT lock is set, skipping GAS GIFTER balance check.')
return True
gas_provider = AccountRole.get_address('GAS_GIFTER', session=session)
session.close()
rpc = RPCConnection.connect(chain_spec, 'default')
o = balance(gas_provider)
r = rpc.do(o)
try:
r = int(r, 16)
except TypeError:
r = int(r)
gas_min = int(config.get('ETH_GAS_GIFTER_MINIMUM_BALANCE'))
if r < gas_min:
logg.error('EEK! gas gifter has balance {}, below minimum {}'.format(r, gas_min))
return False
return True

View File

@@ -0,0 +1,37 @@
# standard imports
import time
import logging
from urllib.error import URLError
# external imports
from chainlib.connection import RPCConnection
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.sign import sign_message
from chainlib.error import JSONRPCException
logg = logging.getLogger().getChild(__name__)
def health(*args, **kwargs):
blocked = True
max_attempts = 5
conn = RPCConnection.connect(kwargs['config'].get('CIC_CHAIN_SPEC'), tag='signer')
for i in range(max_attempts):
idx = i + 1
logg.debug('attempt signer connection check {}/{}'.format(idx, max_attempts))
try:
conn.do(sign_message(ZERO_ADDRESS, '0x2a'))
except FileNotFoundError:
pass
except ConnectionError:
pass
except URLError:
pass
except JSONRPCException:
logg.debug('signer connection succeeded')
return True
if idx < max_attempts:
time.sleep(0.5)
return False

View File

@@ -11,10 +11,6 @@ logg = logging.getLogger()
# an Engine, which the Session will use for connection
# resources
# TODO: Remove the package exports, all models should be imported using full path
from .models.otx import Otx
from .models.convert import TxConvertTransfer
def dsn_from_config(config):
"""Generate a dsn string from the provided config dict.

View File

@@ -74,10 +74,11 @@ class LockEnum(enum.IntEnum):
QUEUE: Disable queueing new or modified transactions
"""
STICKY=1
CREATE=2
SEND=4
QUEUE=8
QUERY=16
INIT=2
CREATE=4
SEND=8
QUEUE=16
QUERY=32
ALL=int(0xfffffffffffffffe)

View File

@@ -0,0 +1,29 @@
"""Add chainqueue
Revision ID: 0ec0d6d1e785
Revises:
Create Date: 2021-04-02 18:30:55.398388
"""
from alembic import op
import sqlalchemy as sa
from chainqueue.db.migrations.sqlalchemy import (
chainqueue_upgrade,
chainqueue_downgrade,
)
# revision identifiers, used by Alembic.
revision = '0ec0d6d1e785'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
chainqueue_upgrade(0, 0, 1)
def downgrade():
chainqueue_downgrade(0, 0, 1)

View File

@@ -0,0 +1,29 @@
"""Roles
Revision ID: 1f1b3b641d08
Revises: 9c420530eeb2
Create Date: 2021-04-02 18:40:27.787631
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '1f1b3b641d08'
down_revision = '9c420530eeb2'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'account_role',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.Text, nullable=False, unique=True),
sa.Column('address_hex', sa.String(42), nullable=False),
)
def downgrade():
op.drop_table('account_role')

View File

@@ -1,35 +0,0 @@
"""Add new syncer table
Revision ID: 2a07b543335e
Revises: a2e2aab8f331
Create Date: 2020-12-27 09:35:44.017981
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '2a07b543335e'
down_revision = 'a2e2aab8f331'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'blockchain_sync',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('blockchain', sa.String, nullable=False),
sa.Column('block_start', sa.Integer, nullable=False, default=0),
sa.Column('tx_start', sa.Integer, nullable=False, default=0),
sa.Column('block_cursor', sa.Integer, nullable=False, default=0),
sa.Column('tx_cursor', sa.Integer, nullable=False, default=0),
sa.Column('block_target', sa.Integer, nullable=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime),
)
def downgrade():
op.drop_table('blockchain_sync')

View File

@@ -1,30 +0,0 @@
"""Nonce reservation
Revision ID: 3b693afd526a
Revises: f738d9962fdf
Create Date: 2021-03-05 07:09:50.898728
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3b693afd526a'
down_revision = 'f738d9962fdf'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('key', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
def downgrade():
op.drop_table('nonce_task_reservation')

View File

@@ -1,29 +0,0 @@
"""Add nonce index
Revision ID: 49b348246d70
Revises: 52c7c59cd0b1
Create Date: 2020-12-19 09:45:36.186446
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '49b348246d70'
down_revision = '52c7c59cd0b1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False, unique=True),
sa.Column('nonce', sa.Integer, nullable=False),
)
def downgrade():
op.drop_table('nonce')

View File

@@ -1,31 +0,0 @@
"""Add account roles
Revision ID: 52c7c59cd0b1
Revises: 9c4bd7491015
Create Date: 2020-12-19 07:21:38.249237
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '52c7c59cd0b1'
down_revision = '9c4bd7491015'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'account_role',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.Text, nullable=False, unique=True),
sa.Column('address_hex', sa.String(42), nullable=False),
)
pass
def downgrade():
op.drop_table('account_role')
pass

View File

@@ -1,8 +1,8 @@
"""debug output
"""DEbug
Revision ID: f738d9962fdf
Revises: ec40ac0974c1
Create Date: 2021-03-04 08:32:43.281214
Revision ID: 5ca4b77ce205
Revises: 75d4767b3031
Create Date: 2021-04-02 18:42:12.257244
"""
from alembic import op
@@ -10,8 +10,8 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'f738d9962fdf'
down_revision = 'ec40ac0974c1'
revision = '5ca4b77ce205'
down_revision = '75d4767b3031'
branch_labels = None
depends_on = None
@@ -24,9 +24,7 @@ def upgrade():
sa.Column('description', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('debug')
pass

View File

@@ -1,30 +0,0 @@
"""Add otx state log
Revision ID: 6ac7a1dadc46
Revises: 89e1e9baa53c
Create Date: 2021-01-30 13:59:49.022373
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '6ac7a1dadc46'
down_revision = '89e1e9baa53c'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx_state_log',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False),
sa.Column('date', sa.DateTime, nullable=False),
sa.Column('status', sa.Integer, nullable=False),
)
def downgrade():
op.drop_table('otx_state_log')

View File

@@ -1,31 +0,0 @@
"""Add attempts and version log for otx
Revision ID: 71708e943dbd
Revises: 7e8d7626e38f
Create Date: 2020-09-26 14:41:19.298651
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '71708e943dbd'
down_revision = '7e8d7626e38f'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx_attempts',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False),
sa.Column('date', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('otx_attempts')
pass

View File

@@ -1,17 +1,20 @@
"""Add account lock
"""Lock
Revision ID: 89e1e9baa53c
Revises: 2a07b543335e
Create Date: 2021-01-27 19:57:36.793882
Revision ID: 75d4767b3031
Revises: 1f1b3b641d08
Create Date: 2021-04-02 18:41:20.864265
"""
import datetime
from alembic import op
import sqlalchemy as sa
from chainlib.eth.constant import ZERO_ADDRESS
from cic_eth.db.enum import LockEnum
# revision identifiers, used by Alembic.
revision = '89e1e9baa53c'
down_revision = '2a07b543335e'
revision = '75d4767b3031'
down_revision = '1f1b3b641d08'
branch_labels = None
depends_on = None
@@ -23,10 +26,12 @@ def upgrade():
sa.Column("address", sa.String(42), nullable=True),
sa.Column('blockchain', sa.String),
sa.Column("flags", sa.BIGINT(), nullable=False, default=0),
sa.Column("date_created", sa.DateTime, nullable=False),
sa.Column("otx_id", sa.Integer, nullable=True),
sa.Column("date_created", sa.DateTime, nullable=False, default=datetime.datetime.utcnow),
sa.Column("otx_id", sa.Integer, sa.ForeignKey('otx.id'), nullable=True),
)
op.create_index('idx_chain_address', 'lock', ['blockchain', 'address'], unique=True)
op.execute("INSERT INTO lock (address, date_created, blockchain, flags) VALUES('{}', '{}', '::', {})".format(ZERO_ADDRESS, datetime.datetime.utcnow(), LockEnum.INIT | LockEnum.SEND | LockEnum.QUEUE))
def downgrade():
op.drop_index('idx_chain_address')

View File

@@ -1,31 +0,0 @@
"""add blocknumber pointer
Revision ID: 7cb65b893934
Revises: 8593fa1ca0f4
Create Date: 2020-09-24 19:29:13.543648
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '7cb65b893934'
down_revision = '8593fa1ca0f4'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'watcher_state',
sa.Column('block_number', sa.Integer)
)
conn = op.get_bind()
conn.execute('INSERT INTO watcher_state (block_number) VALUES (0);')
pass
def downgrade():
op.drop_table('watcher_state')
pass

View File

@@ -1,45 +0,0 @@
"""Add block sync
Revision ID: 7e8d7626e38f
Revises: cd2052be6db2
Create Date: 2020-09-26 11:12:27.818524
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '7e8d7626e38f'
down_revision = 'cd2052be6db2'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'block_sync',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('blockchain', sa.String, nullable=False, unique=True),
sa.Column('block_height_backlog', sa.Integer, nullable=False, default=0),
sa.Column('tx_height_backlog', sa.Integer, nullable=False, default=0),
sa.Column('block_height_session', sa.Integer, nullable=False, default=0),
sa.Column('tx_height_session', sa.Integer, nullable=False, default=0),
sa.Column('block_height_head', sa.Integer, nullable=False, default=0),
sa.Column('tx_height_head', sa.Integer, nullable=False, default=0),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime),
)
op.drop_table('watcher_state')
pass
def downgrade():
op.drop_table('block_sync')
op.create_table(
'watcher_state',
sa.Column('block_number', sa.Integer)
)
conn = op.get_bind()
conn.execute('INSERT INTO watcher_state (block_number) VALUES (0);')
pass

View File

@@ -1,35 +0,0 @@
"""Add transaction queue
Revision ID: 8593fa1ca0f4
Revises:
Create Date: 2020-09-22 21:56:42.117047
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '8593fa1ca0f4'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('tx_hash', sa.String(66), nullable=False),
sa.Column('signed_tx', sa.Text, nullable=False),
sa.Column('status', sa.Integer, nullable=False, default=-9),
sa.Column('block', sa.Integer),
)
op.create_index('idx_otx_tx', 'otx', ['tx_hash'], unique=True)
def downgrade():
op.drop_index('idx_otx_tx')
op.drop_table('otx')

View File

@@ -0,0 +1,39 @@
"""Nonce
Revision ID: 9c420530eeb2
Revises: b125cbf81e32
Create Date: 2021-04-02 18:38:56.459334
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9c420530eeb2'
down_revision = 'b125cbf81e32'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False, unique=True),
sa.Column('nonce', sa.Integer, nullable=False),
)
op.create_table(
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('key', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
def downgrade():
op.drop_table('nonce_task_reservation')
op.drop_table('nonce')

View File

@@ -1,26 +0,0 @@
"""Rename block sync table
Revision ID: 9c4bd7491015
Revises: 9daa16518a91
Create Date: 2020-10-15 23:45:56.306898
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9c4bd7491015'
down_revision = '9daa16518a91'
branch_labels = None
depends_on = None
def upgrade():
op.rename_table('block_sync', 'otx_sync')
pass
def downgrade():
op.rename_table('otx_sync', 'block_sync')
pass

View File

@@ -1,30 +0,0 @@
"""add tx sync state
Revision ID: 9daa16518a91
Revises: e3b5330ee71c
Create Date: 2020-10-10 14:43:18.699276
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9daa16518a91'
down_revision = 'e3b5330ee71c'
branch_labels = None
depends_on = None
def upgrade():
# op.create_table(
# 'tx_sync',
# sa.Column('tx', sa.String(66), nullable=False),
# )
# op.execute("INSERT INTO tx_sync VALUES('0x0000000000000000000000000000000000000000000000000000000000000000')")
pass
def downgrade():
# op.drop_table('tx_sync')
pass

View File

@@ -1,34 +0,0 @@
"""Add date accessed to txcache
Revision ID: a2e2aab8f331
Revises: 49b348246d70
Create Date: 2020-12-24 18:58:06.137812
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'a2e2aab8f331'
down_revision = '49b348246d70'
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
'tx_cache',
sa.Column(
'date_checked',
sa.DateTime,
nullable=False
)
)
pass
def downgrade():
# drop does not work withs qlite
#op.drop_column('tx_cache', 'date_checked')
pass

View File

@@ -1,8 +1,8 @@
"""convert tx index
"""Convert
Revision ID: cd2052be6db2
Revises: 7cb65b893934
Create Date: 2020-09-24 21:20:51.580500
Revision ID: aee12aeb47ec
Revises: 5ca4b77ce205
Create Date: 2021-04-02 18:42:45.233356
"""
from alembic import op
@@ -10,8 +10,8 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'cd2052be6db2'
down_revision = '7cb65b893934'
revision = 'aee12aeb47ec'
down_revision = '5ca4b77ce205'
branch_labels = None
depends_on = None
@@ -20,10 +20,8 @@ def upgrade():
op.create_table(
'tx_convert_transfer',
sa.Column('id', sa.Integer, primary_key=True),
#sa.Column('approve_tx_hash', sa.String(66), nullable=False, unique=True),
sa.Column('convert_tx_hash', sa.String(66), nullable=False, unique=True),
sa.Column('transfer_tx_hash', sa.String(66), unique=True),
# sa.Column('holder_address', sa.String(42), nullable=False),
sa.Column('recipient_address', sa.String(42), nullable=False),
)
op.create_index('idx_tx_convert_address', 'tx_convert_transfer', ['recipient_address'])

View File

@@ -1,12 +1,13 @@
"""Add chain syncer
Revision ID: ec40ac0974c1
Revises: 6ac7a1dadc46
Create Date: 2021-02-23 06:10:19.246304
Revision ID: b125cbf81e32
Revises: 0ec0d6d1e785
Create Date: 2021-04-02 18:36:44.459603
"""
from alembic import op
import sqlalchemy as sa
from chainsyncer.db.migrations.sqlalchemy import (
chainsyncer_upgrade,
chainsyncer_downgrade,
@@ -14,15 +15,15 @@ from chainsyncer.db.migrations.sqlalchemy import (
# revision identifiers, used by Alembic.
revision = 'ec40ac0974c1'
down_revision = '6ac7a1dadc46'
revision = 'b125cbf81e32'
down_revision = '0ec0d6d1e785'
branch_labels = None
depends_on = None
def upgrade():
chainsyncer_upgrade(0, 0, 1)
def downgrade():
chainsyncer_downgrade(0, 0, 1)

View File

@@ -1,31 +0,0 @@
"""Add tx tracker record
Revision ID: df19f4e69676
Revises: 71708e943dbd
Create Date: 2020-10-09 23:31:44.563498
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'df19f4e69676'
down_revision = '71708e943dbd'
branch_labels = None
depends_on = None
def upgrade():
# op.create_table(
# 'tx',
# sa.Column('id', sa.Integer, primary_key=True),
# sa.Column('date_added', sa.DateTime, nullable=False),
# sa.Column('tx_hash', sa.String(66), nullable=False, unique=True),
# sa.Column('success', sa.Boolean(), nullable=False),
# )
pass
def downgrade():
# op.drop_table('tx')
pass

View File

@@ -1,38 +0,0 @@
"""Add cached values for tx
Revision ID: e3b5330ee71c
Revises: df19f4e69676
Create Date: 2020-10-10 00:17:07.094893
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'e3b5330ee71c'
down_revision = 'df19f4e69676'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'tx_cache',
sa.Column('id', sa.Integer, primary_key=True),
# sa.Column('tx_id', sa.Integer, sa.ForeignKey('tx.id'), nullable=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime, nullable=False),
sa.Column('source_token_address', sa.String(42), nullable=False),
sa.Column('destination_token_address', sa.String(42), nullable=False),
sa.Column('sender', sa.String(42), nullable=False),
sa.Column('recipient', sa.String(42), nullable=False),
sa.Column('from_value', sa.NUMERIC(), nullable=False),
sa.Column('to_value', sa.NUMERIC(), nullable=True),
sa.Column('block_number', sa.BIGINT(), nullable=True),
sa.Column('tx_index', sa.Integer, nullable=True),
)
def downgrade():
op.drop_table('tx_cache')
pass

View File

@@ -1,32 +0,0 @@
"""debug output
Revision ID: f738d9962fdf
Revises: ec40ac0974c1
Create Date: 2021-03-04 08:32:43.281214
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'f738d9962fdf'
down_revision = 'ec40ac0974c1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'debug',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.String, nullable=False),
sa.Column('description', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('debug')
pass

View File

@@ -1,85 +0,0 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = .
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat migrations/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
#sqlalchemy.url = driver://user:pass@localhost/dbname
sqlalchemy.url = postgresql+psycopg2://postgres@localhost:5432/cic-eth
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks=black
# black.type=console_scripts
# black.entrypoint=black
# black.options=-l 79
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@@ -1,77 +0,0 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@@ -1,24 +0,0 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@@ -1,35 +0,0 @@
"""Add new syncer table
Revision ID: 2a07b543335e
Revises: a2e2aab8f331
Create Date: 2020-12-27 09:35:44.017981
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '2a07b543335e'
down_revision = 'a2e2aab8f331'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'blockchain_sync',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('blockchain', sa.String, nullable=False),
sa.Column('block_start', sa.Integer, nullable=False, default=0),
sa.Column('tx_start', sa.Integer, nullable=False, default=0),
sa.Column('block_cursor', sa.Integer, nullable=False, default=0),
sa.Column('tx_cursor', sa.Integer, nullable=False, default=0),
sa.Column('block_target', sa.Integer, nullable=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime),
)
def downgrade():
op.drop_table('blockchain_sync')

View File

@@ -1,30 +0,0 @@
"""Nonce reservation
Revision ID: 3b693afd526a
Revises: f738d9962fdf
Create Date: 2021-03-05 07:09:50.898728
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3b693afd526a'
down_revision = 'f738d9962fdf'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('key', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
def downgrade():
op.drop_table('nonce_task_reservation')

View File

@@ -1,29 +0,0 @@
"""Add nonce index
Revision ID: 49b348246d70
Revises: 52c7c59cd0b1
Create Date: 2020-12-19 09:45:36.186446
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '49b348246d70'
down_revision = '52c7c59cd0b1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False, unique=True),
sa.Column('nonce', sa.Integer, nullable=False),
)
def downgrade():
op.drop_table('nonce')

View File

@@ -1,31 +0,0 @@
"""Add account roles
Revision ID: 52c7c59cd0b1
Revises: 9c4bd7491015
Create Date: 2020-12-19 07:21:38.249237
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '52c7c59cd0b1'
down_revision = '9c4bd7491015'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'account_role',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.Text, nullable=False, unique=True),
sa.Column('address_hex', sa.String(42), nullable=False),
)
pass
def downgrade():
op.drop_table('account_role')
pass

View File

@@ -1,30 +0,0 @@
"""Add otx state log
Revision ID: 6ac7a1dadc46
Revises: 89e1e9baa53c
Create Date: 2021-01-30 13:59:49.022373
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '6ac7a1dadc46'
down_revision = '89e1e9baa53c'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx_state_log',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False),
sa.Column('date', sa.DateTime, nullable=False),
sa.Column('status', sa.Integer, nullable=False),
)
def downgrade():
op.drop_table('otx_state_log')

View File

@@ -1,31 +0,0 @@
"""Add attempts and version log for otx
Revision ID: 71708e943dbd
Revises: 7e8d7626e38f
Create Date: 2020-09-26 14:41:19.298651
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '71708e943dbd'
down_revision = '7e8d7626e38f'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx_attempts',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False),
sa.Column('date', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('otx_attempts')
pass

View File

@@ -1,31 +0,0 @@
"""add blocknumber pointer
Revision ID: 7cb65b893934
Revises: 8593fa1ca0f4
Create Date: 2020-09-24 19:29:13.543648
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '7cb65b893934'
down_revision = '8593fa1ca0f4'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'watcher_state',
sa.Column('block_number', sa.Integer)
)
conn = op.get_bind()
conn.execute('INSERT INTO watcher_state (block_number) VALUES (0);')
pass
def downgrade():
op.drop_table('watcher_state')
pass

View File

@@ -1,42 +0,0 @@
"""Add block sync
Revision ID: 7e8d7626e38f
Revises: cd2052be6db2
Create Date: 2020-09-26 11:12:27.818524
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '7e8d7626e38f'
down_revision = 'cd2052be6db2'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'block_sync',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('blockchain', sa.String, nullable=False, unique=True),
sa.Column('height_backlog', sa.Integer, nullable=False, default=0),
sa.Column('height_session', sa.Integer, nullable=False, default=0),
sa.Column('height_head', sa.Integer, nullable=False, default=0),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime),
)
op.drop_table('watcher_state')
pass
def downgrade():
op.drop_table('block_sync')
op.create_table(
'watcher_state',
sa.Column('block_number', sa.Integer)
)
conn = op.get_bind()
conn.execute('INSERT INTO watcher_state (block_number) VALUES (0);')
pass

View File

@@ -1,35 +0,0 @@
"""Add transaction queue
Revision ID: 8593fa1ca0f4
Revises:
Create Date: 2020-09-22 21:56:42.117047
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '8593fa1ca0f4'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('tx_hash', sa.String(66), nullable=False),
sa.Column('signed_tx', sa.Text, nullable=False),
sa.Column('status', sa.Integer, nullable=False, default=-9),
sa.Column('block', sa.Integer),
)
op.create_index('idx_otx_tx', 'otx', ['tx_hash'], unique=True)
def downgrade():
op.drop_index('idx_otx_tx')
op.drop_table('otx')

View File

@@ -1,32 +0,0 @@
"""Add account lock
Revision ID: 89e1e9baa53c
Revises: 2a07b543335e
Create Date: 2021-01-27 19:57:36.793882
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '89e1e9baa53c'
down_revision = '2a07b543335e'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'lock',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column("address", sa.String(42), nullable=True),
sa.Column('blockchain', sa.String),
sa.Column("flags", sa.BIGINT(), nullable=False, default=0),
sa.Column("date_created", sa.DateTime, nullable=False),
)
op.create_index('idx_chain_address', 'lock', ['blockchain', 'address'], unique=True)
def downgrade():
op.drop_index('idx_chain_address')
op.drop_table('lock')

View File

@@ -1,26 +0,0 @@
"""Rename block sync table
Revision ID: 9c4bd7491015
Revises: 9daa16518a91
Create Date: 2020-10-15 23:45:56.306898
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9c4bd7491015'
down_revision = '9daa16518a91'
branch_labels = None
depends_on = None
def upgrade():
op.rename_table('block_sync', 'otx_sync')
pass
def downgrade():
op.rename_table('otx_sync', 'block_sync')
pass

View File

@@ -1,30 +0,0 @@
"""add tx sync state
Revision ID: 9daa16518a91
Revises: e3b5330ee71c
Create Date: 2020-10-10 14:43:18.699276
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9daa16518a91'
down_revision = 'e3b5330ee71c'
branch_labels = None
depends_on = None
def upgrade():
# op.create_table(
# 'tx_sync',
# sa.Column('tx', sa.String(66), nullable=False),
# )
# op.execute("INSERT INTO tx_sync VALUES('0x0000000000000000000000000000000000000000000000000000000000000000')")
pass
def downgrade():
# op.drop_table('tx_sync')
pass

View File

@@ -1,33 +0,0 @@
"""Add date accessed to txcache
Revision ID: a2e2aab8f331
Revises: 49b348246d70
Create Date: 2020-12-24 18:58:06.137812
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'a2e2aab8f331'
down_revision = '49b348246d70'
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
'tx_cache',
sa.Column(
'date_checked',
sa.DateTime,
nullable=False
)
)
pass
def downgrade():
op.drop_column('tx_cache', 'date_checked')
pass

View File

@@ -1,34 +0,0 @@
"""convert tx index
Revision ID: cd2052be6db2
Revises: 7cb65b893934
Create Date: 2020-09-24 21:20:51.580500
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'cd2052be6db2'
down_revision = '7cb65b893934'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'tx_convert_transfer',
sa.Column('id', sa.Integer, primary_key=True),
#sa.Column('approve_tx_hash', sa.String(66), nullable=False, unique=True),
sa.Column('convert_tx_hash', sa.String(66), nullable=False, unique=True),
sa.Column('transfer_tx_hash', sa.String(66), unique=True),
# sa.Column('holder_address', sa.String(42), nullable=False),
sa.Column('recipient_address', sa.String(42), nullable=False),
)
op.create_index('idx_tx_convert_address', 'tx_convert_transfer', ['recipient_address'])
def downgrade():
op.drop_index('idx_tx_convert_address')
op.drop_table('tx_convert_transfer')

View File

@@ -1,31 +0,0 @@
"""Add tx tracker record
Revision ID: df19f4e69676
Revises: 71708e943dbd
Create Date: 2020-10-09 23:31:44.563498
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'df19f4e69676'
down_revision = '71708e943dbd'
branch_labels = None
depends_on = None
def upgrade():
# op.create_table(
# 'tx',
# sa.Column('id', sa.Integer, primary_key=True),
# sa.Column('date_added', sa.DateTime, nullable=False),
# sa.Column('tx_hash', sa.String(66), nullable=False, unique=True),
# sa.Column('success', sa.Boolean(), nullable=False),
# )
pass
def downgrade():
# op.drop_table('tx')
pass

View File

@@ -1,37 +0,0 @@
"""Add cached values for tx
Revision ID: e3b5330ee71c
Revises: df19f4e69676
Create Date: 2020-10-10 00:17:07.094893
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'e3b5330ee71c'
down_revision = 'df19f4e69676'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'tx_cache',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime, nullable=False),
sa.Column('source_token_address', sa.String(42), nullable=False),
sa.Column('destination_token_address', sa.String(42), nullable=False),
sa.Column('sender', sa.String(42), nullable=False),
sa.Column('recipient', sa.String(42), nullable=False),
sa.Column('from_value', sa.NUMERIC(), nullable=False),
sa.Column('to_value', sa.NUMERIC(), nullable=True),
sa.Column('block_number', sa.BIGINT(), nullable=True),
sa.Column('tx_index', sa.Integer, nullable=True),
)
def downgrade():
op.drop_table('tx_cache')
pass

View File

@@ -10,6 +10,7 @@ from sqlalchemy.pool import (
StaticPool,
QueuePool,
AssertionPool,
NullPool,
)
logg = logging.getLogger()
@@ -64,6 +65,7 @@ class SessionBase(Model):
if SessionBase.poolable:
poolclass = QueuePool
if pool_size > 1:
logg.info('db using queue pool')
e = create_engine(
dsn,
max_overflow=pool_size*3,
@@ -74,17 +76,22 @@ class SessionBase(Model):
echo=debug,
)
else:
if debug:
if pool_size == 0:
logg.info('db using nullpool')
poolclass = NullPool
elif debug:
logg.info('db using assertion pool')
poolclass = AssertionPool
else:
logg.info('db using static pool')
poolclass = StaticPool
e = create_engine(
dsn,
poolclass=poolclass,
echo=debug,
)
else:
logg.info('db not poolable')
e = create_engine(
dsn,
echo=debug,
@@ -116,6 +123,6 @@ class SessionBase(Model):
def release_session(session=None):
session_key = str(id(session))
if SessionBase.localsessions.get(session_key) != None:
logg.debug('destroying session {}'.format(session_key))
logg.debug('commit and destroy session {}'.format(session_key))
session.commit()
session.close()

View File

@@ -4,12 +4,12 @@ import logging
# third-party imports
from sqlalchemy import Column, String, Integer, DateTime, ForeignKey
from cic_registry import zero_address
from chainlib.eth.constant import ZERO_ADDRESS
from chainqueue.db.models.tx import TxCache
from chainqueue.db.models.otx import Otx
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.otx import Otx
logg = logging.getLogger()
@@ -22,10 +22,12 @@ class Lock(SessionBase):
__tablename__ = "lock"
blockchain = Column(String)
address = Column(String, ForeignKey('tx_cache.sender'))
#address = Column(String, ForeignKey('tx_cache.sender'))
address = Column(String, ForeignKey(TxCache.sender))
flags = Column(Integer)
date_created = Column(DateTime, default=datetime.datetime.utcnow)
otx_id = Column(Integer, ForeignKey('otx.id'))
otx_id = Column(Integer, ForeignKey(Otx.id))
#otx_id = Column(Integer)
def chain(self):
@@ -35,7 +37,7 @@ class Lock(SessionBase):
@staticmethod
def set(chain_str, flags, address=zero_address, session=None, tx_hash=None):
def set(chain_str, flags, address=ZERO_ADDRESS, session=None, tx_hash=None):
"""Sets flags associated with the given address and chain.
If a flags entry does not exist it is created.
@@ -88,7 +90,7 @@ class Lock(SessionBase):
@staticmethod
def reset(chain_str, flags, address=zero_address, session=None):
def reset(chain_str, flags, address=ZERO_ADDRESS, session=None):
"""Resets flags associated with the given address and chain.
If the resulting flags entry value is 0, the entry will be deleted.
@@ -132,7 +134,7 @@ class Lock(SessionBase):
@staticmethod
def check(chain_str, flags, address=zero_address, session=None):
def check(chain_str, flags, address=ZERO_ADDRESS, session=None):
"""Checks whether all given flags are set for given address and chain.
Does not validate the address against any other tables or components.

View File

@@ -55,6 +55,20 @@ class Nonce(SessionBase):
conn.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address))
@staticmethod
def __inc(conn, address):
#conn.execute("UPDATE nonce set nonce = nonce + 1 WHERE address_hex = '{}'".format(address))
q = conn.query(Nonce)
q = q.filter(Nonce.address_hex==address)
q = q.with_for_update()
o = q.first()
nonce = o.nonce
o.nonce += 1
conn.add(o)
conn.flush()
return nonce
@staticmethod
def __init(conn, address, nonce):
conn.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
@@ -78,7 +92,7 @@ class Nonce(SessionBase):
# TODO: Incrementing nonce MUST be done by separate tasks.
@staticmethod
def next(address, initial_if_not_exists=0):
def next(address, initial_if_not_exists=0, session=None):
"""Generate next nonce for the given address.
If there is no previous nonce record for the address, the nonce may be initialized to a specified value, or 0 if no value has been given.
@@ -90,28 +104,31 @@ class Nonce(SessionBase):
:returns: Nonce
:rtype: number
"""
#session = SessionBase.bind_session(session)
session = SessionBase.bind_session(session)
#session.begin_nested()
conn = Nonce.engine.connect()
if Nonce.transactional:
conn.execute('BEGIN')
conn.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
logg.debug('locking nonce table for address {}'.format(address))
nonce = Nonce.__get(conn, address)
#conn = Nonce.engine.connect()
#if Nonce.transactional:
# conn.execute('BEGIN')
# conn.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
# logg.debug('locking nonce table for address {}'.format(address))
#nonce = Nonce.__get(conn, address)
nonce = Nonce.__get(session, address)
logg.debug('get nonce {} for address {}'.format(nonce, address))
if nonce == None:
nonce = initial_if_not_exists
logg.debug('setting default nonce to {} for address {}'.format(nonce, address))
Nonce.__init(conn, address, nonce)
Nonce.__set(conn, address, nonce+1)
if Nonce.transactional:
conn.execute('COMMIT')
logg.debug('unlocking nonce table for address {}'.format(address))
conn.close()
#Nonce.__init(conn, address, nonce)
Nonce.__init(session, address, nonce)
#Nonce.__set(conn, address, nonce+1)
nonce = Nonce.__inc(session, address)
#if Nonce.transactional:
#conn.execute('COMMIT')
# logg.debug('unlocking nonce table for address {}'.format(address))
#conn.close()
#session.commit()
#SessionBase.release_session(session)
SessionBase.release_session(session)
return nonce
@@ -119,67 +136,74 @@ class NonceReservation(SessionBase):
__tablename__ = 'nonce_task_reservation'
address_hex = Column(String(42))
nonce = Column(Integer)
key = Column(String)
date_created = Column(DateTime, default=datetime.datetime.utcnow)
@staticmethod
def peek(key, session=None):
def peek(address, key, session=None):
session = SessionBase.bind_session(session)
q = session.query(NonceReservation)
q = q.filter(NonceReservation.key==key)
q = q.filter(NonceReservation.address_hex==address)
o = q.first()
nonce = None
r = None
if o != None:
nonce = o.nonce
r = (o.key, o.nonce)
session.flush()
SessionBase.release_session(session)
return nonce
return r
@staticmethod
def release(key, session=None):
def release(address, key, session=None):
session = SessionBase.bind_session(session)
nonce = NonceReservation.peek(key, session=session)
o = NonceReservation.peek(address, key, session=session)
if o == None:
SessionBase.release_session(session)
raise IntegrityError('"release" called on key {} address {} which does not exists'.format(key, address))
q = session.query(NonceReservation)
q = q.filter(NonceReservation.key==key)
q = q.filter(NonceReservation.address_hex==address)
o = q.first()
if o == None:
raise IntegrityError('nonce for key {}'.format(nonce))
SessionBase.release_session(session)
r = (o.key, o.nonce)
session.delete(o)
session.flush()
SessionBase.release_session(session)
return nonce
return r
@staticmethod
def next(address, key, session=None):
session = SessionBase.bind_session(session)
if NonceReservation.peek(key, session) != None:
raise IntegrityError('nonce for key {}'.format(key))
o = NonceReservation.peek(address, key, session)
if o != None:
raise IntegrityError('"next" called on nonce for key {} address {} during active key {}'.format(key, address, o[0]))
nonce = Nonce.next(address)
nonce = Nonce.next(address, session=session)
o = NonceReservation()
o.nonce = nonce
o.key = key
o.address_hex = address
session.add(o)
r = (key, nonce)
SessionBase.release_session(session)
return nonce
return r

View File

@@ -1,656 +0,0 @@
# standard imports
import datetime
import logging
# external imports
from sqlalchemy import Column, Enum, String, Integer, DateTime, Text, or_, ForeignKey
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
# local imports
from .base import SessionBase
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
status_str,
is_error_status,
)
from cic_eth.db.error import TxStateChangeError
#from cic_eth.eth.util import address_hex_from_signed_tx
logg = logging.getLogger()
class OtxStateLog(SessionBase):
__tablename__ = 'otx_state_log'
date = Column(DateTime, default=datetime.datetime.utcnow)
status = Column(Integer)
otx_id = Column(Integer, ForeignKey('otx.id'))
def __init__(self, otx):
self.otx_id = otx.id
self.status = otx.status
class Otx(SessionBase):
"""Outgoing transactions with local origin.
:param nonce: Transaction nonce
:type nonce: number
:param address: Ethereum address of recipient - NOT IN USE, REMOVE
:type address: str
:param tx_hash: Tranasction hash
:type tx_hash: str, 0x-hex
:param signed_tx: Signed raw transaction data
:type signed_tx: str, 0x-hex
"""
__tablename__ = 'otx'
tracing = False
"""Whether to enable queue state tracing"""
nonce = Column(Integer)
date_created = Column(DateTime, default=datetime.datetime.utcnow)
tx_hash = Column(String(66))
signed_tx = Column(Text)
status = Column(Integer)
block = Column(Integer)
def __set_status(self, status, session):
self.status |= status
session.add(self)
session.flush()
def __reset_status(self, status, session):
status_edit = ~status & self.status
self.status &= status_edit
session.add(self)
session.flush()
def __status_already_set(self, status):
r = bool(self.status & status)
if r:
logg.warning('status bit {} already set on {}'.format(status.name, self.tx_hash))
return r
def __status_not_set(self, status):
r = not(self.status & status)
if r:
logg.warning('status bit {} not set on {}'.format(status.name, self.tx_hash))
return r
def set_block(self, block, session=None):
"""Set block number transaction was mined in.
Only manipulates object, does not transaction or commit to backend.
:param block: Block number
:type block: number
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
if self.block != None:
raise TxStateChangeError('Attempted set block {} when block was already {}'.format(block, self.block))
self.block = block
localsession.add(self)
localsession.flush()
if session==None:
localsession.commit()
localsession.close()
def waitforgas(self, session=None):
"""Marks transaction as suspended pending gas funding.
Only manipulates object, does not transaction or commit to backend.
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.GAS_ISSUES):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.GAS_ISSUES, session)
self.__reset_status(StatusBits.QUEUED | StatusBits.DEFERRED, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def fubar(self, session=None):
"""Marks transaction as "fubar." Any transaction marked this way is an anomaly and may be a symptom of a serious problem.
Only manipulates object, does not transaction or commit to backend.
"""
if self.__status_already_set(StatusBits.UNKNOWN_ERROR):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('FUBAR cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if is_error_status(self.status):
raise TxStateChangeError('FUBAR cannot be set on an entry with an error state already set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.UNKNOWN_ERROR | StatusBits.FINAL, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def reject(self, session=None):
"""Marks transaction as "rejected," which means the node rejected sending the transaction to the network. The nonce has not been spent, and the transaction should be replaced.
Only manipulates object, does not transaction or commit to backend.
"""
if self.__status_already_set(StatusBits.NODE_ERROR):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('REJECTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('REJECTED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status)))
if is_error_status(self.status):
raise TxStateChangeError('REJECTED cannot be set on an entry with an error state already set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.NODE_ERROR | StatusBits.FINAL, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def override(self, manual=False, session=None):
"""Marks transaction as manually overridden.
Only manipulates object, does not transaction or commit to backend.
"""
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status)))
if self.status & StatusBits.OBSOLETE:
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already OBSOLETE ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.OBSOLETE, session)
#if manual:
# self.__set_status(StatusBits.MANUAL, session)
self.__reset_status(StatusBits.QUEUED | StatusBits.IN_NETWORK, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def manual(self, session=None):
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.MANUAL, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def retry(self, session=None):
"""Marks transaction as ready to retry after a timeout following a sendfail or a completed gas funding.
Only manipulates object, does not transaction or commit to backend.
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.QUEUED):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('RETRY cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if not is_error_status(self.status) and not StatusBits.IN_NETWORK & self.status > 0:
raise TxStateChangeError('RETRY cannot be set on an entry that has no error ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.QUEUED, session)
self.__reset_status(StatusBits.GAS_ISSUES, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def readysend(self, session=None):
"""Marks transaction as ready for initial send attempt.
Only manipulates object, does not transaction or commit to backend.
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.QUEUED):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('READYSEND cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if is_error_status(self.status):
raise TxStateChangeError('READYSEND cannot be set on an errored state ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.QUEUED, session)
self.__reset_status(StatusBits.GAS_ISSUES, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def sent(self, session=None):
"""Marks transaction as having been sent to network.
Only manipulates object, does not transaction or commit to backend.
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.IN_NETWORK):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('SENT cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.IN_NETWORK, session)
self.__reset_status(StatusBits.DEFERRED | StatusBits.QUEUED | StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def sendfail(self, session=None):
"""Marks that an attempt to send the transaction to the network has failed.
Only manipulates object, does not transaction or commit to backend.
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.NODE_ERROR):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.LOCAL_ERROR | StatusBits.DEFERRED, session)
self.__reset_status(StatusBits.QUEUED | StatusBits.GAS_ISSUES, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def dequeue(self, session=None):
"""Marks that a process to execute send attempt is underway
Only manipulates object, does not transaction or commit to backend.
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_not_set(StatusBits.QUEUED):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
self.__reset_status(StatusBits.QUEUED, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def minefail(self, block, session=None):
"""Marks that transaction was mined but code execution did not succeed.
Only manipulates object, does not transaction or commit to backend.
:param block: Block number transaction was mined in.
:type block: number
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.NETWORK_ERROR):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('REVERTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if not self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('REVERTED cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status)))
if block != None:
self.block = block
self.__set_status(StatusBits.NETWORK_ERROR | StatusBits.FINAL, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def cancel(self, confirmed=False, session=None):
"""Marks that the transaction has been succeeded by a new transaction with same nonce.
If set to confirmed, the previous state must be OBSOLETED, and will transition to CANCELLED - a finalized state. Otherwise, the state must follow a non-finalized state, and will be set to OBSOLETED.
Only manipulates object, does not transaction or commit to backend.
:param confirmed: Whether transition is to a final state.
:type confirmed: bool
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('CANCEL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if confirmed:
if self.status > 0 and not self.status & StatusBits.OBSOLETE:
raise TxStateChangeError('CANCEL can only be set on an entry marked OBSOLETE ({})'.format(status_str(self.status)))
self.__set_status(StatusEnum.CANCELLED, session)
else:
self.__set_status(StatusEnum.OBSOLETED, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def success(self, block, session=None):
"""Marks that transaction was successfully mined.
Only manipulates object, does not transaction or commit to backend.
:param block: Block number transaction was mined in.
:type block: number
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('SUCCESS cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if not self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('SUCCESS cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status)))
if is_error_status(self.status):
raise TxStateChangeError('SUCCESS cannot be set on an entry with error state set ({})'.format(status_str(self.status)))
if block != None:
self.block = block
self.__set_status(StatusEnum.SUCCESS, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
@staticmethod
def get(status=0, limit=4096, status_exact=True, session=None):
"""Returns outgoing transaction lists by status.
Status may either be matched exactly, or be an upper bound of the integer value of the status enum.
:param status: Status value to use in query
:type status: cic_eth.db.enum.StatusEnum
:param limit: Max results to return
:type limit: number
:param status_exact: Whether or not to perform exact status match
:type bool:
:returns: List of transaction hashes
:rtype: tuple, where first element is transaction hash
"""
e = None
session = SessionBase.bind_session(session)
if status_exact:
e = session.query(Otx.tx_hash).filter(Otx.status==status).order_by(Otx.date_created.asc()).limit(limit).all()
else:
e = session.query(Otx.tx_hash).filter(Otx.status<=status).order_by(Otx.date_created.asc()).limit(limit).all()
SessionBase.release_session(session)
return e
@staticmethod
def load(tx_hash, session=None):
"""Retrieves the outgoing transaction record by transaction hash.
:param tx_hash: Transaction hash
:type tx_hash: str, 0x-hex
"""
session = SessionBase.bind_session(session)
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
SessionBase.release_session(session)
return q.first()
@staticmethod
def account(account_address):
"""Retrieves all transaction hashes for which the given Ethereum address is sender or recipient.
:param account_address: Ethereum address to use in query.
:type account_address: str, 0x-hex
:returns: Outgoing transactions
:rtype: tuple, where first element is transaction hash
"""
session = Otx.create_session()
q = session.query(Otx.tx_hash)
q = q.join(TxCache)
q = q.filter(or_(TxCache.sender==account_address, TxCache.recipient==account_address))
txs = q.all()
session.close()
return list(txs)
def __state_log(self, session):
l = OtxStateLog(self)
session.add(l)
@staticmethod
def add(nonce, address, tx_hash, signed_tx, session=None):
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
otx = Otx(nonce, address, tx_hash, signed_tx)
localsession.add(otx)
localsession.flush()
if otx.tracing:
otx.__state_log(session=localsession)
localsession.flush()
if session==None:
localsession.commit()
localsession.close()
return None
return otx
def __init__(self, nonce, address, tx_hash, signed_tx):
self.nonce = nonce
self.tx_hash = tx_hash
self.signed_tx = signed_tx
self.status = StatusEnum.PENDING
signed_tx_bytes = bytes.fromhex(signed_tx[2:])
# sender_address = address_hex_from_signed_tx(signed_tx_bytes)
# logg.debug('decoded tx {}'.format(sender_address))
# TODO: Most of the methods on this object are obsolete, but it contains a static function for retrieving "expired" outgoing transactions that should be moved to Otx instead.
class OtxSync(SessionBase):
"""Obsolete
"""
__tablename__ = 'otx_sync'
blockchain = Column(String)
block_height_backlog = Column(Integer)
tx_height_backlog = Column(Integer)
block_height_session = Column(Integer)
tx_height_session = Column(Integer)
block_height_head = Column(Integer)
tx_height_head = Column(Integer)
date_created = Column(DateTime, default=datetime.datetime.utcnow)
date_updated = Column(DateTime)
def backlog(self, block_height=None, tx_height=None):
#session = OtxSync.create_session()
if block_height != None:
if tx_height == None:
raise ValueError('tx height missing')
self.block_height_backlog = block_height
self.tx_height_backlog = tx_height
#session.add(self)
self.date_updated = datetime.datetime.utcnow()
#session.commit()
block_height = self.block_height_backlog
tx_height = self.tx_height_backlog
#session.close()
return (block_height, tx_height)
def session(self, block_height=None, tx_height=None):
#session = OtxSync.create_session()
if block_height != None:
if tx_height == None:
raise ValueError('tx height missing')
self.block_height_session = block_height
self.tx_height_session = tx_height
#session.add(self)
self.date_updated = datetime.datetime.utcnow()
#session.commit()
block_height = self.block_height_session
tx_height = self.tx_height_session
#session.close()
return (block_height, tx_height)
def head(self, block_height=None, tx_height=None):
#session = OtxSync.create_session()
if block_height != None:
if tx_height == None:
raise ValueError('tx height missing')
self.block_height_head = block_height
self.tx_height_head = tx_height
#session.add(self)
self.date_updated = datetime.datetime.utcnow()
#session.commit()
block_height = self.block_height_head
tx_height = self.tx_height_head
#session.close()
return (block_height, tx_height)
@hybrid_property
def synced(self):
#return self.block_height_session == self.block_height_backlog and self.tx_height_session == self.block_height_backlog
return self.block_height_session == self.block_height_backlog and self.tx_height_session == self.tx_height_backlog
@staticmethod
def load(blockchain_string, session):
q = session.query(OtxSync)
q = q.filter(OtxSync.blockchain==blockchain_string)
return q.first()
@staticmethod
def latest(nonce):
session = SessionBase.create_session()
otx = session.query(Otx).filter(Otx.nonce==nonce).order_by(Otx.created.desc()).first()
session.close()
return otx
@staticmethod
def get_expired(datetime_threshold):
session = SessionBase.create_session()
q = session.query(Otx)
q = q.filter(Otx.date_created<datetime_threshold)
q = q.filter(Otx.status==StatusEnum.SENT)
q = q.order_by(Otx.date_created.desc())
q = q.group_by(Otx.nonce)
q = q.group_by(Otx.id)
otxs = q.all()
session.close()
return otxs
def chain(self):
return self.blockchain
def __init__(self, blockchain):
self.blockchain = blockchain
self.block_height_head = 0
self.tx_height_head = 0
self.block_height_session = 0
self.tx_height_session = 0
self.block_height_backlog = 0
self.tx_height_backlog = 0

View File

@@ -1,9 +1,9 @@
# standard imports
import logging
# third-party imports
# external imports
from sqlalchemy import Column, String, Text
from cic_registry import zero_address
from chainlib.eth.constant import ZERO_ADDRESS
# local imports
from .base import SessionBase
@@ -42,7 +42,7 @@ class AccountRole(SessionBase):
role = AccountRole.__get_role(tag, session)
r = zero_address
r = ZERO_ADDRESS
if role != None:
r = role.address_hex
@@ -133,4 +133,4 @@ class AccountRole(SessionBase):
def __init__(self, tag):
self.tag = tag
self.address_hex = zero_address
self.address_hex = ZERO_ADDRESS

View File

@@ -1,151 +0,0 @@
# standard imports
import datetime
# third-party imports
from sqlalchemy import Column, String, Integer, DateTime, Enum, ForeignKey, Boolean, NUMERIC
from sqlalchemy.ext.hybrid import hybrid_method, hybrid_property
#from sqlalchemy.orm import relationship, backref
#from sqlalchemy.ext.declarative import declarative_base
# local imports
from .base import SessionBase
from .otx import Otx
from cic_eth.db.util import num_serialize
from cic_eth.error import NotLocalTxError
from cic_eth.db.error import TxStateChangeError
class TxCache(SessionBase):
"""Metadata expansions for outgoing transactions.
These records are not essential for handling of outgoing transaction queues. It is implemented to reduce the amount of computation spent of parsing and analysing raw signed transaction data.
Instantiation of the object will fail if an outgoing transaction record with the same transaction hash does not exist.
Typically three types of transactions are recorded:
- Token transfers; where source and destination token values and addresses are identical, sender and recipient differ.
- Token conversions; source and destination token values and addresses differ, sender and recipient are identical.
- Any other transaction; source and destination token addresses are zero-address.
:param tx_hash: Transaction hash
:type tx_hash: str, 0x-hex
:param sender: Ethereum address of transaction sender
:type sender: str, 0x-hex
:param recipient: Ethereum address of transaction beneficiary (e.g. token transfer recipient)
:type recipient: str, 0x-hex
:param source_token_address: Contract address of token that sender spent from
:type source_token_address: str, 0x-hex
:param destination_token_address: Contract address of token that recipient will receive balance of
:type destination_token_address: str, 0x-hex
:param from_value: Amount of source tokens spent
:type from_value: number
:param to_value: Amount of destination tokens received
:type to_value: number
:param block_number: Block height the transaction was mined at, or None if not yet mined
:type block_number: number or None
:param tx_number: Block transaction height the transaction was mined at, or None if not yet mined
:type tx_number: number or None
:raises FileNotFoundError: Outgoing transaction for given transaction hash does not exist
"""
__tablename__ = 'tx_cache'
otx_id = Column(Integer, ForeignKey('otx.id'))
source_token_address = Column(String(42))
destination_token_address = Column(String(42))
sender = Column(String(42))
recipient = Column(String(42))
from_value = Column(NUMERIC())
to_value = Column(NUMERIC())
block_number = Column(Integer())
tx_index = Column(Integer())
date_created = Column(DateTime, default=datetime.datetime.utcnow)
date_updated = Column(DateTime, default=datetime.datetime.utcnow)
date_checked = Column(DateTime, default=datetime.datetime.utcnow)
def check(self):
"""Update the "checked" timestamp to current time.
Only manipulates object, does not transaction or commit to backend.
"""
self.date_checked = datetime.datetime.now()
@staticmethod
def clone(
tx_hash_original,
tx_hash_new,
session=None,
):
"""Copy tx cache data and associate it with a new transaction.
:param tx_hash_original: tx cache data to copy
:type tx_hash_original: str, 0x-hex
:param tx_hash_new: tx hash to associate the copied entry with
:type tx_hash_new: str, 0x-hex
"""
session = SessionBase.bind_session(session)
q = session.query(TxCache)
q = q.join(Otx)
q = q.filter(Otx.tx_hash==tx_hash_original)
txc = q.first()
if txc == None:
SessionBase.release_session(session)
raise NotLocalTxError('original {}'.format(tx_hash_original))
if txc.block_number != None:
SessionBase.release_session(session)
raise TxStateChangeError('cannot clone tx cache of confirmed tx {}'.format(tx_hash_original))
session.flush()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash_new)
otx = q.first()
if otx == None:
SessionBase.release_session(session)
raise NotLocalTxError('new {}'.format(tx_hash_new))
txc_new = TxCache(
otx.tx_hash,
txc.sender,
txc.recipient,
txc.source_token_address,
txc.destination_token_address,
int(txc.from_value),
int(txc.to_value),
session=session,
)
session.add(txc_new)
session.commit()
SessionBase.release_session(session)
def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None, session=None):
session = SessionBase.bind_session(session)
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
tx = q.first()
if tx == None:
SessionBase.release_session(session)
raise FileNotFoundError('outgoing transaction record unknown {} (add a Tx first)'.format(tx_hash))
self.otx_id = tx.id
self.sender = sender
self.recipient = recipient
self.source_token_address = source_token_address
self.destination_token_address = destination_token_address
self.from_value = from_value
self.to_value = to_value
self.block_number = block_number
self.tx_index = tx_index
# not automatically set in sqlite, it seems:
self.date_created = datetime.datetime.utcnow()
self.date_updated = self.date_created
self.date_checked = self.date_created
SessionBase.release_session(session)

View File

@@ -4,12 +4,6 @@ class TokenCountError(Exception):
pass
class NotLocalTxError(Exception):
"""Exception raised when trying to access a tx not originated from a local task
"""
pass
class PermanentTxError(Exception):
"""Exception raised when encountering a permanent error when sending a tx.
@@ -66,3 +60,17 @@ class LockedError(Exception):
"""
pass
class SignerError(Exception):
"""Exception raised when signer is unavailable or generates an error
"""
pass
class EthError(Exception):
"""Exception raised when unspecified error from evm node is encountered
"""
pass

View File

@@ -1,16 +0,0 @@
"""Ethereum batch functions and utilities
.. moduleauthor:: Louis Holbrook <dev@holbrook.no>
"""
# standard imports
import os
# local imports
from .rpc import RpcClient
registry_extra_identifiers = {
'Faucet': '0x{:0<64s}'.format(b'Faucet'.hex()),
'TransferApproval': '0x{:0<64s}'.format(b'TransferApproval'.hex()),
}

View File

@@ -1,146 +1,60 @@
# standard imports
import logging
# third-party imports
import web3
# external imports
import celery
from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
from erc20_single_shot_faucet import Faucet
from cic_registry import zero_address
from hexathon import strip_0x
from erc20_single_shot_faucet import SingleShotFaucet as Faucet
from chainlib.eth.constant import ZERO_ADDRESS
from hexathon import (
strip_0x,
)
from chainlib.connection import RPCConnection
from chainlib.eth.sign import (
new_account,
sign_message,
)
from chainlib.eth.address import to_checksum_address
from chainlib.eth.tx import (
TxFormat,
unpack,
)
from chainlib.chain import ChainSpec
from eth_accounts_index import AccountRegistry
from sarafu_faucet import MinterFaucet as Faucet
from chainqueue.db.models.tx import TxCache
# local import
from cic_eth.eth import RpcClient
from cic_eth.eth import registry_extra_identifiers
from cic_eth.eth.task import sign_and_register_tx
from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.eth.factory import TxFactory
from cic_eth_registry import CICRegistry
from cic_eth.eth.gas import (
create_check_gas_task,
)
#from cic_eth.eth.factory import TxFactory
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.tx import TxCache
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.error import RoleMissingError
from cic_eth.task import CriticalSQLAlchemyTask
from cic_eth.error import (
RoleMissingError,
SignerError,
)
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalSQLAlchemyAndSignerTask,
BaseTask,
)
from cic_eth.eth.nonce import (
CustodialTaskNonceOracle,
)
from cic_eth.queue.tx import (
register_tx,
)
#logg = logging.getLogger(__name__)
logg = logging.getLogger()
celery_app = celery.current_app
class AccountTxFactory(TxFactory):
"""Factory for creating account index contract transactions
"""
def add(
self,
address,
chain_spec,
uuid,
session=None,
):
"""Register an Ethereum account address with the on-chain account registry
:param address: Ethereum account address to add
:type address: str, 0x-hex
:param chain_spec: Chain to build transaction for
:type chain_spec: cic_registry.chain.ChainSpec
:returns: Unsigned "AccountRegistry.add" transaction in standard Ethereum format
:rtype: dict
"""
c = CICRegistry.get_contract(chain_spec, 'AccountRegistry')
f = c.function('add')
tx_add_buildable = f(
address,
)
gas = c.gas('add')
tx_add = tx_add_buildable.buildTransaction({
'from': self.address,
'gas': gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(uuid, session=session),
'value': 0,
})
return tx_add
def gift(
self,
address,
chain_spec,
uuid,
session=None,
):
"""Trigger the on-chain faucet to disburse tokens to the provided Ethereum account
:param address: Ethereum account address to gift to
:type address: str, 0x-hex
:param chain_spec: Chain to build transaction for
:type chain_spec: cic_registry.chain.ChainSpec
:returns: Unsigned "Faucet.giveTo" transaction in standard Ethereum format
:rtype: dict
"""
c = CICRegistry.get_contract(chain_spec, 'Faucet')
f = c.function('giveTo')
tx_add_buildable = f(address)
gas = c.gas('add')
tx_add = tx_add_buildable.buildTransaction({
'from': self.address,
'gas': gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(uuid, session=session),
'value': 0,
})
return tx_add
def unpack_register(data):
"""Verifies that a transaction is an "AccountRegister.add" transaction, and extracts call parameters from it.
:param data: Raw input data from Ethereum transaction.
:type data: str, 0x-hex
:raises ValueError: Function signature does not match AccountRegister.add
:returns: Parsed parameters
:rtype: dict
"""
data = strip_0x(data)
f = data[:8]
if f != '0a3b0a4f':
raise ValueError('Invalid account index register data ({})'.format(f))
d = data[8:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
}
def unpack_gift(data):
"""Verifies that a transaction is a "Faucet.giveTo" transaction, and extracts call parameters from it.
:param data: Raw input data from Ethereum transaction.
:type data: str, 0x-hex
:raises ValueError: Function signature does not match AccountRegister.add
:returns: Parsed parameters
:rtype: dict
"""
data = strip_0x(data)
f = data[:8]
if f != '63e4bff4':
raise ValueError('Invalid gift data ({})'.format(f))
d = data[8:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
}
# TODO: Separate out nonce initialization task
@celery_app.task(base=CriticalSQLAlchemyTask)
def create(password, chain_str):
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def create(self, password, chain_spec_dict):
"""Creates and stores a new ethereum account in the keystore.
The password is passed on to the wallet backend, no encryption is performed in the task worker.
@@ -152,21 +66,27 @@ def create(password, chain_str):
:returns: Ethereum address of newly created account
:rtype: str, 0x-hex
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
a = c.w3.eth.personal.new_account(password)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
a = None
conn = RPCConnection.connect(chain_spec, 'signer')
o = new_account()
a = conn.do(o)
conn.disconnect()
if a == None:
raise SignerError('create account')
logg.debug('created account {}'.format(a))
# Initialize nonce provider record for account
session = SessionBase.create_session()
session = self.create_session()
Nonce.init(a, session=session)
session.commit()
session.close()
return a
@celery_app.task(bind=True, throws=(RoleMissingError,), base=CriticalSQLAlchemyTask)
def register(self, account_address, chain_str, writer_address=None):
@celery_app.task(bind=True, throws=(RoleMissingError,), base=CriticalSQLAlchemyAndSignerTask)
def register(self, account_address, chain_spec_dict, writer_address=None):
"""Creates a transaction to add the given address to the accounts index.
:param account_address: Ethereum address to add
@@ -179,35 +99,52 @@ def register(self, account_address, chain_str, writer_address=None):
:returns: The account_address input param
:rtype: str, 0x-hex
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
session = self.create_session()
if writer_address == None:
writer_address = AccountRole.get_address('ACCOUNTS_INDEX_WRITER', session=session)
writer_address = AccountRole.get_address('ACCOUNT_REGISTRY_WRITER', session=session)
if writer_address == zero_address:
if writer_address == ZERO_ADDRESS:
session.close()
raise RoleMissingError(account_address)
raise RoleMissingError('writer address for regsistering {}'.format(account_address))
logg.debug('adding account address {} to index; writer {}'.format(account_address, writer_address))
queue = self.request.delivery_info['routing_key']
queue = self.request.delivery_info.get('routing_key')
c = RpcClient(chain_spec, holder_address=writer_address)
txf = AccountTxFactory(writer_address, c)
# Retrieve account index address
rpc = RPCConnection.connect(chain_spec, 'default')
registry = CICRegistry(chain_spec, rpc)
call_address = AccountRole.get_address('DEFAULT', session=session)
if writer_address == ZERO_ADDRESS:
session.close()
raise RoleMissingError('call address for resgistering {}'.format(account_address))
account_registry_address = registry.by_name('AccountRegistry', sender_address=call_address)
# Generate and sign transaction
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
nonce_oracle = CustodialTaskNonceOracle(writer_address, self.request.root_id, session=session) #, default_nonce)
gas_oracle = self.create_gas_oracle(rpc, AccountRegistry.gas)
account_registry = AccountRegistry(chain_spec, signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = account_registry.add(account_registry_address, writer_address, account_address, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect()
tx_add = txf.add(account_address, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data', session=session)
# add transaction to queue
cache_task = 'cic_eth.eth.account.cache_account_data'
register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=cache_task, session=session)
session.commit()
session.close()
gas_budget = tx_add['gas'] * tx_add['gasPrice']
gas_pair = gas_oracle.get_gas(tx_signed_raw_hex)
gas_budget = gas_pair[0] * gas_pair[1]
logg.debug('register user tx {} {} {}'.format(tx_hash_hex, queue, gas_budget))
rpc.disconnect()
logg.debug('register user tx {}'.format(tx_hash_hex))
s = create_check_gas_and_send_task(
s = create_check_gas_task(
[tx_signed_raw_hex],
chain_str,
chain_spec,
writer_address,
gas_budget,
gas=gas_budget,
tx_hashes_hex=[tx_hash_hex],
queue=queue,
)
@@ -215,8 +152,8 @@ def register(self, account_address, chain_str, writer_address=None):
return account_address
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def gift(self, account_address, chain_str):
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def gift(self, account_address, chain_spec_dict):
"""Creates a transaction to invoke the faucet contract for the given address.
:param account_address: Ethereum address to give to
@@ -226,36 +163,51 @@ def gift(self, account_address, chain_str):
:returns: Raw signed transaction
:rtype: list with transaction as only element
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
logg.debug('gift account address {} to index'.format(account_address))
queue = self.request.delivery_info['routing_key']
queue = self.request.delivery_info.get('routing_key')
c = RpcClient(chain_spec, holder_address=account_address)
txf = AccountTxFactory(account_address, c)
# Retrieve account index address
session = self.create_session()
rpc = RPCConnection.connect(chain_spec, 'default')
registry = CICRegistry(chain_spec, rpc)
faucet_address = registry.by_name('Faucet', sender_address=self.call_address)
session = SessionBase.create_session()
tx_add = txf.gift(account_address, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_gift_data', session=session)
# Generate and sign transaction
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
nonce_oracle = CustodialTaskNonceOracle(account_address, self.request.root_id, session=session) #, default_nonce)
gas_oracle = self.create_gas_oracle(rpc, Faucet.gas)
faucet = Faucet(chain_spec, signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = faucet.give_to(faucet_address, account_address, account_address, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect()
# add transaction to queue
cache_task = 'cic_eth.eth.account.cache_gift_data'
register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task, session=session)
session.commit()
session.close()
gas_budget = tx_add['gas'] * tx_add['gasPrice']
gas_pair = gas_oracle.get_gas(tx_signed_raw_hex)
gas_budget = gas_pair[0] * gas_pair[1]
logg.debug('register user tx {} {} {}'.format(tx_hash_hex, queue, gas_budget))
rpc.disconnect()
logg.debug('gift user tx {}'.format(tx_hash_hex))
s = create_check_gas_and_send_task(
s = create_check_gas_task(
[tx_signed_raw_hex],
chain_str,
chain_spec,
account_address,
gas_budget,
[tx_hash_hex],
queue=queue,
)
s.apply_async()
return [tx_signed_raw_hex]
@celery_app.task(bind=True)
def have(self, account, chain_str):
def have(self, account, chain_spec_dict):
"""Check whether the given account exists in keystore
:param account: Account to check
@@ -265,17 +217,38 @@ def have(self, account, chain_str):
:returns: Account, or None if not exists
:rtype: Varies
"""
c = RpcClient(account)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
o = sign_message(account, '0x2a')
try:
c.w3.eth.sign(account, text='2a')
return account
conn = RPCConnection.connect(chain_spec, 'signer')
except Exception as e:
logg.debug('cannot sign with {}: {}'.format(account, e))
return None
try:
conn.do(o)
conn.disconnect()
return account
except Exception as e:
logg.debug('cannot sign with {}: {}'.format(account, e))
conn.disconnect()
return None
@celery_app.task(bind=True)
def role(self, account, chain_str):
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def set_role(self, tag, address, chain_spec_dict):
if not to_checksum_address(address):
raise ValueError('invalid checksum address {}'.format(address))
session = SessionBase.create_session()
role = AccountRole.set(tag, address, session=session)
session.add(role)
session.commit()
session.close()
return tag
@celery_app.task(bind=True, base=BaseTask)
def role(self, address, chain_spec_dict):
"""Return account role for address
:param account: Account to check
@@ -285,14 +258,18 @@ def role(self, account, chain_str):
:returns: Account, or None if not exists
:rtype: Varies
"""
return AccountRole.role_for(account)
session = self.create_session()
role_tag = AccountRole.role_for(address, session=session)
session.close()
return role_tag
@celery_app.task()
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def cache_gift_data(
self,
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
chain_spec_dict,
):
"""Generates and commits transaction cache metadata for a Faucet.giveTo transaction
@@ -305,21 +282,20 @@ def cache_gift_data(
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
tx_data = unpack_gift(tx['data'])
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_data = Faucet.parse_give_to_request(tx['data'])
session = SessionBase.create_session()
session = self.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx['to'],
zero_address,
zero_address,
ZERO_ADDRESS,
ZERO_ADDRESS,
0,
0,
session=session,
@@ -332,11 +308,12 @@ def cache_gift_data(
return (tx_hash_hex, cache_id)
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def cache_account_data(
self,
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
chain_spec_dict,
):
"""Generates and commits transaction cache metadata for an AccountsIndex.add transaction
@@ -349,21 +326,18 @@ def cache_account_data(
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
tx_data = unpack_register(tx['data'])
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_data = AccountRegistry.parse_add_request(tx['data'])
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx['to'],
zero_address,
zero_address,
ZERO_ADDRESS,
ZERO_ADDRESS,
0,
0,
session=session,

View File

@@ -209,7 +209,7 @@ def convert_with_default_reserve(self, tokens, from_address, source_amount, mini
# s_queue.apply_async()
#
# s_check_gas = celery.signature(
# 'cic_eth.eth.tx.check_gas',
# 'cic_eth.eth.gas.check_gas',
# [
# c['address'],
# [c['signed_tx']],
@@ -222,7 +222,7 @@ def convert_with_default_reserve(self, tokens, from_address, source_amount, mini
# )
#
# s_set_sent = celery.signature(
# 'cic_eth.queue.tx.set_sent_status',
# 'cic_eth.queue.state.set_sent',
# [False],
# )
# s_send.link(s_set_sent)
@@ -364,7 +364,7 @@ def otx_cache_convert(
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_data = unpack_convert(tx['data'])
logg.debug('tx data {}'.format(tx_data))

View File

@@ -0,0 +1,310 @@
# standard imports
import logging
# external imports
import celery
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec
from chainlib.connection import RPCConnection
from chainlib.eth.erc20 import ERC20
from chainlib.eth.tx import (
TxFormat,
unpack,
)
from cic_eth_registry import CICRegistry
from cic_eth_registry.erc20 import ERC20Token
from hexathon import strip_0x
from chainqueue.db.models.tx import TxCache
from chainqueue.error import NotLocalTxError
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole
from cic_eth.error import (
TokenCountError,
PermanentTxError,
OutOfGasError,
)
from cic_eth.queue.tx import register_tx
from cic_eth.eth.gas import (
create_check_gas_task,
MaxGasOracle,
)
from cic_eth.ext.address import translate_address
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalWeb3Task,
CriticalSQLAlchemyAndSignerTask,
)
from cic_eth.eth.nonce import CustodialTaskNonceOracle
celery_app = celery.current_app
logg = logging.getLogger()
@celery_app.task(base=CriticalWeb3Task)
def balance(tokens, holder_address, chain_spec_dict):
"""Return token balances for a list of tokens for given address
:param tokens: Token addresses
:type tokens: list of str, 0x-hex
:param holder_address: Token holder address
:type holder_address: str, 0x-hex
:param chain_spec_dict: Chain spec string representation
:type chain_spec_dict: str
:return: List of balances
:rtype: list of int
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
rpc = RPCConnection.connect(chain_spec, 'default')
caller_address = ERC20Token.caller_address
for t in tokens:
address = t['address']
token = ERC20Token(chain_spec, rpc, address)
c = ERC20(chain_spec)
o = c.balance_of(address, holder_address, sender_address=caller_address)
r = rpc.do(o)
t['balance_network'] = c.parse_balance(r)
rpc.disconnect()
return tokens
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def transfer(self, tokens, holder_address, receiver_address, value, chain_spec_dict):
"""Transfer ERC20 tokens between addresses
First argument is a list of tokens, to enable the task to be chained to the symbol to token address resolver function. However, it accepts only one token as argument.
:raises TokenCountError: Either none or more then one tokens have been passed as tokens argument
:param tokens: Token addresses
:type tokens: list of str, 0x-hex
:param holder_address: Token holder address
:type holder_address: str, 0x-hex
:param receiver_address: Token receiver address
:type receiver_address: str, 0x-hex
:param value: Amount of token, in 'wei'
:type value: int
:param chain_str: Chain spec string representation
:type chain_str: str
:raises TokenCountError: More than one token is passed in tokens list
:return: Transaction hash for tranfer operation
:rtype: str, 0x-hex
"""
# we only allow one token, one transfer
logg.debug('tokens {}'.format(tokens))
if len(tokens) != 1:
raise TokenCountError
t = tokens[0]
chain_spec = ChainSpec.from_dict(chain_spec_dict)
queue = self.request.delivery_info.get('routing_key')
rpc = RPCConnection.connect(chain_spec, 'default')
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
session = self.create_session()
nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas)
c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.transfer(t['address'], holder_address, receiver_address, value, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect()
rpc.disconnect()
cache_task = 'cic_eth.eth.erc20.cache_transfer_data'
register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=cache_task, session=session)
session.commit()
session.close()
gas_pair = gas_oracle.get_gas(tx_signed_raw_hex)
gas_budget = gas_pair[0] * gas_pair[1]
logg.debug('transfer tx {} {} {}'.format(tx_hash_hex, queue, gas_budget))
s = create_check_gas_task(
[tx_signed_raw_hex],
chain_spec,
holder_address,
gas_budget,
[tx_hash_hex],
queue,
)
s.apply_async()
return tx_hash_hex
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def approve(self, tokens, holder_address, spender_address, value, chain_spec_dict):
"""Approve ERC20 transfer on behalf of holder address
First argument is a list of tokens, to enable the task to be chained to the symbol to token address resolver function. However, it accepts only one token as argument.
:raises TokenCountError: Either none or more then one tokens have been passed as tokens argument
:param tokens: Token addresses
:type tokens: list of str, 0x-hex
:param holder_address: Token holder address
:type holder_address: str, 0x-hex
:param receiver_address: Token receiver address
:type receiver_address: str, 0x-hex
:param value: Amount of token, in 'wei'
:type value: int
:param chain_str: Chain spec string representation
:type chain_str: str
:raises TokenCountError: More than one token is passed in tokens list
:return: Transaction hash for tranfer operation
:rtype: str, 0x-hex
"""
# we only allow one token, one transfer
if len(tokens) != 1:
raise TokenCountError
t = tokens[0]
chain_spec = ChainSpec.from_dict(chain_spec_dict)
queue = self.request.delivery_info.get('routing_key')
rpc = RPCConnection.connect(chain_spec, 'default')
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
session = self.create_session()
nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas)
c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.approve(t['address'], holder_address, spender_address, value, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect()
rpc.disconnect()
cache_task = 'cic_eth.eth.erc20.cache_approve_data'
register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=cache_task, session=session)
session.commit()
session.close()
gas_pair = gas_oracle.get_gas(tx_signed_raw_hex)
gas_budget = gas_pair[0] * gas_pair[1]
s = create_check_gas_task(
[tx_signed_raw_hex],
chain_spec,
holder_address,
gas_budget,
[tx_hash_hex],
queue,
)
s.apply_async()
return tx_hash_hex
@celery_app.task(bind=True, base=CriticalWeb3Task)
def resolve_tokens_by_symbol(self, token_symbols, chain_spec_dict):
"""Returns contract addresses of an array of ERC20 token symbols
:param token_symbols: Token symbols to resolve
:type token_symbols: list of str
:param chain_str: Chain spec string representation
:type chain_str: str
:return: Respective token contract addresses
:rtype: list of str, 0x-hex
"""
tokens = []
chain_spec = ChainSpec.from_dict(chain_spec_dict)
rpc = RPCConnection.connect(chain_spec, 'default')
registry = CICRegistry(chain_spec, rpc)
session = self.create_session()
sender_address = AccountRole.get_address('DEFAULT', session)
session.close()
for token_symbol in token_symbols:
token_address = registry.by_name(token_symbol, sender_address=sender_address)
logg.debug('token {}'.format(token_address))
tokens.append({
'address': token_address,
'converters': [],
})
rpc.disconnect()
return tokens
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_transfer_data(
tx_hash_hex,
tx_signed_raw_hex,
chain_spec_dict,
):
"""Helper function for otx_cache_transfer
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx: Signed raw transaction
:type tx: str, 0x-hex
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_data = ERC20.parse_transfer_request(tx['data'])
recipient_address = tx_data[0]
token_value = tx_data[1]
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
recipient_address,
tx['to'],
tx['to'],
token_value,
token_value,
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_approve_data(
tx_hash_hex,
tx_signed_raw_hex,
chain_spec_dict,
):
"""Helper function for otx_cache_approve
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx: Signed raw transaction
:type tx: str, 0x-hex
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_data = ERC20.parse_approve_request(tx['data'])
recipient_address = tx_data[0]
token_value = tx_data[1]
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
recipient_address,
tx['to'],
tx['to'],
token_value,
token_value,
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)

View File

@@ -1,41 +0,0 @@
# standard imports
import logging
# local imports
from cic_registry import CICRegistry
from cic_eth.eth.nonce import NonceOracle
from cic_eth.eth import RpcClient
logg = logging.getLogger(__name__)
class TxFactory:
"""Base class for transaction factory classes.
:param from_address: Signer address to create transaction on behalf of
:type from_address: str, 0x-hex
:param rpc_client: RPC connection object to use to acquire account nonce if no record in nonce cache
:type rpc_client: cic_eth.eth.rpc.RpcClient
"""
gas_price = 100
"""Gas price, updated between batches"""
def __init__(self, from_address, rpc_client):
self.address = from_address
self.default_nonce = rpc_client.w3.eth.getTransactionCount(from_address, 'pending')
self.nonce_oracle = NonceOracle(from_address, self.default_nonce)
TxFactory.gas_price = rpc_client.gas_price()
logg.debug('txfactory instance address {} gas price'.format(self.address, self.gas_price))
def next_nonce(self, uuid, session=None):
"""Returns the current reserved nonce value, and increments it for next transaction.
:returns: Nonce
:rtype: number
"""
return self.nonce_oracle.next_by_task_uuid(uuid, session=session)

View File

@@ -1,74 +1,431 @@
# standard imports
import logging
# external imports
import celery
from hexathon import strip_0x
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec
from chainlib.eth.address import is_checksum_address
from chainlib.connection import RPCConnection
from chainqueue.db.enum import StatusBits
from chainlib.eth.gas import (
balance,
price,
)
from chainlib.eth.error import (
NotFoundEthException,
EthException,
)
from chainlib.eth.tx import (
TxFactory,
TxFormat,
unpack,
)
from chainlib.eth.contract import (
abi_decode_single,
ABIContractType,
)
from chainlib.eth.gas import (
Gas,
OverrideGasOracle,
)
from chainqueue.db.models.tx import TxCache
from chainqueue.db.models.otx import Otx
# local imports
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.base import SessionBase
from cic_eth.error import (
AlreadyFillingGasError,
OutOfGasError,
)
from cic_eth.eth.nonce import CustodialTaskNonceOracle
from cic_eth.queue.tx import (
queue_create,
register_tx,
)
from cic_eth.queue.query import get_tx
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalSQLAlchemyAndWeb3Task,
CriticalSQLAlchemyAndSignerTask,
CriticalWeb3AndSignerTask,
)
celery_app = celery.current_app
logg = logging.getLogger()
class GasOracle():
"""Provides gas pricing for transactions.
class MaxGasOracle:
:param w3: Web3 object
:type w3: web3.Web3
def gas(code=None):
return 8000000
def create_check_gas_task(tx_signed_raws_hex, chain_spec, holder_address, gas=None, tx_hashes_hex=None, queue=None):
"""Creates a celery task signature for a check_gas task that adds the task to the outgoing queue to be processed by the dispatcher.
If tx_hashes_hex is not spefified, a preceding task chained to check_gas must supply the transaction hashes as its return value.
:param tx_signed_raws_hex: Raw signed transaction data
:type tx_signed_raws_hex: list of str, 0x-hex
:param chain_spec: Chain spec of address to add check gas for
:type chain_spec: chainlib.chain.ChainSpec
:param holder_address: Address sending the transactions
:type holder_address: str, 0x-hex
:param gas: Gas budget hint for transactions
:type gas: int
:param tx_hashes_hex: Transaction hashes
:type tx_hashes_hex: list of str, 0x-hex
:param queue: Task queue
:type queue: str
:returns: Signature of task chain
:rtype: celery.Signature
"""
__safe_threshold_amount_value = 2000000000 * 60000 * 3
__refill_amount_value = __safe_threshold_amount_value * 5
default_gas_limit = 21000
def __init__(self, w3):
self.w3 = w3
self.gas_price_current = w3.eth.gas_price()
s_check_gas = None
if tx_hashes_hex != None:
s_check_gas = celery.signature(
'cic_eth.eth.gas.check_gas',
[
tx_hashes_hex,
chain_spec.asdict(),
tx_signed_raws_hex,
holder_address,
gas,
],
queue=queue,
)
else:
s_check_gas = celery.signature(
'cic_eth.eth.gas.check_gas',
[
chain_spec.asdict(),
tx_signed_raws_hex,
holder_address,
gas,
],
queue=queue,
)
return s_check_gas
def safe_threshold_amount(self):
"""The gas balance threshold under which a new gas refill transaction should be initiated.
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_gas_data(
tx_hash_hex,
tx_signed_raw_hex,
chain_spec_dict,
):
"""Helper function for otx_cache_parse_tx
:returns: Gas token amount
:rtype: number
"""
g = GasOracle.__safe_threshold_amount_value
logg.warning('gas safe threshold is currently hardcoded to {}'.format(g))
return g
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx: Signed raw transaction
:type tx: str, 0x-hex
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw_bytes, chain_spec)
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx['to'],
ZERO_ADDRESS,
ZERO_ADDRESS,
tx['value'],
tx['value'],
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)
def refill_amount(self):
"""The amount of gas tokens to send in a gas refill transaction.
@celery_app.task(bind=True, throws=(OutOfGasError), base=CriticalSQLAlchemyAndWeb3Task)
def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_required=None):
"""Check the gas level of the sender address of a transaction.
:returns: Gas token amount
:rtype: number
"""
g = GasOracle.__refill_amount_value
logg.warning('gas refill amount is currently hardcoded to {}'.format(g))
return g
If the account balance is not sufficient for the required gas, gas refill is requested and OutOfGasError raiser.
def gas_provider(self):
"""Gas provider address.
If account balance is sufficient, but level of gas before spend is below "safe" threshold, gas refill is requested, and execution continues normally.
:returns: Etheerum account address
:rtype: str, 0x-hex
"""
session = SessionBase.create_session()
a = AccountRole.get_address('GAS_GIFTER', session)
:param tx_hashes: Transaction hashes due to be submitted
:type tx_hashes: list of str, 0x-hex
:param chain_spec_dict: Chain spec dict representation
:type chain_spec_dict: dict
:param txs: Signed raw transaction data, corresponding to tx_hashes
:type txs: list of str, 0x-hex
:param address: Sender address
:type address: str, 0x-hex
:param gas_required: Gas limit * gas price for transaction, (optional, if not set will be retrived from transaction data)
:type gas_required: int
:return: Signed raw transaction data list
:rtype: param txs, unchanged
"""
if len(txs) == 0:
for i in range(len(tx_hashes)):
o = get_tx(tx_hashes[i])
txs.append(o['signed_tx'])
if address == None:
address = o['address']
#if not web3.Web3.isChecksumAddress(address):
if not is_checksum_address(address):
raise ValueError('invalid address {}'.format(address))
chain_spec = ChainSpec.from_dict(chain_spec_dict)
queue = self.request.delivery_info.get('routing_key')
conn = RPCConnection.connect(chain_spec)
# TODO: it should not be necessary to pass address explicitly, if not passed should be derived from the tx
gas_balance = 0
try:
o = balance(address)
r = conn.do(o)
conn.disconnect()
gas_balance = abi_decode_single(ABIContractType.UINT256, r)
except EthException as e:
conn.disconnect()
raise EthError('gas_balance call for {}: {}'.format(address, e))
logg.debug('address {} has gas {} needs {}'.format(address, gas_balance, gas_required))
session = SessionBase.create_session()
gas_provider = AccountRole.get_address('GAS_GIFTER', session=session)
session.close()
if gas_required > gas_balance:
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
[
address,
chain_spec_dict,
gas_provider,
],
queue=queue,
)
s_refill_gas = celery.signature(
'cic_eth.eth.gas.refill_gas',
[
chain_spec_dict,
],
queue=queue,
)
s_nonce.link(s_refill_gas)
s_nonce.apply_async()
wait_tasks = []
for tx_hash in tx_hashes:
s = celery.signature(
'cic_eth.queue.state.set_waitforgas',
[
chain_spec_dict,
tx_hash,
],
queue=queue,
)
wait_tasks.append(s)
celery.group(wait_tasks)()
raise OutOfGasError('need to fill gas, required {}, had {}'.format(gas_required, gas_balance))
safe_gas = self.safe_gas_threshold_amount
if gas_balance < safe_gas:
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
[
address,
chain_spec_dict,
gas_provider,
],
queue=queue,
)
s_refill_gas = celery.signature(
'cic_eth.eth.gas.refill_gas',
[
chain_spec_dict,
],
queue=queue,
)
s_nonce.link(s_refill_gas)
s_nonce.apply_async()
logg.debug('requested refill from {} to {}'.format(gas_provider, address))
ready_tasks = []
for tx_hash in tx_hashes:
s = celery.signature(
'cic_eth.queue.state.set_ready',
[
chain_spec_dict,
tx_hash,
],
queue=queue,
)
ready_tasks.append(s)
celery.group(ready_tasks)()
return txs
# TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks.
# TODO: method is too long, factor out code for clarity
@celery_app.task(bind=True, throws=(NotFoundEthException,), base=CriticalWeb3AndSignerTask)
def refill_gas(self, recipient_address, chain_spec_dict):
"""Executes a native token transaction to fund the recipient's gas expenditures.
:param recipient_address: Recipient in need of gas
:type recipient_address: str, 0x-hex
:param chain_str: Chain spec, string representation
:type chain_str: str
:raises AlreadyFillingGasError: A gas refill transaction for this address is already executing
:returns: Transaction hash.
:rtype: str, 0x-hex
"""
# essentials
chain_spec = ChainSpec.from_dict(chain_spec_dict)
queue = self.request.delivery_info.get('routing_key')
# Determine value of gas tokens to send
# if an uncompleted gas refill for the same recipient already exists, we still need to spend the nonce
# however, we will perform a 0-value transaction instead
zero_amount = False
session = SessionBase.create_session()
status_filter = StatusBits.FINAL | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
q = session.query(Otx.tx_hash)
q = q.join(TxCache)
q = q.filter(Otx.status.op('&')(StatusBits.FINAL.value)==0)
q = q.filter(TxCache.from_value!=0)
q = q.filter(TxCache.recipient==recipient_address)
c = q.count()
if c > 0:
logg.warning('already filling gas {}'.format(str(AlreadyFillingGasError(recipient_address))))
zero_amount = True
session.flush()
# finally determine the value to send
refill_amount = 0
if not zero_amount:
refill_amount = self.safe_gas_refill_amount
# determine sender
gas_provider = AccountRole.get_address('GAS_GIFTER', session=session)
session.flush()
# set up evm RPC connection
rpc = RPCConnection.connect(chain_spec, 'default')
# set up transaction builder
nonce_oracle = CustodialTaskNonceOracle(gas_provider, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc)
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
c = Gas(chain_spec, signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
# build and add transaction
logg.debug('tx send gas amount {} from provider {} to {}'.format(refill_amount, gas_provider, recipient_address))
(tx_hash_hex, tx_signed_raw_hex) = c.create(gas_provider, recipient_address, refill_amount, tx_format=TxFormat.RLP_SIGNED)
logg.debug('adding queue refill gas tx {}'.format(tx_hash_hex))
cache_task = 'cic_eth.eth.gas.cache_gas_data'
register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=cache_task, session=session)
# add transaction to send queue
s_status = celery.signature(
'cic_eth.queue.state.set_ready',
[
chain_spec.asdict(),
tx_hash_hex,
],
queue=queue,
)
t = s_status.apply_async()
return tx_signed_raw_hex
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def resend_with_higher_gas(self, txold_hash_hex, chain_spec_dict, gas=None, default_factor=1.1):
"""Create a new transaction from an existing one with same nonce and higher gas price.
:param txold_hash_hex: Transaction to re-create
:type txold_hash_hex: str, 0x-hex
:param chain_str: Chain spec, string representation
:type chain_str: str
:param gas: Explicitly use the specified gas amount
:type gas: number
:param default_factor: Default factor by which to increment the gas price by
:type default_factor: float
:raises NotLocalTxError: Transaction does not exist in the local queue
:returns: Transaction hash
:rtype: str, 0x-hex
"""
session = SessionBase.create_session()
otx = Otx.load(txold_hash_hex, session)
if otx == None:
session.close()
return a
raise NotLocalTxError(txold_hash_hex)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(otx.signed_tx)
tx = unpack(tx_signed_raw_bytes, chain_spec)
logg.debug('resend otx {} {}'.format(tx, otx.signed_tx))
queue = self.request.delivery_info.get('routing_key')
logg.debug('before {}'.format(tx))
rpc = RPCConnection.connect(chain_spec, 'default')
new_gas_price = gas
if new_gas_price == None:
o = price()
r = rpc.do(o)
current_gas_price = int(r, 16)
if tx['gasPrice'] > current_gas_price:
logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(current_gas_price, tx['gasPrice']))
#tx['gasPrice'] = int(tx['gasPrice'] * default_factor)
new_gas_price = tx['gasPrice'] + 1
else:
new_gas_price = int(tx['gasPrice'] * default_factor)
#if gas_price > new_gas_price:
# tx['gasPrice'] = gas_price
#else:
# tx['gasPrice'] = new_gas_price
def gas_price(self, category='safe'):
"""Get projected gas price to use for a transaction at the current moment.
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
gas_oracle = OverrideGasOracle(price=new_gas_price, conn=rpc)
c = TxFactory(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle)
logg.debug('change gas price from old {} to new {} for tx {}'.format(tx['gasPrice'], new_gas_price, tx))
tx['gasPrice'] = new_gas_price
(tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx)
queue_create(
chain_spec,
tx['nonce'],
tx['from'],
tx_hash_hex,
tx_signed_raw_hex,
session=session,
)
TxCache.clone(txold_hash_hex, tx_hash_hex, session=session)
session.close()
s = create_check_gas_task(
[tx_signed_raw_hex],
chain_spec,
tx['from'],
tx['gasPrice'] * tx['gas'],
[tx_hash_hex],
queue=queue,
)
s.apply_async()
return tx_hash_hex
When the category parameter is implemented, it can be used to control the priority of a transaction in the network.
:param category: Bid level category to return price for. Currently has no effect.
:type category: str
:returns: Gas price
:rtype: number
"""
#logg.warning('gas price hardcoded to category "safe"')
#g = 100
#return g
return self.gas_price_current

View File

@@ -0,0 +1,75 @@
# external imports
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.status import Status as TxStatus
from cic_eth_registry.erc20 import ERC20Token
# local imports
from cic_eth.ext.address import translate_address
class ExtendedTx:
_default_decimals = 6
def __init__(self, rpc, tx_hash, chain_spec):
self.rpc = rpc
self.chain_spec = chain_spec
self.hash = tx_hash
self.sender = None
self.sender_label = None
self.recipient = None
self.recipient_label = None
self.source_token_value = 0
self.destination_token_value = 0
self.source_token = ZERO_ADDRESS
self.destination_token = ZERO_ADDRESS
self.source_token_symbol = ''
self.destination_token_symbol = ''
self.source_token_decimals = ExtendedTx._default_decimals
self.destination_token_decimals = ExtendedTx._default_decimals
self.status = TxStatus.PENDING.name
self.status_code = TxStatus.PENDING.value
def set_actors(self, sender, recipient, trusted_declarator_addresses=None, caller_address=ZERO_ADDRESS):
self.sender = sender
self.recipient = recipient
if trusted_declarator_addresses != None:
self.sender_label = translate_address(sender, trusted_declarator_addresses, self.chain_spec, sender_address=caller_address)
self.recipient_label = translate_address(recipient, trusted_declarator_addresses, self.chain_spec, sender_address=caller_address)
def set_tokens(self, source, source_value, destination=None, destination_value=None):
if destination == None:
destination = source
if destination_value == None:
destination_value = source_value
st = ERC20Token(self.chain_spec, self.rpc, source)
dt = ERC20Token(self.chain_spec, self.rpc, destination)
self.source_token = source
self.source_token_symbol = st.symbol
self.source_token_name = st.name
self.source_token_decimals = st.decimals
self.source_token_value = source_value
self.destination_token = destination
self.destination_token_symbol = dt.symbol
self.destination_token_name = dt.name
self.destination_token_decimals = dt.decimals
self.destination_token_value = destination_value
def set_status(self, n):
if n:
self.status = TxStatus.ERROR.name
else:
self.status = TxStatus.SUCCESS.name
self.status_code = n
def asdict(self):
o = {}
for attr in dir(self):
if attr[0] == '_' or attr in ['set_actors', 'set_tokens', 'set_status', 'asdict', 'rpc']:
continue
o[attr] = getattr(self, attr)
return o

View File

@@ -1,10 +1,24 @@
# standard imports
import logging
# external imports
import celery
from chainlib.eth.address import is_checksum_address
# local imports
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.base import SessionBase
from cic_eth.task import CriticalSQLAlchemyTask
from cic_eth.db.models.nonce import (
Nonce,
NonceReservation,
)
class NonceOracle():
celery_app = celery.current_app
logg = logging.getLogger()
class CustodialTaskNonceOracle():
"""Ensures atomic nonce increments for all transactions across all tasks and threads.
:param address: Address to generate nonces for
@@ -12,20 +26,54 @@ class NonceOracle():
:param default_nonce: Initial nonce value to use if no nonce cache entry already exists
:type default_nonce: number
"""
def __init__(self, address, default_nonce):
def __init__(self, address, uuid, session=None):
self.address = address
self.default_nonce = default_nonce
self.uuid = uuid
self.session = session
def next(self):
def get_nonce(self):
return self.next_nonce()
def next_nonce(self):
"""Get next unique nonce.
:returns: Nonce
:rtype: number
"""
raise AttributeError('this should not be called')
return Nonce.next(self.address, self.default_nonce)
r = NonceReservation.release(self.address, self.uuid, session=self.session)
return r[1]
def next_by_task_uuid(self, uuid, session=None):
return NonceReservation.release(uuid, session=session)
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def reserve_nonce(self, chained_input, chain_spec_dict, signer_address=None):
self.log_banner()
session = SessionBase.create_session()
address = None
if signer_address == None:
address = chained_input
logg.debug('non-explicit address for reserve nonce, using arg head {}'.format(chained_input))
else:
if is_checksum_address(signer_address):
address = signer_address
logg.debug('explicit address for reserve nonce {}'.format(signer_address))
else:
address = AccountRole.get_address(signer_address, session=session)
logg.debug('role for reserve nonce {} -> {}'.format(signer_address, address))
if not is_checksum_address(address):
raise ValueError('invalid result when resolving address for nonce {}'.format(address))
root_id = self.request.root_id
r = NonceReservation.next(address, root_id, session=session)
logg.debug('nonce {} reserved for address {} task {}'.format(r[1], address, r[0]))
session.commit()
session.close()
return chained_input

View File

@@ -1,39 +0,0 @@
# standard imports
import logging
# local imports
from cic_eth.eth.gas import GasOracle
logg = logging.getLogger()
class RpcClient(GasOracle):
"""RPC wrapper for web3 enabling gas calculation helpers and signer middleware.
:param chain_spec: Chain spec
:type chain_spec: cic_registry.chain.ChainSpec
:param holder_address: DEPRECATED Address of subject of the session.
:type holder_address: str, 0x-hex
"""
signer_ipc_path = None
"""Unix socket path to JSONRPC signer and keystore"""
web3_constructor = None
"""Custom function to build a web3 object with middleware plugins"""
def __init__(self, chain_spec, holder_address=None):
(self.provider, w3) = RpcClient.web3_constructor()
super(RpcClient, self).__init__(w3)
self.chain_spec = chain_spec
if holder_address != None:
self.holder_address = holder_address
logg.info('gasprice {}'.format(self.gas_price()))
@staticmethod
def set_constructor(web3_constructor):
"""Sets the constructor to use for building the web3 object.
"""
RpcClient.web3_constructor = web3_constructor

View File

@@ -1,125 +0,0 @@
# standard imports
import logging
# third-party imports
import celery
from cic_registry.chain import ChainSpec
# local imports
from cic_eth.eth import RpcClient
from cic_eth.queue.tx import create as queue_create
celery_app = celery.current_app
logg = celery_app.log.get_default_logger()
@celery_app.task()
def sign_tx(tx, chain_str):
"""Sign a single transaction against the given chain specification.
:param tx: Transaction in standard Ethereum format
:type tx: dict
:param chain_str: Chain spec string representation
:type chain_str: str
:returns: Transaction hash and raw signed transaction, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
tx_transfer_signed = c.w3.eth.sign_transaction(tx)
logg.debug('tx_transfer_signed {}'.format(tx_transfer_signed))
tx_hash = c.w3.keccak(hexstr=tx_transfer_signed['raw'])
tx_hash_hex = tx_hash.hex()
return (tx_hash_hex, tx_transfer_signed['raw'],)
def sign_and_register_tx(tx, chain_str, queue, cache_task=None, session=None):
"""Signs the provided transaction, and adds it to the transaction queue cache (with status PENDING).
:param tx: Standard ethereum transaction data
:type tx: dict
:param chain_str: Chain spec, string representation
:type chain_str: str
:param queue: Task queue
:type queue: str
:param cache_task: Cache task to call with signed transaction. If None, no task will be called.
:type cache_task: str
:raises: sqlalchemy.exc.DatabaseError
:returns: Tuple; Transaction hash, signed raw transaction data
:rtype: tuple
"""
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, chain_str)
logg.debug('adding queue tx {}'.format(tx_hash_hex))
queue_create(
tx['nonce'],
tx['from'],
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
session=session,
)
if cache_task != None:
logg.debug('adding cache task {} tx {}'.format(cache_task, tx_hash_hex))
s_cache = celery.signature(
cache_task,
[
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
],
queue=queue,
)
s_cache.apply_async()
return (tx_hash_hex, tx_signed_raw_hex,)
# TODO: rename as we will not be sending task in the chain, this is the responsibility of the dispatcher
def create_check_gas_and_send_task(tx_signed_raws_hex, chain_str, holder_address, gas, tx_hashes_hex=None, queue=None):
"""Creates a celery task signature for a check_gas task that adds the task to the outgoing queue to be processed by the dispatcher.
If tx_hashes_hex is not spefified, a preceding task chained to check_gas must supply the transaction hashes as its return value.
:param tx_signed_raws_hex: Raw signed transaction data
:type tx_signed_raws_hex: list of str, 0x-hex
:param chain_str: Chain spec, string representation
:type chain_str: str
:param holder_address: Address sending the transactions
:type holder_address: str, 0x-hex
:param gas: Gas budget hint for transactions
:type gas: int
:param tx_hashes_hex: Transaction hashes
:type tx_hashes_hex: list of str, 0x-hex
:param queue: Task queue
:type queue: str
:returns: Signature of task chain
:rtype: celery.Signature
"""
s_check_gas = None
if tx_hashes_hex != None:
s_check_gas = celery.signature(
'cic_eth.eth.tx.check_gas',
[
tx_hashes_hex,
chain_str,
tx_signed_raws_hex,
holder_address,
gas,
],
queue=queue,
)
else:
s_check_gas = celery.signature(
'cic_eth.eth.tx.check_gas',
[
chain_str,
tx_signed_raws_hex,
holder_address,
gas,
],
queue=queue,
)
return s_check_gas

View File

@@ -1,534 +0,0 @@
# standard imports
import logging
# third-party imports
import celery
import requests
import web3
from cic_registry import CICRegistry
from cic_registry import zero_address
from cic_registry.chain import ChainSpec
from hexathon import strip_0x
from chainlib.status import Status as TxStatus
# platform imports
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.base import SessionBase
from cic_eth.eth import RpcClient
from cic_eth.error import TokenCountError, PermanentTxError, OutOfGasError, NotLocalTxError
from cic_eth.eth.task import sign_and_register_tx
from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.eth.factory import TxFactory
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.ext.address import translate_address
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalWeb3Task,
)
celery_app = celery.current_app
logg = logging.getLogger()
# TODO: fetch from cic-contracts instead when implemented
contract_function_signatures = {
'transfer': 'a9059cbb',
'approve': '095ea7b3',
'transferfrom': '23b872dd',
}
class TokenTxFactory(TxFactory):
"""Factory for creating ERC20 token transactions.
"""
def approve(
self,
token_address,
spender_address,
amount,
chain_spec,
uuid,
session=None,
):
"""Create an ERC20 "approve" transaction
:param token_address: ERC20 contract address
:type token_address: str, 0x-hex
:param spender_address: Address to approve spending for
:type spender_address: str, 0x-hex
:param amount: Amount of tokens to approve
:type amount: int
:param chain_spec: Chain spec
:type chain_spec: cic_registry.chain.ChainSpec
:returns: Unsigned "approve" transaction in standard Ethereum format
:rtype: dict
"""
source_token = CICRegistry.get_address(chain_spec, token_address)
source_token_contract = source_token.contract
tx_approve_buildable = source_token_contract.functions.approve(
spender_address,
amount,
)
source_token_gas = source_token.gas('transfer')
tx_approve = tx_approve_buildable.buildTransaction({
'from': self.address,
'gas': source_token_gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(uuid, session=session),
})
return tx_approve
def transfer(
self,
token_address,
receiver_address,
value,
chain_spec,
uuid,
session=None,
):
"""Create an ERC20 "transfer" transaction
:param token_address: ERC20 contract address
:type token_address: str, 0x-hex
:param receiver_address: Address to send tokens to
:type receiver_address: str, 0x-hex
:param amount: Amount of tokens to send
:type amount: int
:param chain_spec: Chain spec
:type chain_spec: cic_registry.chain.ChainSpec
:returns: Unsigned "transfer" transaction in standard Ethereum format
:rtype: dict
"""
source_token = CICRegistry.get_address(chain_spec, token_address)
source_token_contract = source_token.contract
transfer_buildable = source_token_contract.functions.transfer(
receiver_address,
value,
)
source_token_gas = source_token.gas('transfer')
tx_transfer = transfer_buildable.buildTransaction(
{
'from': self.address,
'gas': source_token_gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(uuid, session=session),
})
return tx_transfer
def unpack_transfer(data):
"""Verifies that a transaction is an "ERC20.transfer" transaction, and extracts call parameters from it.
:param data: Raw input data from Ethereum transaction.
:type data: str, 0x-hex
:raises ValueError: Function signature does not match AccountRegister.add
:returns: Parsed parameters
:rtype: dict
"""
data = strip_0x(data)
f = data[:8]
if f != contract_function_signatures['transfer']:
raise ValueError('Invalid transfer data ({})'.format(f))
d = data[8:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'amount': int(d[64:], 16)
}
def unpack_transferfrom(data):
"""Verifies that a transaction is an "ERC20.transferFrom" transaction, and extracts call parameters from it.
:param data: Raw input data from Ethereum transaction.
:type data: str, 0x-hex
:raises ValueError: Function signature does not match AccountRegister.add
:returns: Parsed parameters
:rtype: dict
"""
data = strip_0x(data)
f = data[:8]
if f != contract_function_signatures['transferfrom']:
raise ValueError('Invalid transferFrom data ({})'.format(f))
d = data[8:]
return {
'from': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'to': web3.Web3.toChecksumAddress('0x' + d[128-40:128]),
'amount': int(d[128:], 16)
}
def unpack_approve(data):
"""Verifies that a transaction is an "ERC20.approve" transaction, and extracts call parameters from it.
:param data: Raw input data from Ethereum transaction.
:type data: str, 0x-hex
:raises ValueError: Function signature does not match AccountRegister.add
:returns: Parsed parameters
:rtype: dict
"""
data = strip_0x(data)
f = data[:8]
if f != contract_function_signatures['approve']:
raise ValueError('Invalid approval data ({})'.format(f))
d = data[8:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'amount': int(d[64:], 16)
}
@celery_app.task(base=CriticalWeb3Task)
def balance(tokens, holder_address, chain_str):
"""Return token balances for a list of tokens for given address
:param tokens: Token addresses
:type tokens: list of str, 0x-hex
:param holder_address: Token holder address
:type holder_address: str, 0x-hex
:param chain_str: Chain spec string representation
:type chain_str: str
:return: List of balances
:rtype: list of int
"""
#abi = ContractRegistry.abi('ERC20Token')
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
for t in tokens:
#token = CICRegistry.get_address(t['address'])
#abi = token.abi()
#o = c.w3.eth.contract(abi=abi, address=t['address'])
o = CICRegistry.get_address(chain_spec, t['address']).contract
b = o.functions.balanceOf(holder_address).call()
t['balance_network'] = b
return tokens
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
"""Transfer ERC20 tokens between addresses
First argument is a list of tokens, to enable the task to be chained to the symbol to token address resolver function. However, it accepts only one token as argument.
:raises TokenCountError: Either none or more then one tokens have been passed as tokens argument
:param tokens: Token addresses
:type tokens: list of str, 0x-hex
:param holder_address: Token holder address
:type holder_address: str, 0x-hex
:param receiver_address: Token receiver address
:type receiver_address: str, 0x-hex
:param value: Amount of token, in 'wei'
:type value: int
:param chain_str: Chain spec string representation
:type chain_str: str
:raises TokenCountError: More than one token is passed in tokens list
:return: Transaction hash for tranfer operation
:rtype: str, 0x-hex
"""
# we only allow one token, one transfer
if len(tokens) != 1:
raise TokenCountError
chain_spec = ChainSpec.from_chain_str(chain_str)
queue = self.request.delivery_info['routing_key']
# retrieve the token interface
t = tokens[0]
c = RpcClient(chain_spec, holder_address=holder_address)
txf = TokenTxFactory(holder_address, c)
session = SessionBase.create_session()
tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_transfer', session=session)
session.close()
gas_budget = tx_transfer['gas'] * tx_transfer['gasPrice']
s = create_check_gas_and_send_task(
[tx_signed_raw_hex],
chain_str,
holder_address,
gas_budget,
[tx_hash_hex],
queue,
)
s.apply_async()
return tx_hash_hex
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def approve(self, tokens, holder_address, spender_address, value, chain_str):
"""Approve ERC20 transfer on behalf of holder address
First argument is a list of tokens, to enable the task to be chained to the symbol to token address resolver function. However, it accepts only one token as argument.
:raises TokenCountError: Either none or more then one tokens have been passed as tokens argument
:param tokens: Token addresses
:type tokens: list of str, 0x-hex
:param holder_address: Token holder address
:type holder_address: str, 0x-hex
:param receiver_address: Token receiver address
:type receiver_address: str, 0x-hex
:param value: Amount of token, in 'wei'
:type value: int
:param chain_str: Chain spec string representation
:type chain_str: str
:raises TokenCountError: More than one token is passed in tokens list
:return: Transaction hash for tranfer operation
:rtype: str, 0x-hex
"""
# we only allow one token, one transfer
if len(tokens) != 1:
raise TokenCountError
chain_spec = ChainSpec.from_chain_str(chain_str)
queue = self.request.delivery_info['routing_key']
# retrieve the token interface
t = tokens[0]
c = RpcClient(chain_spec, holder_address=holder_address)
txf = TokenTxFactory(holder_address, c)
session = SessionBase.create_session()
tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_approve', session=session)
session.close()
gas_budget = tx_transfer['gas'] * tx_transfer['gasPrice']
s = create_check_gas_and_send_task(
[tx_signed_raw_hex],
chain_str,
holder_address,
gas_budget,
[tx_hash_hex],
queue,
)
s.apply_async()
return tx_hash_hex
@celery_app.task(base=CriticalWeb3Task)
def resolve_tokens_by_symbol(token_symbols, chain_str):
"""Returns contract addresses of an array of ERC20 token symbols
:param token_symbols: Token symbols to resolve
:type token_symbols: list of str
:param chain_str: Chain spec string representation
:type chain_str: str
:return: Respective token contract addresses
:rtype: list of str, 0x-hex
"""
tokens = []
chain_spec = ChainSpec.from_chain_str(chain_str)
for token_symbol in token_symbols:
token = CICRegistry.get_token(chain_spec, token_symbol)
tokens.append({
'address': token.address(),
'converters': [],
})
return tokens
@celery_app.task(base=CriticalSQLAlchemyTask)
def otx_cache_transfer(
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
):
"""Generates and commits transaction cache metadata for an ERC20.transfer or ERC20.transferFrom transaction
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx_signed_raw_hex: Raw signed transaction
:type tx_signed_raw_hex: str, 0x-hex
:param chain_str: Chain spec string representation
:type chain_str: str
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
(txc, cache_id) = cache_transfer_data(tx_hash_hex, tx)
return txc
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_transfer_data(
tx_hash_hex,
tx,
):
"""Helper function for otx_cache_transfer
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx: Signed raw transaction
:type tx: str, 0x-hex
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
tx_data = unpack_transfer(tx['data'])
logg.debug('tx data {}'.format(tx_data))
logg.debug('tx {}'.format(tx))
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx_data['to'],
tx['to'],
tx['to'],
tx_data['amount'],
tx_data['amount'],
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)
@celery_app.task(base=CriticalSQLAlchemyTask)
def otx_cache_approve(
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
):
"""Generates and commits transaction cache metadata for an ERC20.approve transaction
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx_signed_raw_hex: Raw signed transaction
:type tx_signed_raw_hex: str, 0x-hex
:param chain_str: Chain spec string representation
:type chain_str: str
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
(txc, cache_id) = cache_approve_data(tx_hash_hex, tx)
return txc
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_approve_data(
tx_hash_hex,
tx,
):
"""Helper function for otx_cache_approve
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx: Signed raw transaction
:type tx: str, 0x-hex
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
tx_data = unpack_approve(tx['data'])
logg.debug('tx data {}'.format(tx_data))
logg.debug('tx {}'.format(tx))
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx_data['to'],
tx['to'],
tx['to'],
tx_data['amount'],
tx_data['amount'],
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)
# TODO: Move to dedicated metadata package
class ExtendedTx:
_default_decimals = 6
def __init__(self, tx_hash, chain_spec):
self._chain_spec = chain_spec
self.chain = str(chain_spec)
self.hash = tx_hash
self.sender = None
self.sender_label = None
self.recipient = None
self.recipient_label = None
self.source_token_value = 0
self.destination_token_value = 0
self.source_token = zero_address
self.destination_token = zero_address
self.source_token_symbol = ''
self.destination_token_symbol = ''
self.source_token_decimals = ExtendedTx._default_decimals
self.destination_token_decimals = ExtendedTx._default_decimals
self.status = TxStatus.PENDING.name
self.status_code = TxStatus.PENDING.value
def set_actors(self, sender, recipient, trusted_declarator_addresses=None):
self.sender = sender
self.recipient = recipient
if trusted_declarator_addresses != None:
self.sender_label = translate_address(sender, trusted_declarator_addresses, self.chain)
self.recipient_label = translate_address(recipient, trusted_declarator_addresses, self.chain)
def set_tokens(self, source, source_value, destination=None, destination_value=None):
if destination == None:
destination = source
if destination_value == None:
destination_value = source_value
st = CICRegistry.get_address(self._chain_spec, source)
dt = CICRegistry.get_address(self._chain_spec, destination)
self.source_token = source
self.source_token_symbol = st.symbol()
self.source_token_decimals = st.decimals()
self.source_token_value = source_value
self.destination_token = destination
self.destination_token_symbol = dt.symbol()
self.destination_token_decimals = dt.decimals()
self.destination_token_value = destination_value
def set_status(self, n):
if n:
self.status = TxStatus.ERROR.name
else:
self.status = TxStatus.SUCCESS.name
self.status_code = n
def to_dict(self):
o = {}
for attr in dir(self):
if attr[0] == '_' or attr in ['set_actors', 'set_tokens', 'set_status', 'to_dict']:
continue
o[attr] = getattr(self, attr)
return o

View File

@@ -1,41 +1,41 @@
# standard imports
import logging
# third-party imports
# external imports
import celery
import requests
import web3
from cic_registry import zero_address
from cic_registry.chain import ChainSpec
from chainlib.chain import ChainSpec
from chainlib.eth.address import is_checksum_address
from chainlib.eth.error import NotFoundEthException
from chainlib.eth.tx import (
transaction,
receipt,
raw,
)
from chainlib.connection import RPCConnection
from chainlib.hash import keccak256_hex_to_hex
from hexathon import (
add_0x,
strip_0x,
)
from chainqueue.db.models.tx import Otx
from chainqueue.db.models.tx import TxCache
from chainqueue.db.enum import StatusBits
from chainqueue.error import NotLocalTxError
# local imports
from .rpc import RpcClient
from cic_eth.db import Otx, SessionBase
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.nonce import NonceReservation
from cic_eth.db.models.lock import Lock
from cic_eth.db.enum import (
LockEnum,
StatusBits,
from cic_eth.db import SessionBase
from cic_eth.error import (
PermanentTxError,
TemporaryTxError,
)
from cic_eth.error import PermanentTxError
from cic_eth.error import TemporaryTxError
from cic_eth.error import NotLocalTxError
from cic_eth.queue.tx import create as queue_create
from cic_eth.queue.tx import get_tx
from cic_eth.queue.tx import get_nonce_tx
from cic_eth.error import OutOfGasError
from cic_eth.error import LockedError
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.eth.task import sign_and_register_tx, create_check_gas_and_send_task
from cic_eth.eth.task import sign_tx
from cic_eth.eth.nonce import NonceOracle
from cic_eth.error import AlreadyFillingGasError
from cic_eth.eth.util import tx_hex_string
from cic_eth.eth.gas import create_check_gas_task
from cic_eth.admin.ctrl import lock_send
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalWeb3Task,
CriticalWeb3AndSignerTask,
CriticalSQLAlchemyAndSignerTask,
CriticalSQLAlchemyAndWeb3Task,
)
celery_app = celery.current_app
@@ -44,112 +44,6 @@ logg = logging.getLogger()
MAX_NONCE_ATTEMPTS = 3
# TODO this function is too long
@celery_app.task(bind=True, throws=(OutOfGasError), base=CriticalSQLAlchemyTask)
def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=None):
"""Check the gas level of the sender address of a transaction.
If the account balance is not sufficient for the required gas, gas refill is requested and OutOfGasError raiser.
If account balance is sufficient, but level of gas before spend is below "safe" threshold, gas refill is requested, and execution continues normally.
:param tx_hashes: Transaction hashes due to be submitted
:type tx_hashes: list of str, 0x-hex
:param chain_str: Chain spec string representation
:type chain_str: str
:param txs: Signed raw transaction data, corresponding to tx_hashes
:type txs: list of str, 0x-hex
:param address: Sender address
:type address: str, 0x-hex
:param gas_required: Gas limit * gas price for transaction, (optional, if not set will be retrived from transaction data)
:type gas_required: int
:return: Signed raw transaction data list
:rtype: param txs, unchanged
"""
if len(txs) == 0:
for i in range(len(tx_hashes)):
o = get_tx(tx_hashes[i])
txs.append(o['signed_tx'])
if address == None:
address = o['address']
chain_spec = ChainSpec.from_chain_str(chain_str)
queue = self.request.delivery_info['routing_key']
#c = RpcClient(chain_spec, holder_address=address)
c = RpcClient(chain_spec)
# TODO: it should not be necessary to pass address explicitly, if not passed should be derived from the tx
balance = c.w3.eth.getBalance(address)
logg.debug('address {} has gas {} needs {}'.format(address, balance, gas_required))
if gas_required > balance:
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
address,
c.gas_provider(),
],
queue=queue,
)
s_refill_gas = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
chain_str,
],
queue=queue,
)
s_nonce.link(s_refill_gas)
s_nonce.apply_async()
wait_tasks = []
for tx_hash in tx_hashes:
s = celery.signature(
'cic_eth.queue.tx.set_waitforgas',
[
tx_hash,
],
queue=queue,
)
wait_tasks.append(s)
celery.group(wait_tasks)()
raise OutOfGasError('need to fill gas, required {}, had {}'.format(gas_required, balance))
safe_gas = c.safe_threshold_amount()
if balance < safe_gas:
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
address,
c.gas_provider(),
],
queue=queue,
)
s_refill_gas = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
chain_str,
],
queue=queue,
)
s_nonce.link(s_refill)
s_nonce.apply_async()
logg.debug('requested refill from {} to {}'.format(c.gas_provider(), address))
ready_tasks = []
for tx_hash in tx_hashes:
s = celery.signature(
'cic_eth.queue.tx.set_ready',
[
tx_hash,
],
queue=queue,
)
ready_tasks.append(s)
celery.group(ready_tasks)()
return txs
# TODO: chain chainable transactions that use hashes as inputs may be chained to this function to output signed txs instead.
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def hashes_to_txs(self, tx_hashes):
@@ -166,8 +60,6 @@ def hashes_to_txs(self, tx_hashes):
queue = self.request.delivery_info['routing_key']
#otxs = ','.format("'{}'".format(tx_hash) for tx_hash in tx_hashes)
session = SessionBase.create_session()
q = session.query(Otx.signed_tx)
q = q.filter(Otx.tx_hash.in_(tx_hashes))
@@ -184,172 +76,9 @@ def hashes_to_txs(self, tx_hashes):
return txs
# TODO: Move this and send to subfolder submodule
class ParityNodeHandler:
def __init__(self, chain_spec, queue):
self.chain_spec = chain_spec
self.chain_str = str(chain_spec)
self.queue = queue
def handle(self, exception, tx_hash_hex, tx_hex):
meth = self.handle_default
if isinstance(exception, (ValueError)):
earg = exception.args[0]
if earg['code'] == -32010:
logg.debug('skipping lock for code {}'.format(earg['code']))
meth = self.handle_invalid_parameters
elif earg['code'] == -32602:
meth = self.handle_invalid_encoding
else:
# TODO: move to status log db comment field
meth = self.handle_invalid
elif isinstance(exception, (requests.exceptions.ConnectionError)):
meth = self.handle_connection
(t, e_fn, message) = meth(tx_hash_hex, tx_hex, str(exception))
return (t, e_fn, '{} {}'.format(message, exception))
def handle_connection(self, tx_hash_hex, tx_hex, debugstr=None):
s_set_sent = celery.signature(
'cic_eth.queue.tx.set_sent_status',
[
tx_hash_hex,
True,
],
queue=self.queue,
)
t = s_set_sent.apply_async()
return (t, TemporaryTxError, 'Sendfail {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_invalid_encoding(self, tx_hash_hex, tx_hex, debugstr=None):
tx_bytes = bytes.fromhex(tx_hex[2:])
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
s_lock = celery.signature(
'cic_eth.admin.ctrl.lock_send',
[
tx_hash_hex,
self.chain_str,
tx['from'],
tx_hash_hex,
],
queue=self.queue,
)
s_set_reject = celery.signature(
'cic_eth.queue.tx.set_rejected',
[],
queue=self.queue,
)
nonce_txs = get_nonce_tx(tx['nonce'], tx['from'], self.chain_spec.chain_id())
attempts = len(nonce_txs)
if attempts < MAX_NONCE_ATTEMPTS:
logg.debug('nonce {} address {} retries {} < {}'.format(tx['nonce'], tx['from'], attempts, MAX_NONCE_ATTEMPTS))
s_resend = celery.signature(
'cic_eth.eth.tx.resend_with_higher_gas',
[
self.chain_str,
None,
1.01,
],
queue=self.queue,
)
s_unlock = celery.signature(
'cic_eth.admin.ctrl.unlock_send',
[
self.chain_str,
tx['from'],
],
queue=self.queue,
)
s_resend.link(s_unlock)
s_set_reject.link(s_resend)
s_lock.link(s_set_reject)
t = s_lock.apply_async()
return (t, PermanentTxError, 'Reject invalid encoding {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_invalid_parameters(self, tx_hash_hex, tx_hex, debugstr=None):
s_sync = celery.signature(
'cic_eth.eth.tx.sync_tx',
[
tx_hash_hex,
self.chain_str,
],
queue=self.queue,
)
t = s_sync.apply_async()
return (t, PermanentTxError, 'Reject invalid parameters {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_invalid(self, tx_hash_hex, tx_hex, debugstr=None):
tx_bytes = bytes.fromhex(tx_hex[2:])
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
s_lock = celery.signature(
'cic_eth.admin.ctrl.lock_send',
[
tx_hash_hex,
self.chain_str,
tx['from'],
tx_hash_hex,
],
queue=self.queue,
)
s_set_reject = celery.signature(
'cic_eth.queue.tx.set_rejected',
[],
queue=self.queue,
)
s_debug = celery.signature(
'cic_eth.admin.debug.alert',
[
tx_hash_hex,
debugstr,
],
queue=self.queue,
)
s_set_reject.link(s_debug)
s_lock.link(s_set_reject)
t = s_lock.apply_async()
return (t, PermanentTxError, 'Reject invalid {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_default(self, tx_hash_hex, tx_hex, debugstr):
tx_bytes = bytes.fromhex(tx_hex[2:])
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
s_lock = celery.signature(
'cic_eth.admin.ctrl.lock_send',
[
tx_hash_hex,
self.chain_str,
tx['from'],
tx_hash_hex,
],
queue=self.queue,
)
s_set_fubar = celery.signature(
'cic_eth.queue.tx.set_fubar',
[],
queue=self.queue,
)
s_debug = celery.signature(
'cic_eth.admin.debug.alert',
[
tx_hash_hex,
debugstr,
],
queue=self.queue,
)
s_set_fubar.link(s_debug)
s_lock.link(s_set_fubar)
t = s_lock.apply_async()
return (t, PermanentTxError, 'Fubar {} {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id()), debugstr))
# TODO: A lock should be introduced to ensure that the send status change and the transaction send is atomic.
@celery_app.task(bind=True, base=CriticalWeb3Task)
def send(self, txs, chain_str):
def send(self, txs, chain_spec_dict):
"""Send transactions to the network.
If more than one transaction is passed to the task, it will spawn a new send task with the remaining transaction(s) after the first in the list has been processed.
@@ -374,236 +103,51 @@ def send(self, txs, chain_str):
if len(txs) == 0:
raise ValueError('no transaction to send')
chain_spec = ChainSpec.from_chain_str(chain_str)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_hex = txs[0]
logg.debug('send transaction {}'.format(tx_hex))
tx_hex = add_0x(txs[0])
tx_hash = web3.Web3.keccak(hexstr=tx_hex)
tx_hash_hex = tx_hash.hex()
tx_hash_hex = add_0x(keccak256_hex_to_hex(tx_hex))
queue = self.request.delivery_info.get('routing_key', None)
logg.debug('send transaction {} -> {}'.format(tx_hash_hex, tx_hex))
queue = self.request.delivery_info.get('routing_key')
c = RpcClient(chain_spec)
r = None
s_set_sent = celery.signature(
'cic_eth.queue.tx.set_sent_status',
'cic_eth.queue.state.set_sent',
[
chain_spec_dict,
tx_hash_hex,
False
],
queue=queue,
)
try:
r = c.w3.eth.send_raw_transaction(tx_hex)
except requests.exceptions.ConnectionError as e:
raise(e)
except Exception as e:
raiser = ParityNodeHandler(chain_spec, queue)
(t, e, m) = raiser.handle(e, tx_hash_hex, tx_hex)
raise e(m)
o = raw(tx_hex)
conn = RPCConnection.connect(chain_spec, 'default')
conn.do(o)
s_set_sent.apply_async()
tx_tail = txs[1:]
if len(tx_tail) > 0:
s = celery.signature(
'cic_eth.eth.tx.send',
[tx_tail],
[
tx_tail,
chain_spec_dict,
],
queue=queue,
)
s.apply_async()
return r.hex()
# TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks.
# TODO: method is too long, factor out code for clarity
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task)
def refill_gas(self, recipient_address, chain_str):
"""Executes a native token transaction to fund the recipient's gas expenditures.
:param recipient_address: Recipient in need of gas
:type recipient_address: str, 0x-hex
:param chain_str: Chain spec, string representation
:type chain_str: str
:raises AlreadyFillingGasError: A gas refill transaction for this address is already executing
:returns: Transaction hash.
:rtype: str, 0x-hex
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
zero_amount = False
session = SessionBase.create_session()
status_filter = StatusBits.FINAL | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
q = session.query(Otx.tx_hash)
q = q.join(TxCache)
q = q.filter(Otx.status.op('&')(StatusBits.FINAL.value)==0)
q = q.filter(TxCache.from_value!=0)
q = q.filter(TxCache.recipient==recipient_address)
c = q.count()
if c > 0:
#session.close()
#raise AlreadyFillingGasError(recipient_address)
logg.warning('already filling gas {}'.format(str(AlreadyFillingGasError(recipient_address))))
zero_amount = True
session.flush()
queue = self.request.delivery_info['routing_key']
c = RpcClient(chain_spec)
clogg = celery_app.log.get_default_logger()
logg.debug('refill gas from provider address {}'.format(c.gas_provider()))
default_nonce = c.w3.eth.getTransactionCount(c.gas_provider(), 'pending')
nonce_generator = NonceOracle(c.gas_provider(), default_nonce)
#nonce = nonce_generator.next(session=session)
nonce = nonce_generator.next_by_task_uuid(self.request.root_id, session=session)
gas_price = c.gas_price()
gas_limit = c.default_gas_limit
refill_amount = 0
if not zero_amount:
refill_amount = c.refill_amount()
logg.debug('tx send gas price {} nonce {}'.format(gas_price, nonce))
# create and sign transaction
tx_send_gas = {
'from': c.gas_provider(),
'to': recipient_address,
'gas': gas_limit,
'gasPrice': gas_price,
'chainId': chain_spec.chain_id(),
'nonce': nonce,
'value': refill_amount,
'data': '',
}
tx_send_gas_signed = c.w3.eth.sign_transaction(tx_send_gas)
tx_hash = web3.Web3.keccak(hexstr=tx_send_gas_signed['raw'])
tx_hash_hex = tx_hash.hex()
# TODO: route this through sign_and_register_tx instead
logg.debug('adding queue refill gas tx {}'.format(tx_hash_hex))
queue_create(
nonce,
c.gas_provider(),
tx_hash_hex,
tx_send_gas_signed['raw'],
chain_str,
session=session,
)
session.close()
s_tx_cache = celery.signature(
'cic_eth.eth.tx.cache_gas_refill_data',
[
tx_hash_hex,
tx_send_gas,
],
queue=queue,
)
s_status = celery.signature(
'cic_eth.queue.tx.set_ready',
[
tx_hash_hex,
],
queue=queue,
)
celery.group(s_tx_cache, s_status)()
return tx_send_gas_signed['raw']
@celery_app.task(bind=True)
def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_factor=1.1):
"""Create a new transaction from an existing one with same nonce and higher gas price.
:param txold_hash_hex: Transaction to re-create
:type txold_hash_hex: str, 0x-hex
:param chain_str: Chain spec, string representation
:type chain_str: str
:param gas: Explicitly use the specified gas amount
:type gas: number
:param default_factor: Default factor by which to increment the gas price by
:type default_factor: float
:raises NotLocalTxError: Transaction does not exist in the local queue
:returns: Transaction hash
:rtype: str, 0x-hex
"""
session = SessionBase.create_session()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==txold_hash_hex)
otx = q.first()
if otx == None:
session.close()
raise NotLocalTxError(txold_hash_hex)
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
tx_signed_raw_bytes = bytes.fromhex(otx.signed_tx[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
logg.debug('resend otx {} {}'.format(tx, otx.signed_tx))
queue = self.request.delivery_info['routing_key']
logg.debug('before {}'.format(tx))
if gas != None:
tx['gasPrice'] = gas
else:
gas_price = c.gas_price()
if tx['gasPrice'] > gas_price:
logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(gas_price, tx['gasPrice']))
#tx['gasPrice'] = int(tx['gasPrice'] * default_factor)
tx['gasPrice'] += 1
else:
new_gas_price = int(tx['gasPrice'] * default_factor)
if gas_price > new_gas_price:
tx['gasPrice'] = gas_price
else:
tx['gasPrice'] = new_gas_price
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, chain_str)
queue_create(
tx['nonce'],
tx['from'],
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
session=session,
)
TxCache.clone(txold_hash_hex, tx_hash_hex, session=session)
session.close()
s = create_check_gas_and_send_task(
[tx_signed_raw_hex],
chain_str,
tx['from'],
tx['gasPrice'] * tx['gas'],
[tx_hash_hex],
queue=queue,
)
s.apply_async()
return tx_hash_hex
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def reserve_nonce(self, chained_input, address=None):
session = SessionBase.create_session()
if address == None:
address = chained_input
root_id = self.request.root_id
nonce = NonceReservation.next(address, root_id)
session.close()
return chained_input
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task)
def sync_tx(self, tx_hash_hex, chain_str):
@celery_app.task(bind=True, throws=(NotFoundEthException,), base=CriticalWeb3Task)
def sync_tx(self, tx_hash_hex, chain_spec_dict):
"""Force update of network status of a simgle transaction
:param tx_hash_hex: Transaction hash
@@ -612,27 +156,32 @@ def sync_tx(self, tx_hash_hex, chain_str):
:type chain_str: str
"""
queue = self.request.delivery_info['routing_key']
queue = self.request.delivery_info.get('routing_key')
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
conn = RPCConnection.connect(chain_spec, 'default')
o = transaction(tx_hash_hex)
tx = conn.do(o)
tx = c.w3.eth.getTransaction(tx_hash_hex)
rcpt = None
try:
rcpt = c.w3.eth.getTransactionReceipt(tx_hash_hex)
except web3.exceptions.TransactionNotFound as e:
o = receipt(tx_hash_hex)
rcpt = conn.do(o)
except NotFoundEthException as e:
pass
# TODO: apply receipt in tx object to validate and normalize input
if rcpt != None:
success = rcpt['status'] == 1
logg.debug('sync tx {} mined block {} success {}'.format(tx_hash_hex, rcpt['blockNumber'], success))
s = celery.signature(
'cic_eth.queue.tx.set_final_status',
'cic_eth.queue.state.set_final',
[
tx_hash_hex,
rcpt['blockNumber'],
rcpt['transactionIndex'],
not success,
],
queue=queue,
@@ -641,7 +190,7 @@ def sync_tx(self, tx_hash_hex, chain_str):
logg.debug('sync tx {} mempool'.format(tx_hash_hex))
s = celery.signature(
'cic_eth.queue.tx.set_sent_status',
'cic_eth.queue.state.set_sent',
[
tx_hash_hex,
],
@@ -651,102 +200,45 @@ def sync_tx(self, tx_hash_hex, chain_str):
s.apply_async()
@celery_app.task(bind=True)
def resume_tx(self, txpending_hash_hex, chain_str):
"""Queue a suspended tranaction for (re)sending
:param txpending_hash_hex: Transaction hash
:type txpending_hash_hex: str, 0x-hex
:param chain_str: Chain spec, string representation
:type chain_str: str
:raises NotLocalTxError: Transaction does not exist in the local queue
:returns: Transaction hash
:rtype: str, 0x-hex
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
session = SessionBase.create_session()
q = session.query(Otx.signed_tx)
q = q.filter(Otx.tx_hash==txpending_hash_hex)
r = q.first()
session.close()
if r == None:
raise NotLocalTxError(txpending_hash_hex)
tx_signed_raw_hex = r[0]
tx_signed_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack_signed_raw_tx(tx_signed_bytes, chain_spec.chain_id())
queue = self.request.delivery_info['routing_key']
s = create_check_gas_and_send_task(
[tx_signed_raw_hex],
chain_str,
tx['from'],
tx['gasPrice'] * tx['gas'],
[txpending_hash_hex],
queue=queue,
)
s.apply_async()
return txpending_hash_hex
#
#@celery_app.task(bind=True)
#def resume_tx(self, txpending_hash_hex, chain_str):
# """Queue a suspended tranaction for (re)sending
#
# :param txpending_hash_hex: Transaction hash
# :type txpending_hash_hex: str, 0x-hex
# :param chain_str: Chain spec, string representation
# :type chain_str: str
# :raises NotLocalTxError: Transaction does not exist in the local queue
# :returns: Transaction hash
# :rtype: str, 0x-hex
# """
#
# chain_spec = ChainSpec.from_chain_str(chain_str)
#
# session = SessionBase.create_session()
# q = session.query(Otx.signed_tx)
# q = q.filter(Otx.tx_hash==txpending_hash_hex)
# r = q.first()
# session.close()
# if r == None:
# raise NotLocalTxError(txpending_hash_hex)
#
# tx_signed_raw_hex = r[0]
# tx_signed_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
# tx = unpack(tx_signed_bytes, chain_spec)
#
# queue = self.request.delivery_info['routing_key']
#
# s = create_check_gas_and_send_task(
# [tx_signed_raw_hex],
# chain_str,
# tx['from'],
# tx['gasPrice'] * tx['gas'],
# [txpending_hash_hex],
# queue=queue,
# )
# s.apply_async()
# return txpending_hash_hex
@celery_app.task(base=CriticalSQLAlchemyTask)
def otx_cache_parse_tx(
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
):
"""Generates and commits transaction cache metadata for a gas refill transaction
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx_signed_raw_hex: Raw signed transaction
:type tx_signed_raw_hex: str, 0x-hex
:param chain_str: Chain spec string representation
:type chain_str: str
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
(txc, cache_id) = cache_gas_refill_data(tx_hash_hex, tx)
return txc
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_gas_refill_data(
tx_hash_hex,
tx,
):
"""Helper function for otx_cache_parse_tx
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx: Signed raw transaction
:type tx: str, 0x-hex
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx['to'],
zero_address,
zero_address,
tx['value'],
tx['value'],
)
session = SessionBase.create_session()
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)

View File

@@ -1,110 +0,0 @@
# standard imports
import logging
import sha3
import web3
# external imports
from rlp import decode as rlp_decode
from rlp import encode as rlp_encode
from eth_keys import KeyAPI
from chainlib.eth.tx import unpack
logg = logging.getLogger()
field_debugs = [
'nonce',
'gasPrice',
'gas',
'to',
'value',
'data',
'v',
'r',
's',
]
unpack_signed_raw_tx = unpack
#def unpack_signed_raw_tx(tx_raw_bytes, chain_id):
# d = rlp_decode(tx_raw_bytes)
#
# logg.debug('decoding {} using chain id {}'.format(tx_raw_bytes.hex(), chain_id))
# j = 0
# for i in d:
# logg.debug('decoded {}: {}'.format(field_debugs[j], i.hex()))
# j += 1
# vb = chain_id
# if chain_id != 0:
# v = int.from_bytes(d[6], 'big')
# vb = v - (chain_id * 2) - 35
# while len(d[7]) < 32:
# d[7] = b'\x00' + d[7]
# while len(d[8]) < 32:
# d[8] = b'\x00' + d[8]
# s = b''.join([d[7], d[8], bytes([vb])])
# so = KeyAPI.Signature(signature_bytes=s)
#
# h = sha3.keccak_256()
# h.update(rlp_encode(d))
# signed_hash = h.digest()
#
# d[6] = chain_id
# d[7] = b''
# d[8] = b''
#
# h = sha3.keccak_256()
# h.update(rlp_encode(d))
# unsigned_hash = h.digest()
#
# p = so.recover_public_key_from_msg_hash(unsigned_hash)
# a = p.to_checksum_address()
# logg.debug('decoded recovery byte {}'.format(vb))
# logg.debug('decoded address {}'.format(a))
# logg.debug('decoded signed hash {}'.format(signed_hash.hex()))
# logg.debug('decoded unsigned hash {}'.format(unsigned_hash.hex()))
#
# to = d[3].hex() or None
# if to != None:
# to = web3.Web3.toChecksumAddress('0x' + to)
#
# return {
# 'from': a,
# 'nonce': int.from_bytes(d[0], 'big'),
# 'gasPrice': int.from_bytes(d[1], 'big'),
# 'gas': int.from_bytes(d[2], 'big'),
# 'to': to,
# 'value': int.from_bytes(d[4], 'big'),
# 'data': '0x' + d[5].hex(),
# 'v': chain_id,
# 'r': '0x' + s[:32].hex(),
# 's': '0x' + s[32:64].hex(),
# 'chainId': chain_id,
# 'hash': '0x' + signed_hash.hex(),
# 'hash_unsigned': '0x' + unsigned_hash.hex(),
# }
def unpack_signed_raw_tx_hex(tx_raw_hex, chain_id):
return unpack_signed_raw_tx(bytes.fromhex(tx_raw_hex[2:]), chain_id)
# TODO: consider moving tx string representation generation from api_admin to here
def tx_string(tx_raw_bytes, chain_id):
tx_unpacked = unpack_signed_raw_tx(tx_raw_bytes, chain_id)
return 'tx nonce {} from {} to {} hash {}'.format(
tx_unpacked['nonce'],
tx_unpacked['from'],
tx_unpacked['to'],
tx_unpacked['hash'],
)
def tx_hex_string(tx_hex, chain_id):
if len(tx_hex) < 2:
raise ValueError('invalid data length')
elif tx_hex[:2] == '0x':
tx_hex = tx_hex[2:]
tx_raw_bytes = bytes.fromhex(tx_hex)
return tx_string(tx_raw_bytes, chain_id)

View File

@@ -3,20 +3,34 @@ import logging
# third-party imports
import celery
from cic_registry.chain import ChainSpec
from cic_registry import CICRegistry
from chainlib.chain import ChainSpec
from chainlib.connection import RPCConnection
from chainlib.eth.constant import ZERO_ADDRESS
from cic_eth_registry import CICRegistry
from eth_address_declarator import AddressDeclarator
# local imports
from cic_eth.task import BaseTask
celery_app = celery.current_app
logg = logging.getLogger()
def translate_address(address, trusted_addresses, chain_spec):
def translate_address(address, trusted_addresses, chain_spec, sender_address=ZERO_ADDRESS):
rpc = RPCConnection.connect(chain_spec, 'default')
registry = CICRegistry(chain_spec, rpc)
declarator_address = registry.by_name('AddressDeclarator', sender_address=sender_address)
c = AddressDeclarator(chain_spec)
for trusted_address in trusted_addresses:
o = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', 'Declarator')
fn = o.function('declaration')
declaration_hex = fn(trusted_address, address).call()
declaration_bytes = declaration_hex[0].rstrip(b'\x00')
o = c.declaration(declarator_address, trusted_address, address, sender_address=sender_address)
r = rpc.do(o)
declaration_hex = AddressDeclarator.parse_declaration(r)
declaration_hex = declaration_hex[0].rstrip('0')
declaration_bytes = bytes.fromhex(declaration_hex)
declaration = None
try:
declaration = declaration_bytes.decode('utf-8', errors='strict')
@@ -25,19 +39,19 @@ def translate_address(address, trusted_addresses, chain_spec):
return declaration
@celery_app.task()
def translate_tx_addresses(tx, trusted_addresses, chain_str):
@celery_app.task(bind=True, base=BaseTask)
def translate_tx_addresses(self, tx, trusted_addresses, chain_spec_dict):
chain_spec = ChainSpec.from_chain_str(chain_str)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
declaration = None
if tx['sender_label'] == None:
declaration = translate_address(tx['sender'], trusted_addresses, chain_spec)
declaration = translate_address(tx['sender'], trusted_addresses, chain_spec, self.call_address)
tx['sender_label'] = declaration
declaration = None
if tx['recipient_label'] == None:
declaration = translate_address(tx['recipient'], trusted_addresses, chain_spec)
declaration = translate_address(tx['recipient'], trusted_addresses, chain_spec, self.call_address)
tx['recipient_label'] = declaration
return tx

Some files were not shown because too many files have changed in this diff Show More