Compare commits

..

86 Commits

Author SHA1 Message Date
nolash
d3240a5063 replace requirements base for cache, notify 2021-03-29 15:17:32 +02:00
nolash
309392511a Downgrade correct cic-eth increment version 2021-03-29 14:37:35 +02:00
nolash
7684cd6aa1 Update cic-eth deps 2021-03-29 14:36:27 +02:00
nolash
059191346e Rehabilitate migration scripts 2021-03-29 14:35:02 +02:00
nolash
1047a1a487 Node atomicity rehabilitated 2021-03-29 11:35:41 +02:00
nolash
bd9070ae44 Parse receipt value correctly 2021-03-29 09:55:57 +02:00
nolash
02d23112cd Bump faucet version 2021-03-29 02:21:07 +02:00
nolash
792715ca8c Upgrade facucet 2021-03-28 22:54:14 +02:00
nolash
e3846809a6 WIP add writer registration to seed script 2021-03-28 21:29:05 +02:00
nolash
1322ebc8b6 WIP rehabilitate admin api runnables 2021-03-28 18:32:37 +02:00
nolash
f35bbc84e4 Complete rehabilitation of RPC signing 2021-03-27 16:35:18 +01:00
nolash
47c9f16806 Add dedicated rpc signer connection 2021-03-27 15:49:22 +01:00
nolash
da9d27606e Sighhhhhhhh 2021-03-26 18:07:58 +01:00
nolash
e72fe5d61b Absolutely fucking sigh 2021-03-26 17:34:29 +01:00
nolash
4edf7d1e82 Sighsighsighsighsighsigh 2021-03-26 16:16:12 +01:00
nolash
80c76f2f2e sigh sigh sigh 2021-03-26 15:33:51 +01:00
nolash
5cc1526c7c Sigh 2021-03-26 15:27:46 +01:00
nolash
e2913dd37d Upgrade base 2021-03-26 15:13:01 +01:00
nolash
9542b0544b Bump faucet version 2021-03-26 15:02:02 +01:00
nolash
44c1ca9573 Upgrade faucet, base without faucet 2021-03-26 14:02:49 +01:00
nolash
11f86d6409 Correct tag for account registry 2021-03-26 13:38:46 +01:00
nolash
35ca98c65b Correct role name in create account
Signed-off-by: nolash <dev@holbrook.no>
2021-03-26 10:00:34 +01:00
nolash
3a17647107 Rehabilitate tracker daemon frontend 2021-03-26 08:39:33 +01:00
nolash
101a1eb050 WIP rehabilitate sync tx 2021-03-26 07:57:35 +01:00
nolash
e166f3d101 Add basic send test 2021-03-25 20:03:29 +01:00
nolash
7d5d7f67c6 Update cic-eth version in contract migration dockerfile 2021-03-25 19:24:46 +01:00
nolash
21f71043c9 WIP rehabilitate view cli 2021-03-25 19:23:41 +01:00
nolash
4ad2f18fb1 Rehabilitate lock cli 2021-03-25 16:46:46 +01:00
nolash
a903d12bf0 Rehabilitate admin api tests (except nonce manip) 2021-03-25 16:40:33 +01:00
nolash
72eb61b1c2 Seed script rehabilitated with chainlib 2021-03-25 10:34:53 +01:00
nolash
5bf30afb07 WIP Rehabilitate seed scrip 2021-03-25 09:34:57 +01:00
nolash
8675b1a215 Add chain spec to tag invocations 2021-03-24 19:51:01 +01:00
nolash
b93aa82bf7 Make account setter use celery 2021-03-24 17:57:18 +01:00
nolash
e64b1bf984 Rehabilitate cic-eth-create step 2021-03-24 15:52:52 +01:00
nolash
585ad07c6e Merge remote-tracking branch 'origin/master' into lash/migration-last-gasp 2021-03-24 13:12:56 +01:00
nolash
e138a0005a Rehabilitate reset script after chainlib refactors 2021-03-24 13:08:54 +01:00
nolash
97ecce96a8 Bump deps and version 2021-03-24 10:21:25 +01:00
nolash
b441abb004 Update reset to use new deploy scripts 2021-03-23 17:15:48 +01:00
nolash
353abaa151 Merge upstream contract migration 2021-03-23 17:15:27 +01:00
nolash
d892caa288 Add rpc disconnect 2021-03-23 00:13:57 +01:00
nolash
1bc08295ef WIP rehabilitate tracker daemon 2021-03-22 23:36:34 +01:00
nolash
0d67f6efba WIP rehabilitate cic tasker daemon 2021-03-22 22:57:36 +01:00
nolash
305a1f760b WIP Rehabilitate runnables 2021-03-22 22:10:52 +01:00
nolash
05f7b3c9c3 Rehabilitate bloom filter tx list test 2021-03-22 20:31:03 +01:00
nolash
431ee11f4f Rehabilitate address translate test 2021-03-22 20:10:20 +01:00
nolash
0ab8675d19 Rehabilitate transaction list api and test 2021-03-21 21:10:56 +01:00
nolash
267ce84caa Rehabilitate balance api 2021-03-21 19:39:38 +01:00
nolash
333d410b1c Bump version 2021-03-21 14:04:54 +01:00
nolash
e5223e6195 Rename token registration fixture 2021-03-21 14:04:14 +01:00
nolash
453dd47091 Rehabilitate api tests 2021-03-21 12:18:15 +01:00
nolash
65bb4cf66f Reinstate api callback test 2021-03-20 23:00:15 +01:00
nolash
06ddfb4fe8 Reinstate basic tx test 2021-03-20 22:58:48 +01:00
nolash
2eaaedb0f0 Rehabilitate db unit tests 2021-03-20 22:40:06 +01:00
nolash
964952e904 Rehabilitate queue tests 2021-03-20 21:10:19 +01:00
nolash
788aa9d7b9 WIP Rehabilitate queue tests 2021-03-20 16:44:07 +01:00
nolash
bf832afb87 Rehabilitate list tx and balances test 2021-03-20 14:35:35 +01:00
nolash
3f61a2007e Rehabilitate transfer, approve
Signed-off-by: nolash <dev@holbrook.no>
2021-03-20 13:58:45 +01:00
nolash
c245a29a6b Rehabilitate erc20 balance task 2021-03-19 19:51:30 +01:00
nolash
871cbdcaeb Account register test passes 2021-03-18 19:36:52 +01:00
nolash
2f07ea1395 WIP refactor send, checkgas, refillgas to use chainlib 2021-03-18 11:57:26 +01:00
nolash
9f2773e948 Change from rlp to simple-rlp 2021-03-17 17:22:17 +01:00
nolash
41731b5e96 WIP Add role fixture, rehabilitate account register task, test 2021-03-17 11:24:55 +01:00
nolash
318615751c WIP rehabilitate register account test and task 2021-03-16 17:16:06 +01:00
nolash
6a6b6b59d8 Assimilate agnostic rpc in base tests 2021-03-16 15:35:22 +01:00
nolash
be3a2e7b2d Port new fixtures, registry setup in tests 2021-03-15 19:46:37 +01:00
nolash
a6fd20124c WIP receive new registry 2021-03-13 21:31:37 +01:00
nolash
e20a099570 WIP Remove web3 from tests 2021-03-12 18:44:29 +01:00
nolash
958bd9af96 Move signer from middleware to dedicated connection 2021-03-12 09:36:57 +01:00
nolash
10835979bc Use chainlib directly for signing 2021-03-11 11:40:30 +01:00
nolash
94c8fd6cd6 Revert back to subsession handling of nonce 2021-03-09 07:43:31 +01:00
nolash
2fe205b1e8 Provide web3 constructor to tracker rpc client 2021-03-08 15:51:45 +01:00
nolash
b37c7f3cc2 Provide thread safe registry solution 2021-03-08 10:11:04 +01:00
nolash
abc9877726 Merge branch 'master' into lash/migration-last-gasp 2021-03-07 20:35:43 +01:00
nolash
6d6e2a29b4 Fix refill gas task typo, include missing eth error symbol 2021-03-07 20:10:49 +01:00
nolash
b9d7cc20f7 Merge remote-tracking branch 'origin/master' into lash/cli-rehabilitations 2021-03-07 19:54:32 +01:00
nolash
cf468fc4c7 Add custom eth error 2021-03-07 19:01:16 +01:00
nolash
69a13235ba Handle occasional incompatible phone number 2021-03-07 15:42:09 +01:00
nolash
84a20a6743 Merge remote-tracking branch 'origin/master' into lash/cli-rehabilitations 2021-03-07 14:52:11 +01:00
nolash
67330b2ad5 Make lock tasks critical db tasks 2021-03-07 13:33:26 +01:00
nolash
16ea3f3db8 Bump version 2021-03-07 12:53:09 +01:00
nolash
2b354a1029 Add role tag option to reserve nonce 2021-03-07 12:52:48 +01:00
nolash
5f0822598f Bump cic base 2021-03-07 10:37:27 +01:00
nolash
05479a6576 WIP remove lower layer deps in ussd, correct create account api task graph 2021-03-07 10:33:11 +01:00
nolash
1c0732d983 Upgrade notify 2021-03-07 07:45:13 +01:00
nolash
fd263e7648 Merge remote-tracking branch 'origin/master' into lash/browser-emulator-ussd 2021-03-07 07:24:42 +01:00
nolash
2e09b69e36 Add configurable host, port, ssl scheme to ussd browser emulator 2021-03-06 23:50:43 +01:00
333 changed files with 9013 additions and 9747 deletions

8
.gitignore vendored
View File

@@ -1,10 +1,2 @@
service-configs/*
!service-configs/.gitkeep
**/node_modules/
__pycache__
*.pyc
*.o
gmon.out
*.egg-info
dist/
build/

View File

@@ -6,7 +6,6 @@ include:
- local: 'apps/cic-notify/.gitlab-ci.yml'
- local: 'apps/cic-meta/.gitlab-ci.yml'
- local: 'apps/cic-cache/.gitlab-ci.yml'
- local: 'apps/contract-migration/scripts/.gitlab-ci.yml'
stages:
- build

View File

@@ -6,4 +6,3 @@ HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=

View File

@@ -6,4 +6,3 @@ HOST=localhost
PORT=5432
ENGINE=sqlite
DRIVER=pysqlite
DEBUG=

View File

@@ -2,14 +2,9 @@
import logging
# local imports
from .list import (
list_transactions_mined,
list_transactions_account_mined,
add_transaction,
tag_transaction,
add_tag,
)
from .list import list_transactions_mined
from .list import list_transactions_account_mined
from .list import add_transaction
logg = logging.getLogger()

View File

@@ -2,9 +2,8 @@
import logging
import datetime
# external imports
# third-party imports
from cic_cache.db.models.base import SessionBase
from sqlalchemy import text
logg = logging.getLogger()
@@ -51,8 +50,7 @@ def list_transactions_account_mined(
def add_transaction(
session,
tx_hash,
session, tx_hash,
block_number,
tx_index,
sender,
@@ -64,33 +62,6 @@ def add_transaction(
success,
timestamp,
):
"""Adds a single transaction to the cache persistent storage. Sensible interpretation of all fields is the responsibility of the caller.
:param session: Persistent storage session object
:type session: SQLAlchemy session
:param tx_hash: Transaction hash
:type tx_hash: str, 0x-hex
:param block_number: Block number
:type block_number: int
:param tx_index: Transaction index in block
:type tx_index: int
:param sender: Ethereum address of effective sender
:type sender: str, 0x-hex
:param receiver: Ethereum address of effective recipient
:type receiver: str, 0x-hex
:param source_token: Ethereum address of token used by sender
:type source_token: str, 0x-hex
:param destination_token: Ethereum address of token received by recipient
:type destination_token: str, 0x-hex
:param from_value: Source token value spent in transaction
:type from_value: int
:param to_value: Destination token value received in transaction
:type to_value: int
:param success: True if code execution on network was successful
:type success: bool
:param date_block: Block timestamp
:type date_block: datetime
"""
date_block = datetime.datetime.fromtimestamp(timestamp)
s = "INSERT INTO tx (tx_hash, block_number, tx_index, sender, recipient, source_token, destination_token, from_value, to_value, success, date_block) VALUES ('{}', {}, {}, '{}', '{}', '{}', '{}', {}, {}, {}, '{}')".format(
tx_hash,
@@ -106,74 +77,3 @@ def add_transaction(
date_block,
)
session.execute(s)
def tag_transaction(
session,
tx_hash,
name,
domain=None,
):
"""Tag a single transaction with a single tag.
Tag must already exist in storage.
:param session: Persistent storage session object
:type session: SQLAlchemy session
:param tx_hash: Transaction hash
:type tx_hash: str, 0x-hex
:param name: Tag value
:type name: str
:param domain: Tag domain
:type domain: str
:raises ValueError: Unknown tag or transaction hash
"""
s = text("SELECT id from tx where tx_hash = :a")
r = session.execute(s, {'a': tx_hash}).fetchall()
tx_id = r[0].values()[0]
if tx_id == None:
raise ValueError('unknown tx hash {}'.format(tx_hash))
#s = text("SELECT id from tag where value = :a and domain = :b")
if domain == None:
s = text("SELECT id from tag where value = :a")
else:
s = text("SELECT id from tag where value = :a and domain = :b")
r = session.execute(s, {'a': name, 'b': domain}).fetchall()
tag_id = r[0].values()[0]
logg.debug('type {} {}'.format(type(tag_id), type(tx_id)))
if tag_id == None:
raise ValueError('unknown tag name {} domain {}'.format(name, domain))
s = text("INSERT INTO tag_tx_link (tag_id, tx_id) VALUES (:a, :b)")
r = session.execute(s, {'a': int(tag_id), 'b': int(tx_id)})
def add_tag(
session,
name,
domain=None,
):
"""Add a single tag to storage.
:param session: Persistent storage session object
:type session: SQLAlchemy session
:param name: Tag value
:type name: str
:param domain: Tag domain
:type domain: str
:raises sqlalchemy.exc.IntegrityError: Tag already exists
"""
s = None
if domain == None:
s = text("INSERT INTO tag (value) VALUES (:b)")
else:
s = text("INSERT INTO tag (domain, value) VALUES (:a, :b)")
session.execute(s, {'a': domain, 'b': name})

View File

@@ -1,38 +0,0 @@
"""Transaction tags
Revision ID: aaf2bdce7d6e
Revises: 6604de4203e2
Create Date: 2021-05-01 09:20:20.775082
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'aaf2bdce7d6e'
down_revision = '6604de4203e2'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'tag',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('domain', sa.String(), nullable=True),
sa.Column('value', sa.String(), nullable=False),
)
op.create_index('idx_tag_domain_value', 'tag', ['domain', 'value'], unique=True)
op.create_table(
'tag_tx_link',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag_id', sa.Integer, sa.ForeignKey('tag.id'), nullable=False),
sa.Column('tx_id', sa.Integer, sa.ForeignKey('tx.id'), nullable=False),
)
def downgrade():
op.drop_table('tag_tx_link')
op.drop_index('idx_tag_domain_value')
op.drop_table('tag')

View File

@@ -1 +0,0 @@
from .erc20 import *

View File

@@ -1,27 +0,0 @@
class TagSyncFilter:
"""Holds tag name and domain for an implementing filter.
:param name: Tag value
:type name: str
:param domain: Tag domain
:type domain: str
"""
def __init__(self, name, domain=None):
self.tag_name = name
self.tag_domain = domain
def tag(self):
"""Return tag value/domain.
:rtype: Tuple
:returns: tag value/domain.
"""
return (self.tag_name, self.tag_domain)
def __str__(self):
if self.tag_domain == None:
return self.tag_name
return '{}.{}'.format(self.tag_domain, self.tag_name)

View File

@@ -1,83 +0,0 @@
# standard imports
import logging
# external imports
from chainlib.eth.address import (
to_checksum_address,
)
from chainlib.eth.error import RequestMismatchException
from chainlib.status import Status
from cic_eth_registry.erc20 import ERC20Token
from cic_eth_registry.error import (
NotAContractError,
ContractMismatchError,
)
from eth_erc20 import ERC20
# local imports
from .base import TagSyncFilter
from cic_cache import db as cic_cache_db
logg = logging.getLogger().getChild(__name__)
class ERC20TransferFilter(TagSyncFilter):
def __init__(self, chain_spec):
super(ERC20TransferFilter, self).__init__('transfer', domain='erc20')
self.chain_spec = chain_spec
# TODO: Verify token in declarator / token index
def filter(self, conn, block, tx, db_session=None):
logg.debug('filter {} {}'.format(block, tx))
token = None
try:
token = ERC20Token(self.chain_spec, conn, tx.inputs[0])
except NotAContractError:
logg.debug('not a contract {}'.format(tx.inputs[0]))
return False
except ContractMismatchError:
logg.debug('not an erc20 token {}'.format(tx.inputs[0]))
return False
transfer_data = None
try:
transfer_data = ERC20.parse_transfer_request(tx.payload)
except RequestMismatchException:
logg.debug('erc20 match but not a transfer, skipping')
return False
except ValueError:
logg.debug('erc20 match but bogus data, skipping')
return False
token_sender = tx.outputs[0]
token_recipient = transfer_data[0]
token_value = transfer_data[1]
logg.debug('matched erc20 token transfer {} ({}) to {} value {}'.format(token.name, token.address, transfer_data[0], transfer_data[1]))
cic_cache_db.add_transaction(
db_session,
tx.hash,
block.number,
tx.index,
to_checksum_address(token_sender),
to_checksum_address(token_recipient),
token.address,
token.address,
token_value,
token_value,
tx.status == Status.SUCCESS,
block.timestamp,
)
db_session.flush()
cic_cache_db.tag_transaction(
db_session,
tx.hash,
self.tag_name,
domain=self.tag_domain,
)
db_session.commit()
return True

View File

@@ -1,136 +0,0 @@
# standard imports
import os
import sys
import logging
import time
import argparse
import sys
import re
# external imports
import confini
import celery
import sqlalchemy
import rlp
import cic_base.config
import cic_base.log
import cic_base.argparse
import cic_base.rpc
from cic_eth_registry import CICRegistry
from cic_eth_registry.error import UnknownContractError
from chainlib.chain import ChainSpec
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.connection import RPCConnection
from chainlib.eth.block import (
block_latest,
)
from hexathon import (
strip_0x,
)
from chainsyncer.backend.sql import SQLBackend
from chainsyncer.driver import (
HeadSyncer,
HistorySyncer,
)
from chainsyncer.db.models.base import SessionBase
# local imports
from cic_cache.db import (
dsn_from_config,
add_tag,
)
from cic_cache.runnable.daemons.filters import (
ERC20TransferFilter,
)
script_dir = os.path.realpath(os.path.dirname(__file__))
logg = cic_base.log.create()
argparser = cic_base.argparse.create(script_dir, cic_base.argparse.full_template)
#argparser = cic_base.argparse.add(argparser, add_traffic_args, 'traffic')
args = cic_base.argparse.parse(argparser, logg)
config = cic_base.config.create(args.c, args, args.env_prefix)
cic_base.config.log(config)
dsn = dsn_from_config(config)
SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
#RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default')
cic_base.rpc.setup(chain_spec, config.get('ETH_PROVIDER'))
def register_filter_tags(filters, session):
for f in filters:
tag = f.tag()
try:
add_tag(session, tag[0], domain=tag[1])
session.commit()
logg.info('added tag name "{}" domain "{}"'.format(tag[0], tag[1]))
except sqlalchemy.exc.IntegrityError:
logg.debug('already have tag name "{}" domain "{}"'.format(tag[0], tag[1]))
def main():
# Connect to blockchain with chainlib
rpc = RPCConnection.connect(chain_spec, 'default')
o = block_latest()
r = rpc.do(o)
block_offset = int(strip_0x(r), 16) + 1
logg.debug('starting at block {}'.format(block_offset))
syncers = []
#if SQLBackend.first(chain_spec):
# backend = SQLBackend.initial(chain_spec, block_offset)
syncer_backends = SQLBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0:
logg.info('found no backends to resume')
syncer_backends.append(SQLBackend.initial(chain_spec, block_offset))
else:
for syncer_backend in syncer_backends:
logg.info('resuming sync session {}'.format(syncer_backend))
for syncer_backend in syncer_backends:
syncers.append(HistorySyncer(syncer_backend))
syncer_backend = SQLBackend.live(chain_spec, block_offset+1)
syncers.append(HeadSyncer(syncer_backend))
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if trusted_addresses_src == None:
logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
sys.exit(1)
trusted_addresses = trusted_addresses_src.split(',')
for address in trusted_addresses:
logg.info('using trusted address {}'.format(address))
erc20_transfer_filter = ERC20TransferFilter(chain_spec)
filters = [
erc20_transfer_filter,
]
session = SessionBase.create_session()
register_filter_tags(filters, session)
session.close()
i = 0
for syncer in syncers:
logg.debug('running syncer index {}'.format(i))
for f in filters:
syncer.add_filter(f)
r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
sys.stderr.write("sync {} done at block {}\n".format(syncer, r))
i += 1
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,339 @@
# standard imports
import sys
import os
import argparse
import logging
import time
import enum
import re
# third-party imports
import confini
from cic_registry import CICRegistry
from cic_registry.chain import (
ChainRegistry,
ChainSpec,
)
#from cic_registry.bancor import BancorRegistryClient
from cic_registry.token import Token
from cic_registry.error import (
UnknownContractError,
UnknownDeclarationError,
)
from cic_registry.declaration import to_token_declaration
from web3.exceptions import BlockNotFound, TransactionNotFound
from websockets.exceptions import ConnectionClosedError
from requests.exceptions import ConnectionError
import web3
from web3 import HTTPProvider, WebsocketProvider
# local imports
from cic_cache import db
from cic_cache.db.models.base import SessionBase
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL)
logging.getLogger('urllib3').setLevel(logging.CRITICAL)
logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL)
log_topics = {
'transfer': '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef',
'convert': '0x7154b38b5dd31bb3122436a96d4e09aba5b323ae1fd580025fab55074334c095',
'accountregistry_add': '0a3b0a4f4c6e53dce3dbcad5614cb2ba3a0fa7326d03c5d64b4fa2d565492737',
}
config_dir = os.path.join('/usr/local/etc/cic-cache')
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('--trust-address', default=[], type=str, dest='trust_address', action='append', help='Set address as trust')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
args = argparser.parse_args(sys.argv[1:])
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
config = confini.Config(config_dir, args.env_prefix)
config.process()
args_override = {
'ETH_ABI_DIR': getattr(args, 'abi_dir'),
'CIC_TRUST_ADDRESS': ",".join(getattr(args, 'trust_address', [])),
}
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
# connect to database
dsn = db.dsn_from_config(config)
SessionBase.connect(dsn)
re_websocket = re.compile('^wss?://')
re_http = re.compile('^https?://')
blockchain_provider = config.get('ETH_PROVIDER')
if re.match(re_websocket, blockchain_provider) != None:
blockchain_provider = WebsocketProvider(blockchain_provider)
elif re.match(re_http, blockchain_provider) != None:
blockchain_provider = HTTPProvider(blockchain_provider)
else:
raise ValueError('unknown provider url {}'.format(blockchain_provider))
def web3_constructor():
w3 = web3.Web3(blockchain_provider)
return (blockchain_provider, w3)
class RunStateEnum(enum.IntEnum):
INIT = 0
RUN = 1
TERMINATE = 9
def rubberstamp(src):
return True
class Tracker:
def __init__(self, chain_spec, trusts=[]):
self.block_height = 0
self.tx_height = 0
self.state = RunStateEnum.INIT
self.declarator_cache = {}
self.convert_enabled = False
self.trusts = trusts
self.chain_spec = chain_spec
self.declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', 'Declarator')
def __process_tx(self, w3, session, t, r, l, b):
token_value = int(l.data, 16)
token_sender = l.topics[1][-20:].hex()
token_recipient = l.topics[2][-20:].hex()
#ts = ContractRegistry.get_address(t.address)
ts = CICRegistry.get_address(self.chain_spec, t.address())
logg.info('add token transfer {} value {} from {} to {}'.format(
ts.symbol(),
token_value,
token_sender,
token_recipient,
)
)
db.add_transaction(
session,
r.transactionHash.hex(),
r.blockNumber,
r.transactionIndex,
w3.toChecksumAddress(token_sender),
w3.toChecksumAddress(token_recipient),
t.address(),
t.address(),
token_value,
token_value,
r.status == 1,
b.timestamp,
)
session.flush()
# TODO: simplify/ split up and/or comment, function is too long
def __process_convert(self, w3, session, t, r, l, b):
logg.warning('conversions are deactivated')
return
# token_source = l.topics[2][-20:].hex()
# token_source = w3.toChecksumAddress(token_source)
# token_destination = l.topics[3][-20:].hex()
# token_destination = w3.toChecksumAddress(token_destination)
# data_noox = l.data[2:]
# d = data_noox[:64]
# token_from_value = int(d, 16)
# d = data_noox[64:128]
# token_to_value = int(d, 16)
# token_trader = '0x' + data_noox[192-40:]
#
# #ts = ContractRegistry.get_address(token_source)
# ts = CICRegistry.get_address(CICRegistry.bancor_chain_spec, t.address())
# #if ts == None:
# # ts = ContractRegistry.reserves[token_source]
# td = ContractRegistry.get_address(token_destination)
# #if td == None:
# # td = ContractRegistry.reserves[token_source]
# logg.info('add token convert {} -> {} value {} -> {} trader {}'.format(
# ts.symbol(),
# td.symbol(),
# token_from_value,
# token_to_value,
# token_trader,
# )
# )
#
# db.add_transaction(
# session,
# r.transactionHash.hex(),
# r.blockNumber,
# r.transactionIndex,
# w3.toChecksumAddress(token_trader),
# w3.toChecksumAddress(token_trader),
# token_source,
# token_destination,
# r.status == 1,
# b.timestamp,
# )
# session.flush()
def check_token(self, address):
t = None
try:
t = CICRegistry.get_address(CICRegistry.default_chain_spec, address)
return t
except UnknownContractError:
logg.debug('contract {} not in registry'.format(address))
# If nothing was returned, we look up the token in the declarator
for trust in self.trusts:
logg.debug('look up declaration for contract {} with trust {}'.format(address, trust))
fn = self.declarator.function('declaration')
# TODO: cache trust in LRUcache
declaration_array = fn(trust, address).call()
try:
declaration = to_token_declaration(trust, address, declaration_array, [rubberstamp])
logg.debug('found declaration for token {} from trust address {}'.format(address, trust))
except UnknownDeclarationError:
continue
try:
c = w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=address)
t = CICRegistry.add_token(self.chain_spec, c)
break
except ValueError:
logg.error('declaration for {} validates as token, but location is not ERC20 compatible'.format(address))
return t
# TODO use input data instead of logs
def process(self, w3, session, block):
#self.refresh_registry(w3)
tx_count = w3.eth.getBlockTransactionCount(block.hash)
b = w3.eth.getBlock(block.hash)
for i in range(self.tx_height, tx_count):
tx = w3.eth.getTransactionByBlock(block.hash, i)
if tx.to == None:
logg.debug('block {} tx {} is contract creation tx, skipping'.format(block.number, i))
continue
if len(w3.eth.getCode(tx.to)) == 0:
logg.debug('block {} tx {} not a contract tx, skipping'.format(block.number, i))
continue
t = self.check_token(tx.to)
if t != None and isinstance(t, Token):
r = w3.eth.getTransactionReceipt(tx.hash)
for l in r.logs:
logg.debug('block {} tx {} {} token log {} {}'.format(block.number, i, tx.hash.hex(), l.logIndex, l.topics[0].hex()))
if l.topics[0].hex() == log_topics['transfer']:
self.__process_tx(w3, session, t, r, l, b)
# TODO: cache contracts in LRUcache
elif self.convert_enabled and tx.to == CICRegistry.get_contract(CICRegistry.default_chain_spec, 'Converter').address:
r = w3.eth.getTransactionReceipt(tx.hash)
for l in r.logs:
logg.info('block {} tx {} {} bancornetwork log {} {}'.format(block.number, i, tx.hash.hex(), l.logIndex, l.topics[0].hex()))
if l.topics[0].hex() == log_topics['convert']:
self.__process_convert(w3, session, t, r, l, b)
session.execute("UPDATE tx_sync SET tx = '{}'".format(tx.hash.hex()))
session.commit()
self.tx_height += 1
def __get_next_retry(self, backoff=False):
return 1
def loop(self):
logg.info('starting at block {} tx index {}'.format(self.block_height, self.tx_height))
self.state = RunStateEnum.RUN
while self.state == RunStateEnum.RUN:
(provider, w3) = web3_constructor()
session = SessionBase.create_session()
try:
block = w3.eth.getBlock(self.block_height)
self.process(w3, session, block)
self.block_height += 1
self.tx_height = 0
except BlockNotFound as e:
logg.debug('no block {} yet, zZzZ...'.format(self.block_height))
time.sleep(self.__get_next_retry())
except ConnectionClosedError as e:
logg.info('connection gone, retrying')
time.sleep(self.__get_next_retry(True))
except OSError as e:
logg.error('cannot connect {}'.format(e))
time.sleep(self.__get_next_retry(True))
except Exception as e:
session.close()
raise(e)
session.close()
def load(self, w3):
session = SessionBase.create_session()
r = session.execute('SELECT tx FROM tx_sync').first()
if r != None:
if r[0] == '0x{0:0{1}X}'.format(0, 64):
logg.debug('last tx was zero-address, starting from scratch')
return
t = w3.eth.getTransaction(r[0])
self.block_height = t.blockNumber
self.tx_height = t.transactionIndex+1
c = w3.eth.getBlockTransactionCount(t.blockHash.hex())
logg.debug('last tx processed {} index {} (max index {})'.format(t.blockNumber, t.transactionIndex, c-1))
if c == self.tx_height:
self.block_height += 1
self.tx_height = 0
session.close()
(provider, w3) = web3_constructor()
trust = config.get('CIC_TRUST_ADDRESS', "").split(",")
chain_spec = args.i
try:
w3.eth.chainId
except Exception as e:
logg.exception(e)
sys.stderr.write('cannot connect to evm node\n')
sys.exit(1)
def main():
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
CICRegistry.init(w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec)
CICRegistry.add_path(config.get('ETH_ABI_DIR'))
chain_registry = ChainRegistry(chain_spec)
CICRegistry.add_chain_registry(chain_registry)
t = Tracker(chain_spec, trust)
t.load(w3)
t.loop()
if __name__ == '__main__':
main()

View File

@@ -5,7 +5,7 @@ version = (
0,
2,
0,
'alpha.2',
'alpha.1',
)
version_object = semver.VersionInfo(

View File

@@ -6,4 +6,4 @@ HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=0
DEBUG=

View File

@@ -1,4 +0,0 @@
[cic]
chain_spec =
registry_address =
trust_address = 0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C

View File

@@ -6,4 +6,4 @@ HOST=localhost
PORT=63432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=0
DEBUG=1

View File

@@ -1,2 +1,3 @@
[eth]
provider = ws://localhost:63546
chain_id = 8996

View File

@@ -1,2 +0,0 @@
[syncer]
loop_interval = 1

View File

@@ -1,2 +1,7 @@
[eth]
provider = ws://localhost:8545
#ttp_provider = http://localhost:8545
#provider = http://localhost:8545
gas_provider_address =
#chain_id =
abi_dir = /usr/local/share/cic/solidity/abi

View File

@@ -1,2 +0,0 @@
[syncer]
loop_interval = 5

View File

@@ -1,4 +1,2 @@
[cic]
registry_address =
chain_spec =
trust_address =

View File

@@ -6,4 +6,4 @@ HOST=localhost
PORT=5432
ENGINE=sqlite
DRIVER=pysqlite
DEBUG=1
DEBUG=

View File

@@ -17,7 +17,7 @@ RUN apt-get update && \
# Copy shared requirements from top of mono-repo
RUN echo "copying root req file ${root_requirement_file}"
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a76
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a44
COPY cic-cache/requirements.txt ./
COPY cic-cache/setup.cfg \
@@ -43,9 +43,10 @@ COPY cic-cache/config/ /usr/local/etc/cic-cache/
RUN git clone https://github.com/vishnubob/wait-for-it.git /usr/local/bin/wait-for-it/
COPY cic-cache/cic_cache/db/migrations/ /usr/local/share/cic-cache/alembic/
COPY cic-cache/docker/start_tracker.sh ./start_tracker.sh
COPY cic-cache/docker/db.sh ./db.sh
RUN chmod 755 ./*.sh
RUN git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \
mkdir -p /usr/local/share/cic/solidity && \
cp -R cic-contracts/abis /usr/local/share/cic/solidity/abi
# Tracker
# ENTRYPOINT ["/usr/local/bin/cic-cache-tracker", "-vv"]
# Server

View File

@@ -1,6 +0,0 @@
#!/bin/bash
set -e
>&2 echo executing database migration
python scripts/migrate.py -c /usr/local/etc/cic-cache --migrations-dir /usr/local/share/cic-cache/alembic -vv
set +e

View File

@@ -1,10 +0,0 @@
#!/bin/bash
. ./db.sh
if [ $? -ne "0" ]; then
>&2 echo db migrate fail
exit 1
fi
/usr/local/bin/cic-cache-trackerd $@

View File

@@ -1,12 +1,10 @@
cic-base~=0.1.2b8
alembic==1.4.2
confini~=0.3.6rc3
confini~=0.3.6b2
uwsgi==2.0.19.1
moolb~=0.1.0
cic-eth-registry~=0.5.5a4
cic-registry~=0.5.3a4
SQLAlchemy==1.3.20
semver==2.13.0
psycopg2==2.8.6
celery==4.4.7
redis==3.5.3
chainsyncer[sql]~=0.0.2a4

View File

@@ -29,13 +29,11 @@ packages =
cic_cache.db
cic_cache.db.models
cic_cache.runnable
cic_cache.runnable.daemons
cic_cache.runnable.daemons.filters
scripts =
./scripts/migrate.py
[options.entry_points]
console_scripts =
cic-cache-trackerd = cic_cache.runnable.daemons.tracker:main
cic-cache-serverd = cic_cache.runnable.daemons.server:main
cic-cache-taskerd = cic_cache.runnable.daemons.tasker:main
cic-cache-trackerd = cic_cache.runnable.tracker:main
cic-cache-serverd = cic_cache.runnable.server:main
cic-cache-taskerd = cic_cache.runnable.tasker:main

View File

@@ -4,8 +4,3 @@ pytest-mock==3.3.1
pysqlite3==0.4.3
sqlparse==0.4.1
pytest-celery==0.0.0a1
eth_tester==0.5.0b3
py-evm==0.3.0a20
web3==5.12.2
cic-eth-registry~=0.5.5a3
cic-base[full]==0.1.2b8

View File

@@ -3,7 +3,7 @@ import os
import sys
import datetime
# external imports
# third-party imports
import pytest
# local imports
@@ -84,7 +84,3 @@ def txs(
session.commit()
return [
tx_hash_first,
tx_hash_second,
]

View File

@@ -1,3 +0,0 @@
from chainlib.eth.pytest import *
from cic_eth_registry.pytest.fixtures_tokens import *

View File

@@ -1,69 +0,0 @@
# standard imports
import os
import datetime
import logging
import json
# external imports
import pytest
from sqlalchemy import text
from chainlib.eth.tx import Tx
from chainlib.eth.block import Block
from chainlib.chain import ChainSpec
from hexathon import (
strip_0x,
add_0x,
)
# local imports
from cic_cache.db import add_tag
from cic_cache.runnable.daemons.filters.erc20 import ERC20TransferFilter
logg = logging.getLogger()
def test_cache(
eth_rpc,
foo_token,
init_database,
list_defaults,
list_actors,
tags,
):
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
fltr = ERC20TransferFilter(chain_spec)
add_tag(init_database, fltr.tag_name, domain=fltr.tag_domain)
data = 'a9059cbb'
data += strip_0x(list_actors['alice'])
data += '1000'.ljust(64, '0')
block = Block({
'hash': os.urandom(32).hex(),
'number': 42,
'timestamp': datetime.datetime.utcnow().timestamp(),
'transactions': [],
})
tx = Tx({
'to': foo_token,
'from': list_actors['bob'],
'data': data,
'value': 0,
'hash': os.urandom(32).hex(),
'nonce': 13,
'gasPrice': 10000000,
'gas': 123456,
})
block.txs.append(tx)
tx.block = block
r = fltr.filter(eth_rpc, block, tx, db_session=init_database)
assert r
s = text("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = :a AND a.value = :b")
r = init_database.execute(s, {'a': fltr.tag_domain, 'b': fltr.tag_name}).fetchone()
assert r[0] == tx.hash

View File

@@ -2,7 +2,7 @@
import os
import logging
# external imports
# third-party imports
import pytest
import confini
@@ -13,7 +13,7 @@ logg = logging.getLogger(__file__)
@pytest.fixture(scope='session')
def load_config():
config_dir = os.path.join(root_dir, 'config/test')
config_dir = os.path.join(root_dir, '.config/test')
conf = confini.Config(config_dir, 'CICTEST')
conf.process()
logg.debug('config {}'.format(conf))

View File

@@ -3,16 +3,13 @@ import os
import logging
import re
# external imports
# third-party imports
import pytest
import sqlparse
import alembic
from alembic.config import Config as AlembicConfig
# local imports
from cic_cache.db.models.base import SessionBase
from cic_cache.db import dsn_from_config
from cic_cache.db import add_tag
logg = logging.getLogger(__file__)
@@ -29,10 +26,11 @@ def database_engine(
except FileNotFoundError:
pass
dsn = dsn_from_config(load_config)
SessionBase.connect(dsn, debug=load_config.true('DATABASE_DEBUG'))
SessionBase.connect(dsn)
return dsn
# TODO: use alembic instead to migrate db, here we have to keep separate schema than migration script in script/migrate.py
@pytest.fixture(scope='function')
def init_database(
load_config,
@@ -40,23 +38,52 @@ def init_database(
):
rootdir = os.path.dirname(os.path.dirname(__file__))
dbdir = os.path.join(rootdir, 'cic_cache', 'db')
migrationsdir = os.path.join(dbdir, 'migrations', load_config.get('DATABASE_ENGINE'))
if not os.path.isdir(migrationsdir):
migrationsdir = os.path.join(dbdir, 'migrations', 'default')
logg.info('using migrations directory {}'.format(migrationsdir))
schemadir = os.path.join(rootdir, 'db', load_config.get('DATABASE_DRIVER'))
if load_config.get('DATABASE_ENGINE') == 'sqlite':
rconn = SessionBase.engine.raw_connection()
f = open(os.path.join(schemadir, 'db.sql'))
s = f.read()
f.close()
rconn.executescript(s)
else:
rconn = SessionBase.engine.raw_connection()
rcursor = rconn.cursor()
#rcursor.execute('DROP FUNCTION IF EXISTS public.transaction_list')
#rcursor.execute('DROP FUNCTION IF EXISTS public.balances')
f = open(os.path.join(schemadir, 'db.sql'))
s = f.read()
f.close()
r = re.compile(r'^[A-Z]', re.MULTILINE)
for l in sqlparse.parse(s):
strl = str(l)
# we need to check for empty query lines, as sqlparse doesn't do that on its own (and psycopg complains when it gets them)
if not re.search(r, strl):
logg.warning('skipping parsed query line {}'.format(strl))
continue
rcursor.execute(strl)
rconn.commit()
rcursor.execute('SET search_path TO public')
# this doesn't work when run separately, no idea why
# functions have been manually added to original schema from cic-eth
# f = open(os.path.join(schemadir, 'proc_transaction_list.sql'))
# s = f.read()
# f.close()
# rcursor.execute(s)
#
# f = open(os.path.join(schemadir, 'proc_balances.sql'))
# s = f.read()
# f.close()
# rcursor.execute(s)
rcursor.close()
session = SessionBase.create_session()
ac = AlembicConfig(os.path.join(migrationsdir, 'alembic.ini'))
ac.set_main_option('sqlalchemy.url', database_engine)
ac.set_main_option('script_location', migrationsdir)
alembic.command.downgrade(ac, 'base')
alembic.command.upgrade(ac, 'head')
session.commit()
yield session
session.commit()
session.close()
@@ -89,14 +116,3 @@ def list_defaults(
return {
'block': 420000,
}
@pytest.fixture(scope='function')
def tags(
init_database,
):
add_tag(init_database, 'foo')
add_tag(init_database, 'baz', domain='bar')
add_tag(init_database, 'xyzzy', domain='bar')
init_database.commit()

View File

@@ -4,7 +4,7 @@ import datetime
import logging
import json
# external imports
# third-party imports
import pytest
# local imports

View File

@@ -1,37 +0,0 @@
import os
import datetime
import logging
import json
# external imports
import pytest
# local imports
from cic_cache.db import tag_transaction
logg = logging.getLogger()
def test_cache(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
tags,
):
tag_transaction(init_database, txs[0], 'foo')
tag_transaction(init_database, txs[0], 'baz', domain='bar')
tag_transaction(init_database, txs[1], 'xyzzy', domain='bar')
r = init_database.execute("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.value = 'foo'").fetchall()
assert r[0][0] == txs[0]
r = init_database.execute("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = 'bar' AND a.value = 'baz'").fetchall()
assert r[0][0] == txs[0]
r = init_database.execute("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = 'bar' AND a.value = 'xyzzy'").fetchall()
assert r[0][0] == txs[1]

View File

@@ -2,7 +2,7 @@
import datetime
import logging
# external imports
# third-party imports
import celery
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec
@@ -32,9 +32,7 @@ def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.AL
:returns: New lock state for address
:rtype: number
"""
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, flags, address=address, tx_hash=tx_hash)
logg.debug('Locked {} for {}, flag now {}'.format(flags, address, r))
return chained_input
@@ -53,9 +51,7 @@ def unlock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.
:returns: New lock state for address
:rtype: number
"""
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, flags, address=address)
logg.debug('Unlocked {} for {}, flag now {}'.format(flags, address, r))
return chained_input
@@ -131,9 +127,7 @@ def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
@celery_app.task(base=CriticalSQLAlchemyTask)
def check_lock(chained_input, chain_spec_dict, lock_flags, address=None):
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
session = SessionBase.create_session()
r = Lock.check(chain_str, lock_flags, address=ZERO_ADDRESS, session=session)
if address != None:
@@ -145,9 +139,3 @@ def check_lock(chained_input, chain_spec_dict, lock_flags, address=None):
session.flush()
session.close()
return chained_input
@celery_app.task()
def shutdown(message):
logg.critical('shutdown called: {}'.format(message))
celery_app.control.shutdown() #broadcast('shutdown')

View File

@@ -5,13 +5,11 @@ import logging
import celery
from chainlib.chain import ChainSpec
from chainlib.eth.tx import unpack
from chainqueue.query import get_tx
from chainqueue.state import set_cancel
from chainqueue.db.models.otx import Otx
from chainqueue.db.models.tx import TxCache
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.nonce import Nonce
from cic_eth.admin.ctrl import (
lock_send,
@@ -19,8 +17,14 @@ from cic_eth.admin.ctrl import (
lock_queue,
unlock_queue,
)
from cic_eth.queue.tx import queue_create
from cic_eth.eth.gas import create_check_gas_task
from cic_eth.queue.tx import (
get_tx,
set_cancel,
)
from cic_eth.queue.tx import create as queue_create
from cic_eth.eth.gas import (
create_check_gas_task,
)
celery_app = celery.current_app
logg = logging.getLogger()
@@ -46,8 +50,8 @@ def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1):
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_brief = get_tx(tx_hash_orig_hex)
tx_raw = bytes.fromhex(strip_0x(tx_brief['signed_tx'][2:]))
tx = unpack(tx_raw, chain_spec)
tx_raw = bytes.fromhex(tx_brief['signed_tx'][2:])
tx = unpack(tx_raw, chain_spec.chain_id())
nonce = tx_brief['nonce']
address = tx['from']
@@ -67,8 +71,8 @@ def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1):
tx_hashes = []
txs = []
for otx in otxs:
tx_raw = bytes.fromhex(strip_0x(otx.signed_tx))
tx_new = unpack(tx_raw, chain_spec)
tx_raw = bytes.fromhex(otx.signed_tx[2:])
tx_new = unpack(tx_raw, chain_spec.chain_id())
tx_previous_hash_hex = tx_new['hash']
tx_previous_nonce = tx_new['nonce']

View File

@@ -1,19 +0,0 @@
# standard imports
import logging
# external imports
import celery
# local imports
from cic_eth.task import BaseTask
celery_app = celery.current_app
logg = logging.getLogger()
@celery_app.task(bind=True, base=BaseTask)
def default_token(self):
return {
'symbol': self.default_token_symbol,
'address': self.default_token_address,
}

View File

@@ -22,29 +22,26 @@ from hexathon import (
add_0x,
)
from chainlib.eth.gas import balance
from chainqueue.db.enum import (
StatusEnum,
StatusBits,
is_alive,
is_error_status,
status_str,
)
from chainqueue.error import TxStateChangeError
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.enum import (
StatusEnum,
is_alive,
)
from cic_eth.error import InitializationError
from cic_eth.queue.query import get_tx
from cic_eth.db.error import TxStateChangeError
from cic_eth.queue.tx import get_tx
app = celery.current_app
#logg = logging.getLogger(__file__)
logg = logging.getLogger()
local_fail = StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR | StatusBits.UNKNOWN_ERROR
class AdminApi:
"""Provides an interface to view and manipulate existing transaction tasks and system runtime settings.
@@ -60,29 +57,6 @@ class AdminApi:
self.call_address = call_address
def proxy_do(self, chain_spec, o):
s_proxy = celery.signature(
'cic_eth.task.rpc_proxy',
[
chain_spec.asdict(),
o,
'default',
],
queue=self.queue
)
return s_proxy.apply_async()
def registry(self):
s_registry = celery.signature(
'cic_eth.task.registry',
[],
queue=self.queue
)
return s_registry.apply_async()
def unlock(self, chain_spec, address, flags=None):
s_unlock = celery.signature(
'cic_eth.admin.ctrl.unlock',
@@ -113,7 +87,7 @@ class AdminApi:
def get_lock(self):
s_lock = celery.signature(
'cic_eth.queue.lock.get_lock',
'cic_eth.queue.tx.get_lock',
[],
queue=self.queue,
)
@@ -155,13 +129,11 @@ class AdminApi:
return s_have.apply_async()
def resend(self, tx_hash_hex, chain_spec, in_place=True, unlock=False):
def resend(self, tx_hash_hex, chain_str, in_place=True, unlock=False):
logg.debug('resend {}'.format(tx_hash_hex))
s_get_tx_cache = celery.signature(
'cic_eth.queue.query.get_tx_cache',
'cic_eth.queue.tx.get_tx_cache',
[
chain_spec.asdict(),
tx_hash_hex,
],
queue=self.queue,
@@ -169,6 +141,7 @@ class AdminApi:
# TODO: This check should most likely be in resend task itself
tx_dict = s_get_tx_cache.apply_async().get()
#if tx_dict['status'] in [StatusEnum.REVERTED, StatusEnum.SUCCESS, StatusEnum.CANCELLED, StatusEnum.OBSOLETED]:
if not is_alive(getattr(StatusEnum, tx_dict['status']).value):
raise TxStateChangeError('Cannot resend mined or obsoleted transaction'.format(txold_hash_hex))
@@ -176,9 +149,9 @@ class AdminApi:
raise NotImplementedError('resend as new not yet implemented')
s = celery.signature(
'cic_eth.eth.gas.resend_with_higher_gas',
'cic_eth.eth.tx.resend_with_higher_gas',
[
chain_spec.asdict(),
chain_str,
None,
1.01,
],
@@ -186,7 +159,7 @@ class AdminApi:
)
s_manual = celery.signature(
'cic_eth.queue.state.set_manual',
'cic_eth.queue.tx.set_manual',
[
tx_hash_hex,
],
@@ -198,7 +171,7 @@ class AdminApi:
s_gas = celery.signature(
'cic_eth.admin.ctrl.unlock_send',
[
chain_spec.asdict(),
chain_str,
tx_dict['sender'],
],
queue=self.queue,
@@ -209,9 +182,8 @@ class AdminApi:
def check_nonce(self, address):
s = celery.signature(
'cic_eth.queue.query.get_account_tx',
'cic_eth.queue.tx.get_account_tx',
[
chain_spec.asdict(),
address,
True,
False,
@@ -223,34 +195,28 @@ class AdminApi:
blocking_tx = None
blocking_nonce = None
nonce_otx = 0
last_nonce = -1
for k in txs.keys():
s_get_tx = celery.signature(
'cic_eth.queue.query.get_tx',
'cic_eth.queue.tx.get_tx',
[
chain_spec.asdict(),
k,
],
queue=self.queue,
)
tx = s_get_tx.apply_async().get()
#tx = get_tx(k)
logg.debug('checking nonce {} (previous {})'.format(tx['nonce'], last_nonce))
logg.debug('checking nonce {}'.format(tx['nonce']))
if tx['status'] in [StatusEnum.REJECTED, StatusEnum.FUBAR]:
blocking_tx = k
blocking_nonce = tx['nonce']
nonce_otx = tx['nonce']
if not is_alive(tx['status']) and tx['status'] & local_fail > 0:
logg.info('permanently errored {} nonce {} status {}'.format(k, nonce_otx, status_str(tx['status'])))
blocking_tx = k
blocking_nonce = nonce_otx
elif nonce_otx - last_nonce > 1:
logg.error('nonce gap; {} followed {} for account {}'.format(nonce_otx, last_nonce, tx['from']))
blocking_tx = k
blocking_nonce = nonce_otx
break
last_nonce = nonce_otx
nonce_cache = Nonce.get(address)
#nonce_w3 = self.w3.eth.getTransactionCount(address, 'pending')
return {
'nonce': {
#'network': nonce_cache,
'network': nonce_cache,
'queue': nonce_otx,
#'cache': nonce_cache,
'blocking': blocking_nonce,
@@ -263,9 +229,8 @@ class AdminApi:
def fix_nonce(self, address, nonce, chain_spec):
s = celery.signature(
'cic_eth.queue.query.get_account_tx',
'cic_eth.queue.tx.get_account_tx',
[
chain_spec.asdict(),
address,
True,
False,
@@ -291,19 +256,33 @@ class AdminApi:
return s_nonce.apply_async()
def account(self, chain_spec, address, include_sender=True, include_recipient=True, renderer=None, w=sys.stdout):
# # TODO: this is a stub, complete all checks
# def ready(self):
# """Checks whether all required initializations have been performed.
#
# :raises cic_eth.error.InitializationError: At least one setting pre-requisite has not been met.
# :raises KeyError: An address provided for initialization is not known by the keystore.
# """
# addr = AccountRole.get_address('ETH_GAS_PROVIDER_ADDRESS')
# if addr == ZERO_ADDRESS:
# raise InitializationError('missing account ETH_GAS_PROVIDER_ADDRESS')
#
# self.w3.eth.sign(addr, text='666f6f')
def account(self, chain_spec, address, cols=['tx_hash', 'sender', 'recipient', 'nonce', 'block', 'tx_index', 'status', 'network_status', 'date_created'], include_sender=True, include_recipient=True):
"""Lists locally originated transactions for the given Ethereum address.
Performs a synchronous call to the Celery task responsible for performing the query.
:param address: Ethereum address to return transactions for
:type address: str, 0x-hex
:param cols: Data columns to include
:type cols: list of str
"""
last_nonce = -1
s = celery.signature(
'cic_eth.queue.query.get_account_tx',
'cic_eth.queue.tx.get_account_tx',
[
chain_spec.asdict(),
address,
],
queue=self.queue,
@@ -312,49 +291,33 @@ class AdminApi:
tx_dict_list = []
for tx_hash in txs.keys():
errors = []
s = celery.signature(
'cic_eth.queue.query.get_tx_cache',
[
chain_spec.asdict(),
tx_hash,
],
'cic_eth.queue.tx.get_tx_cache',
[tx_hash],
queue=self.queue,
)
tx_dict = s.apply_async().get()
if tx_dict['sender'] == address:
if tx_dict['nonce'] - last_nonce > 1:
logg.error('nonce gap; {} followed {} for address {} tx {}'.format(tx_dict['nonce'], last_nonce, tx_dict['sender'], tx_hash))
errors.append('nonce')
elif tx_dict['nonce'] == last_nonce:
logg.info('nonce {} duplicate for address {} in tx {}'.format(tx_dict['nonce'], tx_dict['sender'], tx_hash))
last_nonce = tx_dict['nonce']
if not include_sender:
logg.debug('skipping sender tx {}'.format(tx_dict['tx_hash']))
continue
if tx_dict['sender'] == address and not include_sender:
logg.debug('skipping sender tx {}'.format(tx_dict['tx_hash']))
continue
elif tx_dict['recipient'] == address and not include_recipient:
logg.debug('skipping recipient tx {}'.format(tx_dict['tx_hash']))
continue
logg.debug(tx_dict)
o = {
'nonce': tx_dict['nonce'],
'tx_hash': tx_dict['tx_hash'],
'status': tx_dict['status'],
'date_updated': tx_dict['date_updated'],
'errors': errors,
}
if renderer != None:
r = renderer(o)
w.write(r + '\n')
else:
tx_dict_list.append(o)
tx_dict_list.append(o)
return tx_dict_list
# TODO: Add exception upon non-existent tx aswell as invalid tx data to docstring
# TODO: This method is WAY too long
def tx(self, chain_spec, tx_hash=None, tx_raw=None, registry=None, renderer=None, w=sys.stdout):
def tx(self, chain_spec, tx_hash=None, tx_raw=None, registry=None):
"""Output local and network details about a given transaction with local origin.
If the transaction hash is given, the raw trasnaction data will be retrieved from the local transaction queue backend. Otherwise the raw transaction data must be provided directly. Only one of transaction hash and transaction data can be passed.
@@ -376,93 +339,50 @@ class AdminApi:
if tx_raw != None:
tx_hash = add_0x(keccak256_hex_to_hex(tx_raw))
#tx_hash = self.w3.keccak(hexstr=tx_raw).hex()
s = celery.signature(
'cic_eth.queue.query.get_tx_cache',
[
chain_spec.asdict(),
tx_hash,
],
'cic_eth.queue.tx.get_tx_cache',
[tx_hash],
queue=self.queue,
)
t = s.apply_async()
tx = t.get()
tx = s.apply_async().get()
source_token = None
if tx['source_token'] != ZERO_ADDRESS:
if registry != None:
try:
source_token = registry.by_address(tx['source_token'])
except UnknownContractError:
logg.warning('unknown source token contract {} (direct)'.format(tx['source_token']))
else:
s = celery.signature(
'cic_eth.task.registry_address_lookup',
[
chain_spec.asdict(),
tx['source_token'],
],
queue=self.queue
)
t = s.apply_async()
source_token = t.get()
if source_token == None:
logg.warning('unknown source token contract {} (task pool)'.format(tx['source_token']))
try:
source_token = registry.by_address(tx['source_token'])
#source_token = CICRegistry.get_address(chain_spec, tx['source_token']).contract
except UnknownContractError:
#source_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token'])
#source_token = CICRegistry.add_token(chain_spec, source_token_contract)
logg.warning('unknown source token contract {}'.format(tx['source_token']))
destination_token = None
if tx['destination_token'] != ZERO_ADDRESS:
if registry != None:
try:
destination_token = registry.by_address(tx['destination_token'])
except UnknownContractError:
logg.warning('unknown destination token contract {}'.format(tx['destination_token']))
else:
s = celery.signature(
'cic_eth.task.registry_address_lookup',
[
chain_spec.asdict(),
tx['destination_token'],
],
queue=self.queue
)
t = s.apply_async()
destination_token = t.get()
if destination_token == None:
logg.warning('unknown destination token contract {} (task pool)'.format(tx['destination_token']))
if tx['source_token'] != ZERO_ADDRESS:
try:
#destination_token = CICRegistry.get_address(chain_spec, tx['destination_token'])
destination_token = registry.by_address(tx['destination_token'])
except UnknownContractError:
#destination_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token'])
#destination_token = CICRegistry.add_token(chain_spec, destination_token_contract)
logg.warning('unknown destination token contract {}'.format(tx['destination_token']))
tx['sender_description'] = 'Custodial account'
tx['recipient_description'] = 'Custodial account'
o = code(tx['sender'])
t = self.proxy_do(chain_spec, o)
r = t.get()
r = self.rpc.do(o)
if len(strip_0x(r, allow_empty=True)) > 0:
if registry != None:
try:
sender_contract = registry.by_address(tx['sender'], sender_address=self.call_address)
tx['sender_description'] = 'Contract at {}'.format(tx['sender'])
except UnknownContractError:
tx['sender_description'] = 'Unknown contract'
except KeyError as e:
tx['sender_description'] = 'Unknown contract'
else:
s = celery.signature(
'cic_eth.task.registry_address_lookup',
[
chain_spec.asdict(),
tx['sender'],
],
queue=self.queue
)
t = s.apply_async()
tx['sender_description'] = t.get()
if tx['sender_description'] == None:
tx['sender_description'] = 'Unknown contract'
try:
#sender_contract = CICRegistry.get_address(chain_spec, tx['sender'])
sender_contract = registry.by_address(tx['sender'], sender_address=self.call_address)
tx['sender_description'] = 'Contract at {}'.format(tx['sender']) #sender_contract)
except UnknownContractError:
tx['sender_description'] = 'Unknown contract'
except KeyError as e:
tx['sender_description'] = 'Unknown contract'
else:
s = celery.signature(
'cic_eth.eth.account.have',
@@ -491,31 +411,16 @@ class AdminApi:
tx['sender_description'] = role
o = code(tx['recipient'])
t = self.proxy_do(chain_spec, o)
r = t.get()
r = self.rpc.do(o)
if len(strip_0x(r, allow_empty=True)) > 0:
if registry != None:
try:
recipient_contract = registry.by_address(tx['recipient'])
tx['recipient_description'] = 'Contract at {}'.format(tx['recipient'])
except UnknownContractError as e:
tx['recipient_description'] = 'Unknown contract'
except KeyError as e:
tx['recipient_description'] = 'Unknown contract'
else:
s = celery.signature(
'cic_eth.task.registry_address_lookup',
[
chain_spec.asdict(),
tx['recipient'],
],
queue=self.queue
)
t = s.apply_async()
tx['recipient_description'] = t.get()
if tx['recipient_description'] == None:
tx['recipient_description'] = 'Unknown contract'
try:
#recipient_contract = CICRegistry.by_address(tx['recipient'])
recipient_contract = registry.by_address(tx['recipient'])
tx['recipient_description'] = 'Contract at {}'.format(tx['recipient']) #recipient_contract)
except UnknownContractError as e:
tx['recipient_description'] = 'Unknown contract'
except KeyError as e:
tx['recipient_description'] = 'Unknown contract'
else:
s = celery.signature(
'cic_eth.eth.account.have',
@@ -551,24 +456,20 @@ class AdminApi:
tx['destination_token_symbol'] = destination_token.symbol()
tx['recipient_token_balance'] = source_token.function('balanceOf')(tx['recipient']).call()
# TODO: this can mean either not subitted or culled, need to check other txs with same nonce to determine which
tx['network_status'] = 'Not in node'
tx['network_status'] = 'Not submitted'
r = None
try:
o = transaction(tx_hash)
t = self.proxy_do(chain_spec, o)
r = t.get()
if r != None:
tx['network_status'] = 'Mempool'
r = self.rpc.do(o)
except Exception as e:
logg.warning('(too permissive exception handler, please fix!) {}'.format(e))
tx['network_status'] = 'Mempool'
if r != None:
try:
o = receipt(tx_hash)
t = self.proxy_do(chain_spec, o)
r = t.get()
r = self.rpc.do(o)
logg.debug('h {} o {}'.format(tx_hash, o))
if int(strip_0x(r['status'])) == 1:
tx['network_status'] = 'Confirmed'
@@ -583,24 +484,21 @@ class AdminApi:
pass
o = balance(tx['sender'])
t = self.proxy_do(chain_spec, o)
r = t.get()
r = self.rpc.do(o)
tx['sender_gas_balance'] = r
o = balance(tx['recipient'])
t = self.proxy_do(chain_spec, o)
r = t.get()
r = self.rpc.do(o)
tx['recipient_gas_balance'] = r
tx_unpacked = unpack(bytes.fromhex(strip_0x(tx['signed_tx'])), chain_spec)
tx_unpacked = unpack(bytes.fromhex(tx['signed_tx'][2:]), chain_spec.chain_id())
tx['gas_price'] = tx_unpacked['gasPrice']
tx['gas_limit'] = tx_unpacked['gas']
tx['data'] = tx_unpacked['data']
s = celery.signature(
'cic_eth.queue.state.get_state_log',
'cic_eth.queue.tx.get_state_log',
[
chain_spec.asdict(),
tx_hash,
],
queue=self.queue,
@@ -613,9 +511,4 @@ class AdminApi:
for p in problems:
sys.stderr.write('!!!{}\n'.format(p))
if renderer == None:
return tx
r = renderer(tx)
w.write(r + '\n')
return None
return tx

View File

@@ -37,7 +37,7 @@ class Api:
self.callback_param = callback_param
self.callback_task = callback_task
self.queue = queue
logg.debug('api using queue {}'.format(self.queue))
logg.info('api using queue {}'.format(self.queue))
self.callback_success = None
self.callback_error = None
if callback_queue == None:
@@ -62,18 +62,6 @@ class Api:
)
def default_token(self):
s_token = celery.signature(
'cic_eth.admin.token.default_token',
[],
queue=self.queue,
)
if self.callback_param != None:
s_token.link(self.callback_success)
return s_token.apply_async()
def convert_transfer(self, from_address, to_address, target_return, minimum_return, from_token_symbol, to_token_symbol):
"""Executes a chain of celery tasks that performs conversion between two ERC20 tokens, and transfers to a specified receipient after convert has completed.
@@ -104,10 +92,8 @@ class Api:
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
[
self.chain_spec.asdict(),
],
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_tokens = celery.signature(
@@ -168,10 +154,8 @@ class Api:
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
[
self.chain_spec.asdict(),
],
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_tokens = celery.signature(
@@ -229,9 +213,8 @@ class Api:
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
'cic_eth.eth.tx.reserve_nonce',
[
self.chain_spec.asdict(),
from_address,
],
queue=self.queue,
@@ -376,9 +359,8 @@ class Api:
if register:
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
'cic_eth.eth.tx.reserve_nonce',
[
self.chain_spec.asdict(),
'ACCOUNT_REGISTRY_WRITER',
],
queue=self.queue,
@@ -415,15 +397,14 @@ class Api:
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
'cic_eth.eth.tx.reserve_nonce',
[
self.chain_spec.asdict(),
'GAS_GIFTER',
],
queue=self.queue,
)
s_refill = celery.signature(
'cic_eth.eth.gas.refill_gas',
'cic_eth.eth.tx.refill_gas',
[
self.chain_spec.asdict(),
],
@@ -458,9 +439,8 @@ class Api:
"""
offset = 0
s_local = celery.signature(
'cic_eth.queue.query.get_account_tx',
'cic_eth.queue.tx.get_account_tx',
[
self.chain_spec.asdict(),
address,
],
queue=self.queue,

View File

@@ -1,8 +0,0 @@
from cic_eth.db.models.base import SessionBase
def health(*args, **kwargs):
session = SessionBase.create_session()
session.execute('SELECT count(*) from alembic_version')
session.close()
return True

View File

@@ -1,48 +0,0 @@
# standard imports
import logging
# external imports
from chainlib.connection import RPCConnection
from chainlib.chain import ChainSpec
from chainlib.eth.gas import balance
# local imports
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.base import SessionBase
from cic_eth.db.enum import LockEnum
from cic_eth.error import LockedError
from cic_eth.admin.ctrl import check_lock
logg = logging.getLogger().getChild(__name__)
def health(*args, **kwargs):
session = SessionBase.create_session()
config = kwargs['config']
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
logg.debug('check gas balance of gas gifter for chain {}'.format(chain_spec))
try:
check_lock(None, None, LockEnum.INIT)
except LockedError:
logg.warning('INIT lock is set, skipping GAS GIFTER balance check.')
return True
gas_provider = AccountRole.get_address('GAS_GIFTER', session=session)
session.close()
rpc = RPCConnection.connect(chain_spec, 'default')
o = balance(gas_provider)
r = rpc.do(o)
try:
r = int(r, 16)
except TypeError:
r = int(r)
gas_min = int(config.get('ETH_GAS_GIFTER_MINIMUM_BALANCE'))
if r < gas_min:
logg.error('EEK! gas gifter has balance {}, below minimum {}'.format(r, gas_min))
return False
return True

View File

@@ -1,18 +0,0 @@
# external imports
import redis
import os
def health(*args, **kwargs):
r = redis.Redis(
host=kwargs['config'].get('REDIS_HOST'),
port=kwargs['config'].get('REDIS_PORT'),
db=kwargs['config'].get('REDIS_DB'),
)
try:
r.set(kwargs['unit'], os.getpid())
except redis.connection.ConnectionError:
return False
except redis.connection.ResponseError:
return False
return True

View File

@@ -1,37 +0,0 @@
# standard imports
import time
import logging
from urllib.error import URLError
# external imports
from chainlib.connection import RPCConnection
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.sign import sign_message
from chainlib.error import JSONRPCException
logg = logging.getLogger().getChild(__name__)
def health(*args, **kwargs):
blocked = True
max_attempts = 5
conn = RPCConnection.connect(kwargs['config'].get('CIC_CHAIN_SPEC'), tag='signer')
for i in range(max_attempts):
idx = i + 1
logg.debug('attempt signer connection check {}/{}'.format(idx, max_attempts))
try:
conn.do(sign_message(ZERO_ADDRESS, '0x2a'))
except FileNotFoundError:
pass
except ConnectionError:
pass
except URLError:
pass
except JSONRPCException:
logg.debug('signer connection succeeded')
return True
if idx < max_attempts:
time.sleep(0.5)
return False

View File

@@ -11,6 +11,10 @@ logg = logging.getLogger()
# an Engine, which the Session will use for connection
# resources
# TODO: Remove the package exports, all models should be imported using full path
from .models.otx import Otx
from .models.convert import TxConvertTransfer
def dsn_from_config(config):
"""Generate a dsn string from the provided config dict.

View File

@@ -74,11 +74,10 @@ class LockEnum(enum.IntEnum):
QUEUE: Disable queueing new or modified transactions
"""
STICKY=1
INIT=2
CREATE=4
SEND=8
QUEUE=16
QUERY=32
CREATE=2
SEND=4
QUEUE=8
QUERY=16
ALL=int(0xfffffffffffffffe)

View File

@@ -1,29 +0,0 @@
"""Add chainqueue
Revision ID: 0ec0d6d1e785
Revises:
Create Date: 2021-04-02 18:30:55.398388
"""
from alembic import op
import sqlalchemy as sa
from chainqueue.db.migrations.sqlalchemy import (
chainqueue_upgrade,
chainqueue_downgrade,
)
# revision identifiers, used by Alembic.
revision = '0ec0d6d1e785'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
chainqueue_upgrade(0, 0, 1)
def downgrade():
chainqueue_downgrade(0, 0, 1)

View File

@@ -1,29 +0,0 @@
"""Roles
Revision ID: 1f1b3b641d08
Revises: 9c420530eeb2
Create Date: 2021-04-02 18:40:27.787631
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '1f1b3b641d08'
down_revision = '9c420530eeb2'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'account_role',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.Text, nullable=False, unique=True),
sa.Column('address_hex', sa.String(42), nullable=False),
)
def downgrade():
op.drop_table('account_role')

View File

@@ -0,0 +1,35 @@
"""Add new syncer table
Revision ID: 2a07b543335e
Revises: a2e2aab8f331
Create Date: 2020-12-27 09:35:44.017981
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '2a07b543335e'
down_revision = 'a2e2aab8f331'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'blockchain_sync',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('blockchain', sa.String, nullable=False),
sa.Column('block_start', sa.Integer, nullable=False, default=0),
sa.Column('tx_start', sa.Integer, nullable=False, default=0),
sa.Column('block_cursor', sa.Integer, nullable=False, default=0),
sa.Column('tx_cursor', sa.Integer, nullable=False, default=0),
sa.Column('block_target', sa.Integer, nullable=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime),
)
def downgrade():
op.drop_table('blockchain_sync')

View File

@@ -1,8 +1,8 @@
"""Nonce
"""Nonce reservation
Revision ID: 9c420530eeb2
Revises: b125cbf81e32
Create Date: 2021-04-02 18:38:56.459334
Revision ID: 3b693afd526a
Revises: f738d9962fdf
Create Date: 2021-03-05 07:09:50.898728
"""
from alembic import op
@@ -10,22 +10,15 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9c420530eeb2'
down_revision = 'b125cbf81e32'
revision = '3b693afd526a'
down_revision = 'f738d9962fdf'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False, unique=True),
sa.Column('nonce', sa.Integer, nullable=False),
)
op.create_table(
'nonce_task_reservation',
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False),
sa.Column('nonce', sa.Integer, nullable=False),
@@ -36,4 +29,3 @@ def upgrade():
def downgrade():
op.drop_table('nonce_task_reservation')
op.drop_table('nonce')

View File

@@ -0,0 +1,29 @@
"""Add nonce index
Revision ID: 49b348246d70
Revises: 52c7c59cd0b1
Create Date: 2020-12-19 09:45:36.186446
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '49b348246d70'
down_revision = '52c7c59cd0b1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False, unique=True),
sa.Column('nonce', sa.Integer, nullable=False),
)
def downgrade():
op.drop_table('nonce')

View File

@@ -0,0 +1,31 @@
"""Add account roles
Revision ID: 52c7c59cd0b1
Revises: 9c4bd7491015
Create Date: 2020-12-19 07:21:38.249237
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '52c7c59cd0b1'
down_revision = '9c4bd7491015'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'account_role',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.Text, nullable=False, unique=True),
sa.Column('address_hex', sa.String(42), nullable=False),
)
pass
def downgrade():
op.drop_table('account_role')
pass

View File

@@ -0,0 +1,30 @@
"""Add otx state log
Revision ID: 6ac7a1dadc46
Revises: 89e1e9baa53c
Create Date: 2021-01-30 13:59:49.022373
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '6ac7a1dadc46'
down_revision = '89e1e9baa53c'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx_state_log',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False),
sa.Column('date', sa.DateTime, nullable=False),
sa.Column('status', sa.Integer, nullable=False),
)
def downgrade():
op.drop_table('otx_state_log')

View File

@@ -0,0 +1,31 @@
"""Add attempts and version log for otx
Revision ID: 71708e943dbd
Revises: 7e8d7626e38f
Create Date: 2020-09-26 14:41:19.298651
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '71708e943dbd'
down_revision = '7e8d7626e38f'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx_attempts',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False),
sa.Column('date', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('otx_attempts')
pass

View File

@@ -0,0 +1,31 @@
"""add blocknumber pointer
Revision ID: 7cb65b893934
Revises: 8593fa1ca0f4
Create Date: 2020-09-24 19:29:13.543648
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '7cb65b893934'
down_revision = '8593fa1ca0f4'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'watcher_state',
sa.Column('block_number', sa.Integer)
)
conn = op.get_bind()
conn.execute('INSERT INTO watcher_state (block_number) VALUES (0);')
pass
def downgrade():
op.drop_table('watcher_state')
pass

View File

@@ -0,0 +1,45 @@
"""Add block sync
Revision ID: 7e8d7626e38f
Revises: cd2052be6db2
Create Date: 2020-09-26 11:12:27.818524
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '7e8d7626e38f'
down_revision = 'cd2052be6db2'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'block_sync',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('blockchain', sa.String, nullable=False, unique=True),
sa.Column('block_height_backlog', sa.Integer, nullable=False, default=0),
sa.Column('tx_height_backlog', sa.Integer, nullable=False, default=0),
sa.Column('block_height_session', sa.Integer, nullable=False, default=0),
sa.Column('tx_height_session', sa.Integer, nullable=False, default=0),
sa.Column('block_height_head', sa.Integer, nullable=False, default=0),
sa.Column('tx_height_head', sa.Integer, nullable=False, default=0),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime),
)
op.drop_table('watcher_state')
pass
def downgrade():
op.drop_table('block_sync')
op.create_table(
'watcher_state',
sa.Column('block_number', sa.Integer)
)
conn = op.get_bind()
conn.execute('INSERT INTO watcher_state (block_number) VALUES (0);')
pass

View File

@@ -0,0 +1,35 @@
"""Add transaction queue
Revision ID: 8593fa1ca0f4
Revises:
Create Date: 2020-09-22 21:56:42.117047
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '8593fa1ca0f4'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('tx_hash', sa.String(66), nullable=False),
sa.Column('signed_tx', sa.Text, nullable=False),
sa.Column('status', sa.Integer, nullable=False, default=-9),
sa.Column('block', sa.Integer),
)
op.create_index('idx_otx_tx', 'otx', ['tx_hash'], unique=True)
def downgrade():
op.drop_index('idx_otx_tx')
op.drop_table('otx')

View File

@@ -1,20 +1,17 @@
"""Lock
"""Add account lock
Revision ID: 75d4767b3031
Revises: 1f1b3b641d08
Create Date: 2021-04-02 18:41:20.864265
Revision ID: 89e1e9baa53c
Revises: 2a07b543335e
Create Date: 2021-01-27 19:57:36.793882
"""
import datetime
from alembic import op
import sqlalchemy as sa
from chainlib.eth.constant import ZERO_ADDRESS
from cic_eth.db.enum import LockEnum
# revision identifiers, used by Alembic.
revision = '75d4767b3031'
down_revision = '1f1b3b641d08'
revision = '89e1e9baa53c'
down_revision = '2a07b543335e'
branch_labels = None
depends_on = None
@@ -26,12 +23,10 @@ def upgrade():
sa.Column("address", sa.String(42), nullable=True),
sa.Column('blockchain', sa.String),
sa.Column("flags", sa.BIGINT(), nullable=False, default=0),
sa.Column("date_created", sa.DateTime, nullable=False, default=datetime.datetime.utcnow),
sa.Column("otx_id", sa.Integer, sa.ForeignKey('otx.id'), nullable=True),
sa.Column("date_created", sa.DateTime, nullable=False),
sa.Column("otx_id", sa.Integer, nullable=True),
)
op.create_index('idx_chain_address', 'lock', ['blockchain', 'address'], unique=True)
op.execute("INSERT INTO lock (address, date_created, blockchain, flags) VALUES('{}', '{}', '::', {})".format(ZERO_ADDRESS, datetime.datetime.utcnow(), LockEnum.INIT | LockEnum.SEND | LockEnum.QUEUE))
def downgrade():
op.drop_index('idx_chain_address')

View File

@@ -0,0 +1,26 @@
"""Rename block sync table
Revision ID: 9c4bd7491015
Revises: 9daa16518a91
Create Date: 2020-10-15 23:45:56.306898
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9c4bd7491015'
down_revision = '9daa16518a91'
branch_labels = None
depends_on = None
def upgrade():
op.rename_table('block_sync', 'otx_sync')
pass
def downgrade():
op.rename_table('otx_sync', 'block_sync')
pass

View File

@@ -0,0 +1,30 @@
"""add tx sync state
Revision ID: 9daa16518a91
Revises: e3b5330ee71c
Create Date: 2020-10-10 14:43:18.699276
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9daa16518a91'
down_revision = 'e3b5330ee71c'
branch_labels = None
depends_on = None
def upgrade():
# op.create_table(
# 'tx_sync',
# sa.Column('tx', sa.String(66), nullable=False),
# )
# op.execute("INSERT INTO tx_sync VALUES('0x0000000000000000000000000000000000000000000000000000000000000000')")
pass
def downgrade():
# op.drop_table('tx_sync')
pass

View File

@@ -0,0 +1,34 @@
"""Add date accessed to txcache
Revision ID: a2e2aab8f331
Revises: 49b348246d70
Create Date: 2020-12-24 18:58:06.137812
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'a2e2aab8f331'
down_revision = '49b348246d70'
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
'tx_cache',
sa.Column(
'date_checked',
sa.DateTime,
nullable=False
)
)
pass
def downgrade():
# drop does not work withs qlite
#op.drop_column('tx_cache', 'date_checked')
pass

View File

@@ -1,8 +1,8 @@
"""Convert
"""convert tx index
Revision ID: aee12aeb47ec
Revises: 5ca4b77ce205
Create Date: 2021-04-02 18:42:45.233356
Revision ID: cd2052be6db2
Revises: 7cb65b893934
Create Date: 2020-09-24 21:20:51.580500
"""
from alembic import op
@@ -10,8 +10,8 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'aee12aeb47ec'
down_revision = '5ca4b77ce205'
revision = 'cd2052be6db2'
down_revision = '7cb65b893934'
branch_labels = None
depends_on = None
@@ -20,8 +20,10 @@ def upgrade():
op.create_table(
'tx_convert_transfer',
sa.Column('id', sa.Integer, primary_key=True),
#sa.Column('approve_tx_hash', sa.String(66), nullable=False, unique=True),
sa.Column('convert_tx_hash', sa.String(66), nullable=False, unique=True),
sa.Column('transfer_tx_hash', sa.String(66), unique=True),
# sa.Column('holder_address', sa.String(42), nullable=False),
sa.Column('recipient_address', sa.String(42), nullable=False),
)
op.create_index('idx_tx_convert_address', 'tx_convert_transfer', ['recipient_address'])

View File

@@ -0,0 +1,31 @@
"""Add tx tracker record
Revision ID: df19f4e69676
Revises: 71708e943dbd
Create Date: 2020-10-09 23:31:44.563498
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'df19f4e69676'
down_revision = '71708e943dbd'
branch_labels = None
depends_on = None
def upgrade():
# op.create_table(
# 'tx',
# sa.Column('id', sa.Integer, primary_key=True),
# sa.Column('date_added', sa.DateTime, nullable=False),
# sa.Column('tx_hash', sa.String(66), nullable=False, unique=True),
# sa.Column('success', sa.Boolean(), nullable=False),
# )
pass
def downgrade():
# op.drop_table('tx')
pass

View File

@@ -0,0 +1,38 @@
"""Add cached values for tx
Revision ID: e3b5330ee71c
Revises: df19f4e69676
Create Date: 2020-10-10 00:17:07.094893
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'e3b5330ee71c'
down_revision = 'df19f4e69676'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'tx_cache',
sa.Column('id', sa.Integer, primary_key=True),
# sa.Column('tx_id', sa.Integer, sa.ForeignKey('tx.id'), nullable=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime, nullable=False),
sa.Column('source_token_address', sa.String(42), nullable=False),
sa.Column('destination_token_address', sa.String(42), nullable=False),
sa.Column('sender', sa.String(42), nullable=False),
sa.Column('recipient', sa.String(42), nullable=False),
sa.Column('from_value', sa.NUMERIC(), nullable=False),
sa.Column('to_value', sa.NUMERIC(), nullable=True),
sa.Column('block_number', sa.BIGINT(), nullable=True),
sa.Column('tx_index', sa.Integer, nullable=True),
)
def downgrade():
op.drop_table('tx_cache')
pass

View File

@@ -1,8 +1,8 @@
"""Add chain syncer
Revision ID: 6604de4203e2
Revises: 63b629f14a85
Create Date: 2021-04-01 08:10:29.156243
Revision ID: ec40ac0974c1
Revises: 6ac7a1dadc46
Create Date: 2021-02-23 06:10:19.246304
"""
from alembic import op
@@ -14,15 +14,15 @@ from chainsyncer.db.migrations.sqlalchemy import (
# revision identifiers, used by Alembic.
revision = '6604de4203e2'
down_revision = '63b629f14a85'
revision = 'ec40ac0974c1'
down_revision = '6ac7a1dadc46'
branch_labels = None
depends_on = None
def upgrade():
chainsyncer_upgrade(0, 0, 1)
def downgrade():
chainsyncer_downgrade(0, 0, 1)

View File

@@ -1,8 +1,8 @@
"""DEbug
"""debug output
Revision ID: 5ca4b77ce205
Revises: 75d4767b3031
Create Date: 2021-04-02 18:42:12.257244
Revision ID: f738d9962fdf
Revises: ec40ac0974c1
Create Date: 2021-03-04 08:32:43.281214
"""
from alembic import op
@@ -10,8 +10,8 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '5ca4b77ce205'
down_revision = '75d4767b3031'
revision = 'f738d9962fdf'
down_revision = 'ec40ac0974c1'
branch_labels = None
depends_on = None
@@ -24,7 +24,9 @@ def upgrade():
sa.Column('description', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('debug')
pass

View File

@@ -0,0 +1,85 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = .
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat migrations/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
#sqlalchemy.url = driver://user:pass@localhost/dbname
sqlalchemy.url = postgresql+psycopg2://postgres@localhost:5432/cic-eth
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks=black
# black.type=console_scripts
# black.entrypoint=black
# black.options=-l 79
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@@ -0,0 +1,77 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1,35 @@
"""Add new syncer table
Revision ID: 2a07b543335e
Revises: a2e2aab8f331
Create Date: 2020-12-27 09:35:44.017981
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '2a07b543335e'
down_revision = 'a2e2aab8f331'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'blockchain_sync',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('blockchain', sa.String, nullable=False),
sa.Column('block_start', sa.Integer, nullable=False, default=0),
sa.Column('tx_start', sa.Integer, nullable=False, default=0),
sa.Column('block_cursor', sa.Integer, nullable=False, default=0),
sa.Column('tx_cursor', sa.Integer, nullable=False, default=0),
sa.Column('block_target', sa.Integer, nullable=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime),
)
def downgrade():
op.drop_table('blockchain_sync')

View File

@@ -0,0 +1,31 @@
"""Nonce reservation
Revision ID: 3b693afd526a
Revises: f738d9962fdf
Create Date: 2021-03-05 07:09:50.898728
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3b693afd526a'
down_revision = 'f738d9962fdf'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('key', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
def downgrade():
op.drop_table('nonce_task_reservation')

View File

@@ -0,0 +1,29 @@
"""Add nonce index
Revision ID: 49b348246d70
Revises: 52c7c59cd0b1
Create Date: 2020-12-19 09:45:36.186446
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '49b348246d70'
down_revision = '52c7c59cd0b1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False, unique=True),
sa.Column('nonce', sa.Integer, nullable=False),
)
def downgrade():
op.drop_table('nonce')

View File

@@ -0,0 +1,31 @@
"""Add account roles
Revision ID: 52c7c59cd0b1
Revises: 9c4bd7491015
Create Date: 2020-12-19 07:21:38.249237
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '52c7c59cd0b1'
down_revision = '9c4bd7491015'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'account_role',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.Text, nullable=False, unique=True),
sa.Column('address_hex', sa.String(42), nullable=False),
)
pass
def downgrade():
op.drop_table('account_role')
pass

View File

@@ -0,0 +1,30 @@
"""Add otx state log
Revision ID: 6ac7a1dadc46
Revises: 89e1e9baa53c
Create Date: 2021-01-30 13:59:49.022373
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '6ac7a1dadc46'
down_revision = '89e1e9baa53c'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx_state_log',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False),
sa.Column('date', sa.DateTime, nullable=False),
sa.Column('status', sa.Integer, nullable=False),
)
def downgrade():
op.drop_table('otx_state_log')

View File

@@ -0,0 +1,31 @@
"""Add attempts and version log for otx
Revision ID: 71708e943dbd
Revises: 7e8d7626e38f
Create Date: 2020-09-26 14:41:19.298651
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '71708e943dbd'
down_revision = '7e8d7626e38f'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx_attempts',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False),
sa.Column('date', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('otx_attempts')
pass

View File

@@ -0,0 +1,31 @@
"""add blocknumber pointer
Revision ID: 7cb65b893934
Revises: 8593fa1ca0f4
Create Date: 2020-09-24 19:29:13.543648
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '7cb65b893934'
down_revision = '8593fa1ca0f4'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'watcher_state',
sa.Column('block_number', sa.Integer)
)
conn = op.get_bind()
conn.execute('INSERT INTO watcher_state (block_number) VALUES (0);')
pass
def downgrade():
op.drop_table('watcher_state')
pass

View File

@@ -0,0 +1,42 @@
"""Add block sync
Revision ID: 7e8d7626e38f
Revises: cd2052be6db2
Create Date: 2020-09-26 11:12:27.818524
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '7e8d7626e38f'
down_revision = 'cd2052be6db2'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'block_sync',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('blockchain', sa.String, nullable=False, unique=True),
sa.Column('height_backlog', sa.Integer, nullable=False, default=0),
sa.Column('height_session', sa.Integer, nullable=False, default=0),
sa.Column('height_head', sa.Integer, nullable=False, default=0),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime),
)
op.drop_table('watcher_state')
pass
def downgrade():
op.drop_table('block_sync')
op.create_table(
'watcher_state',
sa.Column('block_number', sa.Integer)
)
conn = op.get_bind()
conn.execute('INSERT INTO watcher_state (block_number) VALUES (0);')
pass

View File

@@ -0,0 +1,35 @@
"""Add transaction queue
Revision ID: 8593fa1ca0f4
Revises:
Create Date: 2020-09-22 21:56:42.117047
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '8593fa1ca0f4'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('tx_hash', sa.String(66), nullable=False),
sa.Column('signed_tx', sa.Text, nullable=False),
sa.Column('status', sa.Integer, nullable=False, default=-9),
sa.Column('block', sa.Integer),
)
op.create_index('idx_otx_tx', 'otx', ['tx_hash'], unique=True)
def downgrade():
op.drop_index('idx_otx_tx')
op.drop_table('otx')

View File

@@ -0,0 +1,33 @@
"""Add account lock
Revision ID: 89e1e9baa53c
Revises: 2a07b543335e
Create Date: 2021-01-27 19:57:36.793882
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '89e1e9baa53c'
down_revision = '2a07b543335e'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'lock',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column("address", sa.String(42), nullable=True),
sa.Column('blockchain', sa.String),
sa.Column("flags", sa.BIGINT(), nullable=False, default=0),
sa.Column("date_created", sa.DateTime, nullable=False),
sa.Column("otx_id", sa.Integer, nullable=True),
)
op.create_index('idx_chain_address', 'lock', ['blockchain', 'address'], unique=True)
def downgrade():
op.drop_index('idx_chain_address')
op.drop_table('lock')

View File

@@ -0,0 +1,26 @@
"""Rename block sync table
Revision ID: 9c4bd7491015
Revises: 9daa16518a91
Create Date: 2020-10-15 23:45:56.306898
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9c4bd7491015'
down_revision = '9daa16518a91'
branch_labels = None
depends_on = None
def upgrade():
op.rename_table('block_sync', 'otx_sync')
pass
def downgrade():
op.rename_table('otx_sync', 'block_sync')
pass

View File

@@ -0,0 +1,30 @@
"""add tx sync state
Revision ID: 9daa16518a91
Revises: e3b5330ee71c
Create Date: 2020-10-10 14:43:18.699276
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9daa16518a91'
down_revision = 'e3b5330ee71c'
branch_labels = None
depends_on = None
def upgrade():
# op.create_table(
# 'tx_sync',
# sa.Column('tx', sa.String(66), nullable=False),
# )
# op.execute("INSERT INTO tx_sync VALUES('0x0000000000000000000000000000000000000000000000000000000000000000')")
pass
def downgrade():
# op.drop_table('tx_sync')
pass

View File

@@ -0,0 +1,33 @@
"""Add date accessed to txcache
Revision ID: a2e2aab8f331
Revises: 49b348246d70
Create Date: 2020-12-24 18:58:06.137812
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'a2e2aab8f331'
down_revision = '49b348246d70'
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
'tx_cache',
sa.Column(
'date_checked',
sa.DateTime,
nullable=False
)
)
pass
def downgrade():
op.drop_column('tx_cache', 'date_checked')
pass

View File

@@ -0,0 +1,34 @@
"""convert tx index
Revision ID: cd2052be6db2
Revises: 7cb65b893934
Create Date: 2020-09-24 21:20:51.580500
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'cd2052be6db2'
down_revision = '7cb65b893934'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'tx_convert_transfer',
sa.Column('id', sa.Integer, primary_key=True),
#sa.Column('approve_tx_hash', sa.String(66), nullable=False, unique=True),
sa.Column('convert_tx_hash', sa.String(66), nullable=False, unique=True),
sa.Column('transfer_tx_hash', sa.String(66), unique=True),
# sa.Column('holder_address', sa.String(42), nullable=False),
sa.Column('recipient_address', sa.String(42), nullable=False),
)
op.create_index('idx_tx_convert_address', 'tx_convert_transfer', ['recipient_address'])
def downgrade():
op.drop_index('idx_tx_convert_address')
op.drop_table('tx_convert_transfer')

View File

@@ -0,0 +1,31 @@
"""Add tx tracker record
Revision ID: df19f4e69676
Revises: 71708e943dbd
Create Date: 2020-10-09 23:31:44.563498
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'df19f4e69676'
down_revision = '71708e943dbd'
branch_labels = None
depends_on = None
def upgrade():
# op.create_table(
# 'tx',
# sa.Column('id', sa.Integer, primary_key=True),
# sa.Column('date_added', sa.DateTime, nullable=False),
# sa.Column('tx_hash', sa.String(66), nullable=False, unique=True),
# sa.Column('success', sa.Boolean(), nullable=False),
# )
pass
def downgrade():
# op.drop_table('tx')
pass

View File

@@ -0,0 +1,37 @@
"""Add cached values for tx
Revision ID: e3b5330ee71c
Revises: df19f4e69676
Create Date: 2020-10-10 00:17:07.094893
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'e3b5330ee71c'
down_revision = 'df19f4e69676'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'tx_cache',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime, nullable=False),
sa.Column('source_token_address', sa.String(42), nullable=False),
sa.Column('destination_token_address', sa.String(42), nullable=False),
sa.Column('sender', sa.String(42), nullable=False),
sa.Column('recipient', sa.String(42), nullable=False),
sa.Column('from_value', sa.NUMERIC(), nullable=False),
sa.Column('to_value', sa.NUMERIC(), nullable=True),
sa.Column('block_number', sa.BIGINT(), nullable=True),
sa.Column('tx_index', sa.Integer, nullable=True),
)
def downgrade():
op.drop_table('tx_cache')
pass

View File

@@ -1,13 +1,12 @@
"""Add chain syncer
Revision ID: b125cbf81e32
Revises: 0ec0d6d1e785
Create Date: 2021-04-02 18:36:44.459603
Revision ID: ec40ac0974c1
Revises: 6ac7a1dadc46
Create Date: 2021-02-23 06:10:19.246304
"""
from alembic import op
import sqlalchemy as sa
from chainsyncer.db.migrations.sqlalchemy import (
chainsyncer_upgrade,
chainsyncer_downgrade,
@@ -15,15 +14,15 @@ from chainsyncer.db.migrations.sqlalchemy import (
# revision identifiers, used by Alembic.
revision = 'b125cbf81e32'
down_revision = '0ec0d6d1e785'
revision = 'ec40ac0974c1'
down_revision = '6ac7a1dadc46'
branch_labels = None
depends_on = None
def upgrade():
chainsyncer_upgrade(0, 0, 1)
def downgrade():
chainsyncer_downgrade(0, 0, 1)

View File

@@ -0,0 +1,32 @@
"""debug output
Revision ID: f738d9962fdf
Revises: ec40ac0974c1
Create Date: 2021-03-04 08:32:43.281214
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'f738d9962fdf'
down_revision = 'ec40ac0974c1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'debug',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.String, nullable=False),
sa.Column('description', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('debug')
pass

View File

@@ -10,7 +10,6 @@ from sqlalchemy.pool import (
StaticPool,
QueuePool,
AssertionPool,
NullPool,
)
logg = logging.getLogger()
@@ -65,7 +64,6 @@ class SessionBase(Model):
if SessionBase.poolable:
poolclass = QueuePool
if pool_size > 1:
logg.info('db using queue pool')
e = create_engine(
dsn,
max_overflow=pool_size*3,
@@ -76,22 +74,17 @@ class SessionBase(Model):
echo=debug,
)
else:
if pool_size == 0:
logg.info('db using nullpool')
poolclass = NullPool
elif debug:
logg.info('db using assertion pool')
if debug:
poolclass = AssertionPool
else:
logg.info('db using static pool')
poolclass = StaticPool
e = create_engine(
dsn,
poolclass=poolclass,
echo=debug,
)
else:
logg.info('db not poolable')
e = create_engine(
dsn,
echo=debug,

View File

@@ -5,11 +5,11 @@ import logging
# third-party imports
from sqlalchemy import Column, String, Integer, DateTime, ForeignKey
from chainlib.eth.constant import ZERO_ADDRESS
from chainqueue.db.models.tx import TxCache
from chainqueue.db.models.otx import Otx
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.otx import Otx
logg = logging.getLogger()
@@ -22,12 +22,10 @@ class Lock(SessionBase):
__tablename__ = "lock"
blockchain = Column(String)
#address = Column(String, ForeignKey('tx_cache.sender'))
address = Column(String, ForeignKey(TxCache.sender))
address = Column(String, ForeignKey('tx_cache.sender'))
flags = Column(Integer)
date_created = Column(DateTime, default=datetime.datetime.utcnow)
otx_id = Column(Integer, ForeignKey(Otx.id))
#otx_id = Column(Integer)
otx_id = Column(Integer, ForeignKey('otx.id'))
def chain(self):

View File

@@ -0,0 +1,680 @@
# standard imports
import datetime
import logging
# external imports
from sqlalchemy import Column, Enum, String, Integer, DateTime, Text, or_, ForeignKey
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
# local imports
from .base import SessionBase
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
status_str,
is_error_status,
)
from cic_eth.db.error import TxStateChangeError
logg = logging.getLogger()
class OtxStateLog(SessionBase):
__tablename__ = 'otx_state_log'
date = Column(DateTime, default=datetime.datetime.utcnow)
status = Column(Integer)
otx_id = Column(Integer, ForeignKey('otx.id'))
def __init__(self, otx):
self.otx_id = otx.id
self.status = otx.status
class Otx(SessionBase):
"""Outgoing transactions with local origin.
:param nonce: Transaction nonce
:type nonce: number
:param address: Ethereum address of recipient - NOT IN USE, REMOVE
:type address: str
:param tx_hash: Tranasction hash
:type tx_hash: str, 0x-hex
:param signed_tx: Signed raw transaction data
:type signed_tx: str, 0x-hex
"""
__tablename__ = 'otx'
tracing = False
"""Whether to enable queue state tracing"""
nonce = Column(Integer)
date_created = Column(DateTime, default=datetime.datetime.utcnow)
tx_hash = Column(String(66))
signed_tx = Column(Text)
status = Column(Integer)
block = Column(Integer)
def __set_status(self, status, session):
self.status |= status
session.add(self)
session.flush()
def __reset_status(self, status, session):
status_edit = ~status & self.status
self.status &= status_edit
session.add(self)
session.flush()
def __status_already_set(self, status):
r = bool(self.status & status)
if r:
logg.warning('status bit {} already set on {}'.format(status.name, self.tx_hash))
return r
def __status_not_set(self, status):
r = not(self.status & status)
if r:
logg.warning('status bit {} not set on {}'.format(status.name, self.tx_hash))
return r
def set_block(self, block, session=None):
"""Set block number transaction was mined in.
Only manipulates object, does not transaction or commit to backend.
:param block: Block number
:type block: number
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
session = SessionBase.bind_session(session)
if self.block != None:
SessionBase.release_session(session)
raise TxStateChangeError('Attempted set block {} when block was already {}'.format(block, self.block))
self.block = block
session.add(self)
session.flush()
SessionBase.release_session(session)
def waitforgas(self, session=None):
"""Marks transaction as suspended pending gas funding.
Only manipulates object, does not transaction or commit to backend.
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.GAS_ISSUES):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
SessionBase.release_session(session)
raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.GAS_ISSUES, session)
self.__reset_status(StatusBits.QUEUED | StatusBits.DEFERRED, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def fubar(self, session=None):
"""Marks transaction as "fubar." Any transaction marked this way is an anomaly and may be a symptom of a serious problem.
Only manipulates object, does not transaction or commit to backend.
"""
if self.__status_already_set(StatusBits.UNKNOWN_ERROR):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('FUBAR cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if is_error_status(self.status):
SessionBase.release_session(session)
raise TxStateChangeError('FUBAR cannot be set on an entry with an error state already set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.UNKNOWN_ERROR | StatusBits.FINAL, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def reject(self, session=None):
"""Marks transaction as "rejected," which means the node rejected sending the transaction to the network. The nonce has not been spent, and the transaction should be replaced.
Only manipulates object, does not transaction or commit to backend.
"""
if self.__status_already_set(StatusBits.NODE_ERROR):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('REJECTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
SessionBase.release_session(session)
raise TxStateChangeError('REJECTED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status)))
if is_error_status(self.status):
SessionBase.release_session(session)
raise TxStateChangeError('REJECTED cannot be set on an entry with an error state already set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.NODE_ERROR | StatusBits.FINAL, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def override(self, manual=False, session=None):
"""Marks transaction as manually overridden.
Only manipulates object, does not transaction or commit to backend.
"""
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
SessionBase.release_session(session)
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status)))
if self.status & StatusBits.OBSOLETE:
SessionBase.release_session(session)
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already OBSOLETE ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.OBSOLETE, session)
#if manual:
# self.__set_status(StatusBits.MANUAL, session)
self.__reset_status(StatusBits.QUEUED | StatusBits.IN_NETWORK, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def manual(self, session=None):
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.MANUAL, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def retry(self, session=None):
"""Marks transaction as ready to retry after a timeout following a sendfail or a completed gas funding.
Only manipulates object, does not transaction or commit to backend.
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.QUEUED):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('RETRY cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if not is_error_status(self.status) and not StatusBits.IN_NETWORK & self.status > 0:
SessionBase.release_session(session)
raise TxStateChangeError('RETRY cannot be set on an entry that has no error ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.QUEUED, session)
self.__reset_status(StatusBits.GAS_ISSUES, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def readysend(self, session=None):
"""Marks transaction as ready for initial send attempt.
Only manipulates object, does not transaction or commit to backend.
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.QUEUED):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('READYSEND cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if is_error_status(self.status):
SessionBase.release_session(session)
raise TxStateChangeError('READYSEND cannot be set on an errored state ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.QUEUED, session)
self.__reset_status(StatusBits.GAS_ISSUES, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def sent(self, session=None):
"""Marks transaction as having been sent to network.
Only manipulates object, does not transaction or commit to backend.
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.IN_NETWORK):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('SENT cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.IN_NETWORK, session)
self.__reset_status(StatusBits.DEFERRED | StatusBits.QUEUED | StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def sendfail(self, session=None):
"""Marks that an attempt to send the transaction to the network has failed.
Only manipulates object, does not transaction or commit to backend.
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.NODE_ERROR):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
SessionBase.release_session(session)
raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.LOCAL_ERROR | StatusBits.DEFERRED, session)
self.__reset_status(StatusBits.QUEUED | StatusBits.GAS_ISSUES, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def dequeue(self, session=None):
"""Marks that a process to execute send attempt is underway
Only manipulates object, does not transaction or commit to backend.
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_not_set(StatusBits.QUEUED):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('QUEUED cannot be unset on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
SessionBase.release_session(session)
raise TxStateChangeError('QUEUED cannot be unset on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
self.__reset_status(StatusBits.QUEUED, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def minefail(self, block, session=None):
"""Marks that transaction was mined but code execution did not succeed.
Only manipulates object, does not transaction or commit to backend.
:param block: Block number transaction was mined in.
:type block: number
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.NETWORK_ERROR):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('REVERTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if not self.status & StatusBits.IN_NETWORK:
SessionBase.release_session(session)
raise TxStateChangeError('REVERTED cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status)))
if block != None:
self.block = block
self.__set_status(StatusBits.NETWORK_ERROR | StatusBits.FINAL, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def cancel(self, confirmed=False, session=None):
"""Marks that the transaction has been succeeded by a new transaction with same nonce.
If set to confirmed, the previous state must be OBSOLETED, and will transition to CANCELLED - a finalized state. Otherwise, the state must follow a non-finalized state, and will be set to OBSOLETED.
Only manipulates object, does not transaction or commit to backend.
:param confirmed: Whether transition is to a final state.
:type confirmed: bool
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('CANCEL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if confirmed:
if self.status > 0 and not self.status & StatusBits.OBSOLETE:
SessionBase.release_session(session)
raise TxStateChangeError('CANCEL can only be set on an entry marked OBSOLETE ({})'.format(status_str(self.status)))
self.__set_status(StatusEnum.CANCELLED, session)
else:
self.__set_status(StatusEnum.OBSOLETED, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def success(self, block, session=None):
"""Marks that transaction was successfully mined.
Only manipulates object, does not transaction or commit to backend.
:param block: Block number transaction was mined in.
:type block: number
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
SessionBase.release_session(session)
raise TxStateChangeError('SUCCESS cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if not self.status & StatusBits.IN_NETWORK:
SessionBase.release_session(session)
raise TxStateChangeError('SUCCESS cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status)))
if is_error_status(self.status):
SessionBase.release_session(session)
raise TxStateChangeError('SUCCESS cannot be set on an entry with error state set ({})'.format(status_str(self.status)))
if block != None:
self.block = block
self.__set_status(StatusEnum.SUCCESS, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
@staticmethod
def get(status=0, limit=4096, status_exact=True, session=None):
"""Returns outgoing transaction lists by status.
Status may either be matched exactly, or be an upper bound of the integer value of the status enum.
:param status: Status value to use in query
:type status: cic_eth.db.enum.StatusEnum
:param limit: Max results to return
:type limit: number
:param status_exact: Whether or not to perform exact status match
:type bool:
:returns: List of transaction hashes
:rtype: tuple, where first element is transaction hash
"""
e = None
session = SessionBase.bind_session(session)
if status_exact:
e = session.query(Otx.tx_hash).filter(Otx.status==status).order_by(Otx.date_created.asc()).limit(limit).all()
else:
e = session.query(Otx.tx_hash).filter(Otx.status<=status).order_by(Otx.date_created.asc()).limit(limit).all()
SessionBase.release_session(session)
return e
@staticmethod
def load(tx_hash, session=None):
"""Retrieves the outgoing transaction record by transaction hash.
:param tx_hash: Transaction hash
:type tx_hash: str, 0x-hex
"""
session = SessionBase.bind_session(session)
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
SessionBase.release_session(session)
return q.first()
@staticmethod
def account(account_address):
"""Retrieves all transaction hashes for which the given Ethereum address is sender or recipient.
:param account_address: Ethereum address to use in query.
:type account_address: str, 0x-hex
:returns: Outgoing transactions
:rtype: tuple, where first element is transaction hash
"""
session = Otx.create_session()
q = session.query(Otx.tx_hash)
q = q.join(TxCache)
q = q.filter(or_(TxCache.sender==account_address, TxCache.recipient==account_address))
txs = q.all()
session.close()
return list(txs)
def __state_log(self, session):
l = OtxStateLog(self)
session.add(l)
# TODO: it is not safe to return otx here unless session has been passed in
@staticmethod
def add(nonce, address, tx_hash, signed_tx, session=None):
external_session = session != None
session = SessionBase.bind_session(session)
otx = Otx(nonce, address, tx_hash, signed_tx)
session.add(otx)
session.flush()
if otx.tracing:
otx.__state_log(session=session)
session.flush()
SessionBase.release_session(session)
if not external_session:
return None
return otx
def __init__(self, nonce, address, tx_hash, signed_tx):
self.nonce = nonce
self.tx_hash = tx_hash
self.signed_tx = signed_tx
self.status = StatusEnum.PENDING
signed_tx_bytes = bytes.fromhex(signed_tx[2:])
# sender_address = address_hex_from_signed_tx(signed_tx_bytes)
# logg.debug('decoded tx {}'.format(sender_address))
# TODO: Most of the methods on this object are obsolete, but it contains a static function for retrieving "expired" outgoing transactions that should be moved to Otx instead.
class OtxSync(SessionBase):
"""Obsolete
"""
__tablename__ = 'otx_sync'
blockchain = Column(String)
block_height_backlog = Column(Integer)
tx_height_backlog = Column(Integer)
block_height_session = Column(Integer)
tx_height_session = Column(Integer)
block_height_head = Column(Integer)
tx_height_head = Column(Integer)
date_created = Column(DateTime, default=datetime.datetime.utcnow)
date_updated = Column(DateTime)
def backlog(self, block_height=None, tx_height=None):
#session = OtxSync.create_session()
if block_height != None:
if tx_height == None:
raise ValueError('tx height missing')
self.block_height_backlog = block_height
self.tx_height_backlog = tx_height
#session.add(self)
self.date_updated = datetime.datetime.utcnow()
#session.commit()
block_height = self.block_height_backlog
tx_height = self.tx_height_backlog
#session.close()
return (block_height, tx_height)
def session(self, block_height=None, tx_height=None):
#session = OtxSync.create_session()
if block_height != None:
if tx_height == None:
raise ValueError('tx height missing')
self.block_height_session = block_height
self.tx_height_session = tx_height
#session.add(self)
self.date_updated = datetime.datetime.utcnow()
#session.commit()
block_height = self.block_height_session
tx_height = self.tx_height_session
#session.close()
return (block_height, tx_height)
def head(self, block_height=None, tx_height=None):
#session = OtxSync.create_session()
if block_height != None:
if tx_height == None:
raise ValueError('tx height missing')
self.block_height_head = block_height
self.tx_height_head = tx_height
#session.add(self)
self.date_updated = datetime.datetime.utcnow()
#session.commit()
block_height = self.block_height_head
tx_height = self.tx_height_head
#session.close()
return (block_height, tx_height)
@hybrid_property
def synced(self):
#return self.block_height_session == self.block_height_backlog and self.tx_height_session == self.block_height_backlog
return self.block_height_session == self.block_height_backlog and self.tx_height_session == self.tx_height_backlog
@staticmethod
def load(blockchain_string, session):
q = session.query(OtxSync)
q = q.filter(OtxSync.blockchain==blockchain_string)
return q.first()
@staticmethod
def latest(nonce):
session = SessionBase.create_session()
otx = session.query(Otx).filter(Otx.nonce==nonce).order_by(Otx.created.desc()).first()
session.close()
return otx
@staticmethod
def get_expired(datetime_threshold):
session = SessionBase.create_session()
q = session.query(Otx)
q = q.filter(Otx.date_created<datetime_threshold)
q = q.filter(Otx.status==StatusEnum.SENT)
q = q.order_by(Otx.date_created.desc())
q = q.group_by(Otx.nonce)
q = q.group_by(Otx.id)
otxs = q.all()
session.close()
return otxs
def chain(self):
return self.blockchain
def __init__(self, blockchain):
self.blockchain = blockchain
self.block_height_head = 0
self.tx_height_head = 0
self.block_height_session = 0
self.tx_height_session = 0
self.block_height_backlog = 0
self.tx_height_backlog = 0

View File

@@ -0,0 +1,151 @@
# standard imports
import datetime
# third-party imports
from sqlalchemy import Column, String, Integer, DateTime, Enum, ForeignKey, Boolean, NUMERIC
from sqlalchemy.ext.hybrid import hybrid_method, hybrid_property
#from sqlalchemy.orm import relationship, backref
#from sqlalchemy.ext.declarative import declarative_base
# local imports
from .base import SessionBase
from .otx import Otx
from cic_eth.db.util import num_serialize
from cic_eth.error import NotLocalTxError
from cic_eth.db.error import TxStateChangeError
class TxCache(SessionBase):
"""Metadata expansions for outgoing transactions.
These records are not essential for handling of outgoing transaction queues. It is implemented to reduce the amount of computation spent of parsing and analysing raw signed transaction data.
Instantiation of the object will fail if an outgoing transaction record with the same transaction hash does not exist.
Typically three types of transactions are recorded:
- Token transfers; where source and destination token values and addresses are identical, sender and recipient differ.
- Token conversions; source and destination token values and addresses differ, sender and recipient are identical.
- Any other transaction; source and destination token addresses are zero-address.
:param tx_hash: Transaction hash
:type tx_hash: str, 0x-hex
:param sender: Ethereum address of transaction sender
:type sender: str, 0x-hex
:param recipient: Ethereum address of transaction beneficiary (e.g. token transfer recipient)
:type recipient: str, 0x-hex
:param source_token_address: Contract address of token that sender spent from
:type source_token_address: str, 0x-hex
:param destination_token_address: Contract address of token that recipient will receive balance of
:type destination_token_address: str, 0x-hex
:param from_value: Amount of source tokens spent
:type from_value: number
:param to_value: Amount of destination tokens received
:type to_value: number
:param block_number: Block height the transaction was mined at, or None if not yet mined
:type block_number: number or None
:param tx_number: Block transaction height the transaction was mined at, or None if not yet mined
:type tx_number: number or None
:raises FileNotFoundError: Outgoing transaction for given transaction hash does not exist
"""
__tablename__ = 'tx_cache'
otx_id = Column(Integer, ForeignKey('otx.id'))
source_token_address = Column(String(42))
destination_token_address = Column(String(42))
sender = Column(String(42))
recipient = Column(String(42))
from_value = Column(NUMERIC())
to_value = Column(NUMERIC())
block_number = Column(Integer())
tx_index = Column(Integer())
date_created = Column(DateTime, default=datetime.datetime.utcnow)
date_updated = Column(DateTime, default=datetime.datetime.utcnow)
date_checked = Column(DateTime, default=datetime.datetime.utcnow)
def check(self):
"""Update the "checked" timestamp to current time.
Only manipulates object, does not transaction or commit to backend.
"""
self.date_checked = datetime.datetime.now()
@staticmethod
def clone(
tx_hash_original,
tx_hash_new,
session=None,
):
"""Copy tx cache data and associate it with a new transaction.
:param tx_hash_original: tx cache data to copy
:type tx_hash_original: str, 0x-hex
:param tx_hash_new: tx hash to associate the copied entry with
:type tx_hash_new: str, 0x-hex
"""
session = SessionBase.bind_session(session)
q = session.query(TxCache)
q = q.join(Otx)
q = q.filter(Otx.tx_hash==tx_hash_original)
txc = q.first()
if txc == None:
SessionBase.release_session(session)
raise NotLocalTxError('original {}'.format(tx_hash_original))
if txc.block_number != None:
SessionBase.release_session(session)
raise TxStateChangeError('cannot clone tx cache of confirmed tx {}'.format(tx_hash_original))
session.flush()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash_new)
otx = q.first()
if otx == None:
SessionBase.release_session(session)
raise NotLocalTxError('new {}'.format(tx_hash_new))
txc_new = TxCache(
otx.tx_hash,
txc.sender,
txc.recipient,
txc.source_token_address,
txc.destination_token_address,
int(txc.from_value),
int(txc.to_value),
session=session,
)
session.add(txc_new)
session.commit()
SessionBase.release_session(session)
def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None, session=None):
session = SessionBase.bind_session(session)
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
tx = q.first()
if tx == None:
SessionBase.release_session(session)
raise FileNotFoundError('outgoing transaction record unknown {} (add a Tx first)'.format(tx_hash))
self.otx_id = tx.id
self.sender = sender
self.recipient = recipient
self.source_token_address = source_token_address
self.destination_token_address = destination_token_address
self.from_value = from_value
self.to_value = to_value
self.block_number = block_number
self.tx_index = tx_index
# not automatically set in sqlite, it seems:
self.date_created = datetime.datetime.utcnow()
self.date_updated = self.date_created
self.date_checked = self.date_created
SessionBase.release_session(session)

View File

@@ -4,6 +4,12 @@ class TokenCountError(Exception):
pass
class NotLocalTxError(Exception):
"""Exception raised when trying to access a tx not originated from a local task
"""
pass
class PermanentTxError(Exception):
"""Exception raised when encountering a permanent error when sending a tx.
@@ -48,8 +54,6 @@ class RoleMissingError(Exception):
pass
class IntegrityError(Exception):
"""Exception raised to signal irregularities with deduplication and ordering of tasks
@@ -64,19 +68,15 @@ class LockedError(Exception):
pass
class SeppukuError(Exception):
"""Exception base class for all errors that should cause system shutdown
"""
class SignerError(SeppukuError):
class SignerError(Exception):
"""Exception raised when signer is unavailable or generates an error
"""
pass
class RoleAgencyError(SeppukuError):
"""Exception raise when a role cannot perform its function. This is a critical exception
class EthError(Exception):
"""Exception raised when unspecified error from evm node is encountered
"""
pass

View File

@@ -3,11 +3,11 @@ import logging
# external imports
import celery
from erc20_faucet import Faucet
from erc20_single_shot_faucet import SingleShotFaucet as Faucet
from chainlib.eth.constant import ZERO_ADDRESS
from hexathon import (
strip_0x,
)
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.connection import RPCConnection
from chainlib.eth.sign import (
new_account,
@@ -19,10 +19,8 @@ from chainlib.eth.tx import (
unpack,
)
from chainlib.chain import ChainSpec
from chainlib.error import JSONRPCException
from eth_accounts_index.registry import AccountRegistry # TODO, use interface module instead (needs gas limit method)
from sarafu_faucet import MinterFaucet
from chainqueue.db.models.tx import TxCache
from eth_accounts_index import AccountRegistry
from sarafu_faucet import MinterFaucet as Faucet
# local import
from cic_eth_registry import CICRegistry
@@ -33,6 +31,7 @@ from cic_eth.eth.gas import (
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.tx import TxCache
from cic_eth.error import (
RoleMissingError,
SignerError,
@@ -71,18 +70,11 @@ def create(self, password, chain_spec_dict):
a = None
conn = RPCConnection.connect(chain_spec, 'signer')
o = new_account()
try:
a = conn.do(o)
except ConnectionError as e:
raise SignerError(e)
except FileNotFoundError as e:
raise SignerError(e)
a = conn.do(o)
conn.disconnect()
# TODO: It seems infeasible that a can be None in any case, verify
if a == None:
raise SignerError('create account')
logg.debug('created account {}'.format(a))
# Initialize nonce provider record for account
@@ -127,13 +119,13 @@ def register(self, account_address, chain_spec_dict, writer_address=None):
if writer_address == ZERO_ADDRESS:
session.close()
raise RoleMissingError('call address for resgistering {}'.format(account_address))
account_registry_address = registry.by_name('AccountsIndex', sender_address=call_address)
account_registry_address = registry.by_name('AccountRegistry', sender_address=call_address)
# Generate and sign transaction
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
nonce_oracle = CustodialTaskNonceOracle(writer_address, self.request.root_id, session=session) #, default_nonce)
gas_oracle = self.create_gas_oracle(rpc, AccountsIndex.gas)
account_registry = AccountsIndex(chain_spec, signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
gas_oracle = self.create_gas_oracle(rpc, AccountRegistry.gas)
account_registry = AccountRegistry(signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id())
(tx_hash_hex, tx_signed_raw_hex) = account_registry.add(account_registry_address, writer_address, account_address, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect()
@@ -185,8 +177,8 @@ def gift(self, account_address, chain_spec_dict):
# Generate and sign transaction
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
nonce_oracle = CustodialTaskNonceOracle(account_address, self.request.root_id, session=session) #, default_nonce)
gas_oracle = self.create_gas_oracle(rpc, MinterFaucet.gas)
faucet = Faucet(chain_spec, signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
gas_oracle = self.create_gas_oracle(rpc, Faucet.gas)
faucet = Faucet(signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id())
(tx_hash_hex, tx_signed_raw_hex) = faucet.give_to(faucet_address, account_address, account_address, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect()
@@ -227,22 +219,21 @@ def have(self, account, chain_spec_dict):
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
o = sign_message(account, '0x2a')
conn = RPCConnection.connect(chain_spec, 'signer')
try:
conn = RPCConnection.connect(chain_spec, 'signer')
except Exception as e:
logg.debug('cannot sign with {}: {}'.format(account, e))
return None
try:
conn.do(o)
except ConnectionError as e:
raise SignerError(e)
except FileNotFoundError as e:
raise SignerError(e)
except JSONRPCException as e:
conn.disconnect()
return account
except Exception as e:
logg.debug('cannot sign with {}: {}'.format(account, e))
conn.disconnect()
return None
conn.disconnect()
return account
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def set_role(self, tag, address, chain_spec_dict):
@@ -294,7 +285,7 @@ def cache_gift_data(
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id())
tx_data = Faucet.parse_give_to_request(tx['data'])
session = self.create_session()
@@ -337,8 +328,8 @@ def cache_account_data(
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_data = AccountsIndex.parse_add_request(tx['data'])
tx = unpack(tx_signed_raw_bytes, chain_id=chain_spec.chain_id())
tx_data = AccountRegistry.parse_add_request(tx['data'])
session = SessionBase.create_session()
tx_cache = TxCache(

View File

@@ -209,7 +209,7 @@ def convert_with_default_reserve(self, tokens, from_address, source_amount, mini
# s_queue.apply_async()
#
# s_check_gas = celery.signature(
# 'cic_eth.eth.gas.check_gas',
# 'cic_eth.eth.tx.check_gas',
# [
# c['address'],
# [c['signed_tx']],
@@ -222,7 +222,7 @@ def convert_with_default_reserve(self, tokens, from_address, source_amount, mini
# )
#
# s_set_sent = celery.signature(
# 'cic_eth.queue.state.set_sent',
# 'cic_eth.queue.tx.set_sent_status',
# [False],
# )
# s_send.link(s_set_sent)
@@ -364,7 +364,7 @@ def otx_cache_convert(
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
tx_data = unpack_convert(tx['data'])
logg.debug('tx data {}'.format(tx_data))

View File

@@ -6,6 +6,7 @@ import celery
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec
from chainlib.connection import RPCConnection
from chainlib.eth.erc20 import ERC20
from chainlib.eth.tx import (
TxFormat,
unpack,
@@ -13,23 +14,18 @@ from chainlib.eth.tx import (
from cic_eth_registry import CICRegistry
from cic_eth_registry.erc20 import ERC20Token
from hexathon import strip_0x
from chainqueue.db.models.tx import TxCache
from chainqueue.error import NotLocalTxError
from eth_erc20 import ERC20
# local imports
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole
from cic_eth.error import (
TokenCountError,
PermanentTxError,
OutOfGasError,
)
from cic_eth.error import TokenCountError, PermanentTxError, OutOfGasError, NotLocalTxError
from cic_eth.queue.tx import register_tx
from cic_eth.eth.gas import (
create_check_gas_task,
MaxGasOracle,
)
#from cic_eth.eth.factory import TxFactory
from cic_eth.ext.address import translate_address
from cic_eth.task import (
CriticalSQLAlchemyTask,
@@ -61,8 +57,8 @@ def balance(tokens, holder_address, chain_spec_dict):
for t in tokens:
address = t['address']
token = ERC20Token(chain_spec, rpc, address)
c = ERC20(chain_spec)
token = ERC20Token(rpc, address)
c = ERC20()
o = c.balance_of(address, holder_address, sender_address=caller_address)
r = rpc.do(o)
t['balance_network'] = c.parse_balance(r)
@@ -94,7 +90,6 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_spec_d
:rtype: str, 0x-hex
"""
# we only allow one token, one transfer
logg.debug('tokens {}'.format(tokens))
if len(tokens) != 1:
raise TokenCountError
t = tokens[0]
@@ -107,14 +102,8 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_spec_d
session = self.create_session()
nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas)
c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle)
try:
(tx_hash_hex, tx_signed_raw_hex) = c.transfer(t['address'], holder_address, receiver_address, value, tx_format=TxFormat.RLP_SIGNED)
except FileNotFoundError as e:
raise SignerError(e)
except ConnectionError as e:
raise SignerError(e)
c = ERC20(signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle, chain_id=chain_spec.chain_id())
(tx_hash_hex, tx_signed_raw_hex) = c.transfer(t['address'], holder_address, receiver_address, value, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect()
rpc.disconnect()
@@ -176,13 +165,8 @@ def approve(self, tokens, holder_address, spender_address, value, chain_spec_dic
session = self.create_session()
nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas)
c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle)
try:
(tx_hash_hex, tx_signed_raw_hex) = c.approve(t['address'], holder_address, spender_address, value, tx_format=TxFormat.RLP_SIGNED)
except FileNotFoundError as e:
raise SignerError(e)
except ConnectionError as e:
raise SignerError(e)
c = ERC20(signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle, chain_id=chain_spec.chain_id())
(tx_hash_hex, tx_signed_raw_hex) = c.approve(t['address'], holder_address, spender_address, value, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect()
rpc.disconnect()
@@ -255,7 +239,7 @@ def cache_transfer_data(
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id())
tx_data = ERC20.parse_transfer_request(tx['data'])
recipient_address = tx_data[0]
@@ -296,7 +280,7 @@ def cache_approve_data(
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id())
tx_data = ERC20.parse_approve_request(tx['data'])
recipient_address = tx_data[0]

View File

@@ -3,59 +3,87 @@ import logging
# external imports
import celery
from chainlib.eth.gas import price
from hexathon import strip_0x
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec
from chainlib.eth.address import is_checksum_address
from chainlib.connection import RPCConnection
from chainqueue.db.enum import StatusBits
from chainlib.eth.gas import (
balance,
price,
)
from chainlib.eth.error import (
NotFoundEthException,
EthException,
)
from chainlib.eth.tx import (
TxFactory,
TxFormat,
unpack,
)
from chainlib.eth.contract import (
abi_decode_single,
ABIContractType,
)
from chainlib.eth.gas import (
Gas,
OverrideGasOracle,
)
from chainqueue.db.models.tx import TxCache
from chainqueue.db.models.otx import Otx
# local imports
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.base import SessionBase
from cic_eth.error import (
AlreadyFillingGasError,
OutOfGasError,
)
from cic_eth.eth.nonce import CustodialTaskNonceOracle
from cic_eth.queue.tx import (
queue_create,
register_tx,
)
from cic_eth.queue.query import get_tx
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalSQLAlchemyAndWeb3Task,
CriticalSQLAlchemyAndSignerTask,
CriticalWeb3AndSignerTask,
)
celery_app = celery.current_app
logg = logging.getLogger()
#
#class GasOracle():
# """Provides gas pricing for transactions.
#
# :param w3: Web3 object
# :type w3: web3.Web3
# """
#
# __safe_threshold_amount_value = 2000000000 * 60000 * 3
# __refill_amount_value = __safe_threshold_amount_value * 5
# default_gas_limit = 21000
#
# def __init__(self, conn):
# o = price()
# r = conn.do(o)
# b = bytes.from_hex(strip_0x(r))
# self.gas_price_current = int.from_bytes(b, 'big')
#
# #self.w3 = w3
# #self.gas_price_current = w3.eth.gas_price()
#
#
# def safe_threshold_amount(self):
# """The gas balance threshold under which a new gas refill transaction should be initiated.
#
# :returns: Gas token amount
# :rtype: number
# """
# g = GasOracle.__safe_threshold_amount_value
# logg.warning('gas safe threshold is currently hardcoded to {}'.format(g))
# return g
#
#
# def refill_amount(self):
# """The amount of gas tokens to send in a gas refill transaction.
#
# :returns: Gas token amount
# :rtype: number
# """
# g = GasOracle.__refill_amount_value
# logg.warning('gas refill amount is currently hardcoded to {}'.format(g))
# return g
#
#
# def gas_provider(self):
# """Gas provider address.
#
# :returns: Etheerum account address
# :rtype: str, 0x-hex
# """
# session = SessionBase.create_session()
# a = AccountRole.get_address('GAS_GIFTER', session)
# logg.debug('gasgifter {}'.format(a))
# session.close()
# return a
#
#
# def gas_price(self, category='safe'):
# """Get projected gas price to use for a transaction at the current moment.
#
# When the category parameter is implemented, it can be used to control the priority of a transaction in the network.
#
# :param category: Bid level category to return price for. Currently has no effect.
# :type category: str
# :returns: Gas price
# :rtype: number
# """
# #logg.warning('gas price hardcoded to category "safe"')
# #g = 100
# #return g
# return self.gas_price_current
class MaxGasOracle:
@@ -86,7 +114,7 @@ def create_check_gas_task(tx_signed_raws_hex, chain_spec, holder_address, gas=No
s_check_gas = None
if tx_hashes_hex != None:
s_check_gas = celery.signature(
'cic_eth.eth.gas.check_gas',
'cic_eth.eth.tx.check_gas',
[
tx_hashes_hex,
chain_spec.asdict(),
@@ -98,7 +126,7 @@ def create_check_gas_task(tx_signed_raws_hex, chain_spec, holder_address, gas=No
)
else:
s_check_gas = celery.signature(
'cic_eth.eth.gas.check_gas',
'cic_eth.eth.tx.check_gas',
[
chain_spec.asdict(),
tx_signed_raws_hex,
@@ -108,334 +136,3 @@ def create_check_gas_task(tx_signed_raws_hex, chain_spec, holder_address, gas=No
queue=queue,
)
return s_check_gas
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_gas_data(
tx_hash_hex,
tx_signed_raw_hex,
chain_spec_dict,
):
"""Helper function for otx_cache_parse_tx
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx: Signed raw transaction
:type tx: str, 0x-hex
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw_bytes, chain_spec)
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx['to'],
ZERO_ADDRESS,
ZERO_ADDRESS,
tx['value'],
tx['value'],
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)
@celery_app.task(bind=True, throws=(OutOfGasError), base=CriticalSQLAlchemyAndWeb3Task)
def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_required=None):
"""Check the gas level of the sender address of a transaction.
If the account balance is not sufficient for the required gas, gas refill is requested and OutOfGasError raiser.
If account balance is sufficient, but level of gas before spend is below "safe" threshold, gas refill is requested, and execution continues normally.
:param tx_hashes: Transaction hashes due to be submitted
:type tx_hashes: list of str, 0x-hex
:param chain_spec_dict: Chain spec dict representation
:type chain_spec_dict: dict
:param txs: Signed raw transaction data, corresponding to tx_hashes
:type txs: list of str, 0x-hex
:param address: Sender address
:type address: str, 0x-hex
:param gas_required: Gas limit * gas price for transaction, (optional, if not set will be retrived from transaction data)
:type gas_required: int
:return: Signed raw transaction data list
:rtype: param txs, unchanged
"""
if len(txs) == 0:
for i in range(len(tx_hashes)):
o = get_tx(tx_hashes[i])
txs.append(o['signed_tx'])
if address == None:
address = o['address']
#if not web3.Web3.isChecksumAddress(address):
if not is_checksum_address(address):
raise ValueError('invalid address {}'.format(address))
chain_spec = ChainSpec.from_dict(chain_spec_dict)
queue = self.request.delivery_info.get('routing_key')
conn = RPCConnection.connect(chain_spec)
# TODO: it should not be necessary to pass address explicitly, if not passed should be derived from the tx
gas_balance = 0
try:
o = balance(address)
r = conn.do(o)
conn.disconnect()
gas_balance = abi_decode_single(ABIContractType.UINT256, r)
except EthException as e:
conn.disconnect()
raise EthError('gas_balance call for {}: {}'.format(address, e))
logg.debug('address {} has gas {} needs {}'.format(address, gas_balance, gas_required))
session = SessionBase.create_session()
gas_provider = AccountRole.get_address('GAS_GIFTER', session=session)
session.close()
if gas_required > gas_balance:
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
[
address,
chain_spec_dict,
gas_provider,
],
queue=queue,
)
s_refill_gas = celery.signature(
'cic_eth.eth.gas.refill_gas',
[
chain_spec_dict,
],
queue=queue,
)
s_nonce.link(s_refill_gas)
s_nonce.apply_async()
wait_tasks = []
for tx_hash in tx_hashes:
s = celery.signature(
'cic_eth.queue.state.set_waitforgas',
[
chain_spec_dict,
tx_hash,
],
queue=queue,
)
wait_tasks.append(s)
celery.group(wait_tasks)()
raise OutOfGasError('need to fill gas, required {}, had {}'.format(gas_required, gas_balance))
safe_gas = self.safe_gas_threshold_amount
if gas_balance < safe_gas:
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
[
address,
chain_spec_dict,
gas_provider,
],
queue=queue,
)
s_refill_gas = celery.signature(
'cic_eth.eth.gas.refill_gas',
[
chain_spec_dict,
],
queue=queue,
)
s_nonce.link(s_refill_gas)
s_nonce.apply_async()
logg.debug('requested refill from {} to {}'.format(gas_provider, address))
ready_tasks = []
for tx_hash in tx_hashes:
s = celery.signature(
'cic_eth.queue.state.set_ready',
[
chain_spec_dict,
tx_hash,
],
queue=queue,
)
ready_tasks.append(s)
celery.group(ready_tasks)()
return txs
# TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks.
# TODO: method is too long, factor out code for clarity
@celery_app.task(bind=True, throws=(NotFoundEthException,), base=CriticalWeb3AndSignerTask)
def refill_gas(self, recipient_address, chain_spec_dict):
"""Executes a native token transaction to fund the recipient's gas expenditures.
:param recipient_address: Recipient in need of gas
:type recipient_address: str, 0x-hex
:param chain_str: Chain spec, string representation
:type chain_str: str
:raises AlreadyFillingGasError: A gas refill transaction for this address is already executing
:returns: Transaction hash.
:rtype: str, 0x-hex
"""
# essentials
chain_spec = ChainSpec.from_dict(chain_spec_dict)
queue = self.request.delivery_info.get('routing_key')
# Determine value of gas tokens to send
# if an uncompleted gas refill for the same recipient already exists, we still need to spend the nonce
# however, we will perform a 0-value transaction instead
zero_amount = False
session = SessionBase.create_session()
status_filter = StatusBits.FINAL | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
q = session.query(Otx.tx_hash)
q = q.join(TxCache)
q = q.filter(Otx.status.op('&')(StatusBits.FINAL.value)==0)
q = q.filter(TxCache.from_value!=0)
q = q.filter(TxCache.recipient==recipient_address)
c = q.count()
if c > 0:
logg.warning('already filling gas {}'.format(str(AlreadyFillingGasError(recipient_address))))
zero_amount = True
session.flush()
# finally determine the value to send
refill_amount = 0
if not zero_amount:
refill_amount = self.safe_gas_refill_amount
# determine sender
gas_provider = AccountRole.get_address('GAS_GIFTER', session=session)
session.flush()
# set up evm RPC connection
rpc = RPCConnection.connect(chain_spec, 'default')
# set up transaction builder
nonce_oracle = CustodialTaskNonceOracle(gas_provider, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc)
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
c = Gas(chain_spec, signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
# build and add transaction
logg.debug('tx send gas amount {} from provider {} to {}'.format(refill_amount, gas_provider, recipient_address))
try:
(tx_hash_hex, tx_signed_raw_hex) = c.create(gas_provider, recipient_address, refill_amount, tx_format=TxFormat.RLP_SIGNED)
except ConnectionError as e:
raise SignerError(e)
except FileNotFoundError as e:
raise SignerError(e)
logg.debug('adding queue refill gas tx {}'.format(tx_hash_hex))
cache_task = 'cic_eth.eth.gas.cache_gas_data'
register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=cache_task, session=session)
# add transaction to send queue
s_status = celery.signature(
'cic_eth.queue.state.set_ready',
[
chain_spec.asdict(),
tx_hash_hex,
],
queue=queue,
)
t = s_status.apply_async()
return tx_signed_raw_hex
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def resend_with_higher_gas(self, txold_hash_hex, chain_spec_dict, gas=None, default_factor=1.1):
"""Create a new transaction from an existing one with same nonce and higher gas price.
:param txold_hash_hex: Transaction to re-create
:type txold_hash_hex: str, 0x-hex
:param chain_str: Chain spec, string representation
:type chain_str: str
:param gas: Explicitly use the specified gas amount
:type gas: number
:param default_factor: Default factor by which to increment the gas price by
:type default_factor: float
:raises NotLocalTxError: Transaction does not exist in the local queue
:returns: Transaction hash
:rtype: str, 0x-hex
"""
session = SessionBase.create_session()
otx = Otx.load(txold_hash_hex, session)
if otx == None:
session.close()
raise NotLocalTxError(txold_hash_hex)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(otx.signed_tx)
tx = unpack(tx_signed_raw_bytes, chain_spec)
logg.debug('resend otx {} {}'.format(tx, otx.signed_tx))
queue = self.request.delivery_info.get('routing_key')
logg.debug('before {}'.format(tx))
rpc = RPCConnection.connect(chain_spec, 'default')
new_gas_price = gas
if new_gas_price == None:
o = price()
r = rpc.do(o)
current_gas_price = int(r, 16)
if tx['gasPrice'] > current_gas_price:
logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(current_gas_price, tx['gasPrice']))
#tx['gasPrice'] = int(tx['gasPrice'] * default_factor)
new_gas_price = tx['gasPrice'] + 1
else:
new_gas_price = int(tx['gasPrice'] * default_factor)
#if gas_price > new_gas_price:
# tx['gasPrice'] = gas_price
#else:
# tx['gasPrice'] = new_gas_price
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
gas_oracle = OverrideGasOracle(price=new_gas_price, conn=rpc)
c = TxFactory(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle)
logg.debug('change gas price from old {} to new {} for tx {}'.format(tx['gasPrice'], new_gas_price, tx))
tx['gasPrice'] = new_gas_price
try:
(tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx)
except ConnectionError as e:
raise SignerError(e)
except FileNotFoundError as e:
raise SignerError(e)
queue_create(
chain_spec,
tx['nonce'],
tx['from'],
tx_hash_hex,
tx_signed_raw_hex,
session=session,
)
TxCache.clone(txold_hash_hex, tx_hash_hex, session=session)
session.close()
s = create_check_gas_task(
[tx_signed_raw_hex],
chain_spec,
tx['from'],
tx['gasPrice'] * tx['gas'],
[tx_hash_hex],
queue=queue,
)
s.apply_async()
return tx_hash_hex

Some files were not shown because too many files have changed in this diff Show More