Compare commits

..

7 Commits

Author SHA1 Message Date
nolash
bd672589e8 Merge remote-tracking branch 'origin/master' into lash/sovereign-import 2021-03-31 13:01:43 +02:00
nolash
51a4e75513 Update cluster dep specs 2021-03-31 12:59:59 +02:00
nolash
bffbea49c3 Remove commented code 2021-03-31 12:58:19 +02:00
nolash
5769b9f9ac Update readme with sovereign script info 2021-03-31 12:55:52 +02:00
nolash
fe75d72ce6 Complete sovereign import script 2021-03-31 12:46:58 +02:00
nolash
9c750e7072 Upgrade faucet 2021-03-31 11:26:14 +02:00
nolash
39646d7948 WIP Add sovereign import script 2021-03-31 10:51:49 +02:00
451 changed files with 11879 additions and 25676 deletions

13
.gitignore vendored
View File

@@ -1,15 +1,2 @@
service-configs/*
!service-configs/.gitkeep
**/node_modules/
__pycache__
*.pyc
*.o
gmon.out
*.egg-info
dist/
build/
**/*sqlite
**/.nyc_output
**/coverage
**/.venv
.idea

View File

@@ -6,7 +6,6 @@ include:
- local: 'apps/cic-notify/.gitlab-ci.yml'
- local: 'apps/cic-meta/.gitlab-ci.yml'
- local: 'apps/cic-cache/.gitlab-ci.yml'
- local: 'apps/data-seeding/.gitlab-ci.yml'
stages:
- build

View File

@@ -6,4 +6,3 @@ HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=

View File

@@ -6,4 +6,3 @@ HOST=localhost
PORT=5432
ENGINE=sqlite
DRIVER=pysqlite
DEBUG=

View File

@@ -1,28 +1,22 @@
# standard imports
import logging
import datetime
# external imports
# third-party imports
import moolb
# local imports
from cic_cache.db.list import (
list_transactions_mined,
list_transactions_account_mined,
list_transactions_mined_with_data,
)
from cic_cache.db import list_transactions_mined
from cic_cache.db import list_transactions_account_mined
logg = logging.getLogger()
class Cache:
class BloomCache:
def __init__(self, session):
self.session = session
class BloomCache(Cache):
@staticmethod
def __get_filter_size(n):
n = 8192 * 8
@@ -93,44 +87,3 @@ class BloomCache(Cache):
f_blocktx.add(block + tx)
logg.debug('added block {} tx {} lo {} hi {}'.format(r[0], r[1], lowest_block, highest_block))
return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),)
class DataCache(Cache):
def load_transactions_with_data(self, offset, end):
rows = list_transactions_mined_with_data(self.session, offset, end)
tx_cache = []
highest_block = -1;
lowest_block = -1;
date_is_str = None # stick this in startup
for r in rows:
if highest_block == -1:
highest_block = r['block_number']
lowest_block = r['block_number']
tx_type = 'unknown'
if r['value'] != None:
tx_type = '{}.{}'.format(r['domain'], r['value'])
if date_is_str == None:
date_is_str = type(r['date_block']).__name__ == 'str'
o = {
'block_number': r['block_number'],
'tx_hash': r['tx_hash'],
'date_block': r['date_block'],
'sender': r['sender'],
'recipient': r['recipient'],
'from_value': int(r['from_value']),
'to_value': int(r['to_value']),
'source_token': r['source_token'],
'destination_token': r['destination_token'],
'success': r['success'],
'tx_type': tx_type,
}
if date_is_str:
o['date_block'] = datetime.datetime.fromisoformat(r['date_block'])
tx_cache.append(o)
return (lowest_block, highest_block, tx_cache)

View File

@@ -2,14 +2,9 @@
import logging
# local imports
from .list import (
list_transactions_mined,
list_transactions_account_mined,
add_transaction,
tag_transaction,
add_tag,
)
from .list import list_transactions_mined
from .list import list_transactions_account_mined
from .list import add_transaction
logg = logging.getLogger()

View File

@@ -2,9 +2,8 @@
import logging
import datetime
# external imports
# third-party imports
from cic_cache.db.models.base import SessionBase
from sqlalchemy import text
logg = logging.getLogger()
@@ -28,26 +27,6 @@ def list_transactions_mined(
return r
def list_transactions_mined_with_data(
session,
offset,
end,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
:param offset: Offset in data set to return transactions from
:type offset: int
:param limit: Max number of transactions to retrieve
:type limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number ASC, tx_index ASC".format(offset, end)
r = session.execute(s)
return r
def list_transactions_account_mined(
session,
address,
@@ -71,8 +50,7 @@ def list_transactions_account_mined(
def add_transaction(
session,
tx_hash,
session, tx_hash,
block_number,
tx_index,
sender,
@@ -84,33 +62,6 @@ def add_transaction(
success,
timestamp,
):
"""Adds a single transaction to the cache persistent storage. Sensible interpretation of all fields is the responsibility of the caller.
:param session: Persistent storage session object
:type session: SQLAlchemy session
:param tx_hash: Transaction hash
:type tx_hash: str, 0x-hex
:param block_number: Block number
:type block_number: int
:param tx_index: Transaction index in block
:type tx_index: int
:param sender: Ethereum address of effective sender
:type sender: str, 0x-hex
:param receiver: Ethereum address of effective recipient
:type receiver: str, 0x-hex
:param source_token: Ethereum address of token used by sender
:type source_token: str, 0x-hex
:param destination_token: Ethereum address of token received by recipient
:type destination_token: str, 0x-hex
:param from_value: Source token value spent in transaction
:type from_value: int
:param to_value: Destination token value received in transaction
:type to_value: int
:param success: True if code execution on network was successful
:type success: bool
:param date_block: Block timestamp
:type date_block: datetime
"""
date_block = datetime.datetime.fromtimestamp(timestamp)
s = "INSERT INTO tx (tx_hash, block_number, tx_index, sender, recipient, source_token, destination_token, from_value, to_value, success, date_block) VALUES ('{}', {}, {}, '{}', '{}', '{}', '{}', {}, {}, {}, '{}')".format(
tx_hash,
@@ -126,74 +77,3 @@ def add_transaction(
date_block,
)
session.execute(s)
def tag_transaction(
session,
tx_hash,
name,
domain=None,
):
"""Tag a single transaction with a single tag.
Tag must already exist in storage.
:param session: Persistent storage session object
:type session: SQLAlchemy session
:param tx_hash: Transaction hash
:type tx_hash: str, 0x-hex
:param name: Tag value
:type name: str
:param domain: Tag domain
:type domain: str
:raises ValueError: Unknown tag or transaction hash
"""
s = text("SELECT id from tx where tx_hash = :a")
r = session.execute(s, {'a': tx_hash}).fetchall()
tx_id = r[0].values()[0]
if tx_id == None:
raise ValueError('unknown tx hash {}'.format(tx_hash))
#s = text("SELECT id from tag where value = :a and domain = :b")
if domain == None:
s = text("SELECT id from tag where value = :a")
else:
s = text("SELECT id from tag where value = :a and domain = :b")
r = session.execute(s, {'a': name, 'b': domain}).fetchall()
tag_id = r[0].values()[0]
logg.debug('type {} {}'.format(type(tag_id), type(tx_id)))
if tag_id == None:
raise ValueError('unknown tag name {} domain {}'.format(name, domain))
s = text("INSERT INTO tag_tx_link (tag_id, tx_id) VALUES (:a, :b)")
r = session.execute(s, {'a': int(tag_id), 'b': int(tx_id)})
def add_tag(
session,
name,
domain=None,
):
"""Add a single tag to storage.
:param session: Persistent storage session object
:type session: SQLAlchemy session
:param name: Tag value
:type name: str
:param domain: Tag domain
:type domain: str
:raises sqlalchemy.exc.IntegrityError: Tag already exists
"""
s = None
if domain == None:
s = text("INSERT INTO tag (value) VALUES (:b)")
else:
s = text("INSERT INTO tag (domain, value) VALUES (:a, :b)")
session.execute(s, {'a': domain, 'b': name})

View File

@@ -1,38 +0,0 @@
"""Transaction tags
Revision ID: aaf2bdce7d6e
Revises: 6604de4203e2
Create Date: 2021-05-01 09:20:20.775082
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'aaf2bdce7d6e'
down_revision = '6604de4203e2'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'tag',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('domain', sa.String(), nullable=True),
sa.Column('value', sa.String(), nullable=False),
)
op.create_index('idx_tag_domain_value', 'tag', ['domain', 'value'], unique=True)
op.create_table(
'tag_tx_link',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag_id', sa.Integer, sa.ForeignKey('tag.id'), nullable=False),
sa.Column('tx_id', sa.Integer, sa.ForeignKey('tx.id'), nullable=False),
)
def downgrade():
op.drop_table('tag_tx_link')
op.drop_index('idx_tag_domain_value')
op.drop_table('tag')

View File

@@ -1,2 +0,0 @@
from .erc20 import *
from .faucet import *

View File

@@ -1,27 +0,0 @@
class TagSyncFilter:
"""Holds tag name and domain for an implementing filter.
:param name: Tag value
:type name: str
:param domain: Tag domain
:type domain: str
"""
def __init__(self, name, domain=None):
self.tag_name = name
self.tag_domain = domain
def tag(self):
"""Return tag value/domain.
:rtype: Tuple
:returns: tag value/domain.
"""
return (self.tag_name, self.tag_domain)
def __str__(self):
if self.tag_domain == None:
return self.tag_name
return '{}.{}'.format(self.tag_domain, self.tag_name)

View File

@@ -1,83 +0,0 @@
# standard imports
import logging
# external imports
from chainlib.eth.address import (
to_checksum_address,
)
from chainlib.eth.error import RequestMismatchException
from chainlib.status import Status
from cic_eth_registry.erc20 import ERC20Token
from cic_eth_registry.error import (
NotAContractError,
ContractMismatchError,
)
from eth_erc20 import ERC20
# local imports
from .base import TagSyncFilter
from cic_cache import db as cic_cache_db
logg = logging.getLogger().getChild(__name__)
class ERC20TransferFilter(TagSyncFilter):
def __init__(self, chain_spec):
super(ERC20TransferFilter, self).__init__('transfer', domain='erc20')
self.chain_spec = chain_spec
# TODO: Verify token in declarator / token index
def filter(self, conn, block, tx, db_session=None):
logg.debug('filter {} {}'.format(block, tx))
token = None
try:
token = ERC20Token(self.chain_spec, conn, tx.inputs[0])
except NotAContractError:
logg.debug('not a contract {}'.format(tx.inputs[0]))
return False
except ContractMismatchError:
logg.debug('not an erc20 token {}'.format(tx.inputs[0]))
return False
transfer_data = None
try:
transfer_data = ERC20.parse_transfer_request(tx.payload)
except RequestMismatchException:
logg.debug('erc20 match but not a transfer, skipping')
return False
except ValueError:
logg.debug('erc20 match but bogus data, skipping')
return False
token_sender = tx.outputs[0]
token_recipient = transfer_data[0]
token_value = transfer_data[1]
logg.debug('matched erc20 token transfer {} ({}) to {} value {}'.format(token.name, token.address, transfer_data[0], transfer_data[1]))
cic_cache_db.add_transaction(
db_session,
tx.hash,
block.number,
tx.index,
to_checksum_address(token_sender),
to_checksum_address(token_recipient),
token.address,
token.address,
token_value,
token_value,
tx.status == Status.SUCCESS,
block.timestamp,
)
db_session.flush()
cic_cache_db.tag_transaction(
db_session,
tx.hash,
self.tag_name,
domain=self.tag_domain,
)
db_session.commit()
return True

View File

@@ -1,73 +0,0 @@
# standard imports
import logging
# external imports
from erc20_faucet import Faucet
from chainlib.eth.address import to_checksum_address
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.status import Status
from hexathon import strip_0x
# local imports
import cic_cache.db as cic_cache_db
from .base import TagSyncFilter
#logg = logging.getLogger().getChild(__name__)
logg = logging.getLogger()
class FaucetFilter(TagSyncFilter):
def __init__(self, chain_spec, sender_address=ZERO_ADDRESS):
super(FaucetFilter, self).__init__('give_to', domain='faucet')
self.chain_spec = chain_spec
self.sender_address = sender_address
def filter(self, conn, block, tx, db_session=None):
try:
data = strip_0x(tx.payload)
except ValueError:
return False
logg.debug('data {}'.format(data))
if Faucet.method_for(data[:8]) == None:
return False
token_sender = tx.inputs[0]
token_recipient = data[64+8-40:]
logg.debug('token recipient {}'.format(token_recipient))
f = Faucet(self.chain_spec)
o = f.token(token_sender, sender_address=self.sender_address)
r = conn.do(o)
token = f.parse_token(r)
f = Faucet(self.chain_spec)
o = f.token_amount(token_sender, sender_address=self.sender_address)
r = conn.do(o)
token_value = f.parse_token_amount(r)
cic_cache_db.add_transaction(
db_session,
tx.hash,
block.number,
tx.index,
to_checksum_address(token_sender),
to_checksum_address(token_recipient),
token,
token,
token_value,
token_value,
tx.status == Status.SUCCESS,
block.timestamp,
)
db_session.flush()
cic_cache_db.tag_transaction(
db_session,
tx.hash,
self.tag_name,
domain=self.tag_domain,
)
db_session.commit()
return True

View File

@@ -1,110 +0,0 @@
# standard imports
import logging
import json
import re
import base64
# local imports
from cic_cache.cache import (
BloomCache,
DataCache,
)
logg = logging.getLogger(__name__)
re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?'
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)/?(\d+)?/?(\d+)/?'
re_transactions_all_data = r'/txa/(\d+)/(\d+)/?'
DEFAULT_LIMIT = 100
def process_transactions_account_bloom(session, env):
r = re.match(re_transactions_account_bloom, env.get('PATH_INFO'))
if not r:
return None
address = r[1]
if r[2] == None:
address = '0x' + address
offset = DEFAULT_LIMIT
if r.lastindex > 2:
offset = r[3]
limit = 0
if r.lastindex > 3:
limit = r[4]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
def process_transactions_all_bloom(session, env):
r = re.match(re_transactions_all_bloom, env.get('PATH_INFO'))
if not r:
return None
offset = DEFAULT_LIMIT
if r.lastindex > 0:
offset = r[1]
limit = 0
if r.lastindex > 1:
limit = r[2]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
def process_transactions_all_data(session, env):
r = re.match(re_transactions_all_data, env.get('PATH_INFO'))
if not r:
return None
if env.get('HTTP_X_CIC_CACHE_MODE') != 'all':
return None
offset = r[1]
end = r[2]
if int(r[2]) < int(r[1]):
raise ValueError('cart before the horse, dude')
c = DataCache(session)
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, end)
for r in tx_cache:
r['date_block'] = r['date_block'].timestamp()
o = {
'low': lowest_block,
'high': highest_block,
'data': tx_cache,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)

View File

@@ -1,152 +0,0 @@
# standard imports
import os
import sys
import logging
import time
import argparse
import sys
import re
# external imports
import confini
import celery
import sqlalchemy
import rlp
import cic_base.config
import cic_base.log
import cic_base.argparse
import cic_base.rpc
from cic_base.eth.syncer import chain_interface
from cic_eth_registry import CICRegistry
from cic_eth_registry.error import UnknownContractError
from chainlib.chain import ChainSpec
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.connection import RPCConnection
from chainlib.eth.block import (
block_latest,
)
from hexathon import (
strip_0x,
)
from chainsyncer.backend.sql import SQLBackend
from chainsyncer.driver.head import HeadSyncer
from chainsyncer.driver.history import HistorySyncer
from chainsyncer.db.models.base import SessionBase
# local imports
from cic_cache.db import (
dsn_from_config,
add_tag,
)
from cic_cache.runnable.daemons.filters import (
ERC20TransferFilter,
FaucetFilter,
)
script_dir = os.path.realpath(os.path.dirname(__file__))
def add_block_args(argparser):
argparser.add_argument('--history-start', type=int, default=0, dest='history_start', help='Start block height for initial history sync')
argparser.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')
return argparser
logg = cic_base.log.create()
argparser = cic_base.argparse.create(script_dir, cic_base.argparse.full_template)
argparser = cic_base.argparse.add(argparser, add_block_args, 'block')
args = cic_base.argparse.parse(argparser, logg)
config = cic_base.config.create(args.c, args, args.env_prefix)
config.add(args.history_start, 'SYNCER_HISTORY_START', True)
config.add(args.no_history, '_NO_HISTORY', True)
cic_base.config.log(config)
dsn = dsn_from_config(config)
SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
cic_base.rpc.setup(chain_spec, config.get('ETH_PROVIDER'))
def register_filter_tags(filters, session):
for f in filters:
tag = f.tag()
try:
add_tag(session, tag[0], domain=tag[1])
session.commit()
logg.info('added tag name "{}" domain "{}"'.format(tag[0], tag[1]))
except sqlalchemy.exc.IntegrityError:
session.rollback()
logg.debug('already have tag name "{}" domain "{}"'.format(tag[0], tag[1]))
def main():
# Connect to blockchain with chainlib
rpc = RPCConnection.connect(chain_spec, 'default')
o = block_latest()
r = rpc.do(o)
block_offset = int(strip_0x(r), 16) + 1
logg.debug('current block height {}'.format(block_offset))
syncers = []
#if SQLBackend.first(chain_spec):
# backend = SQLBackend.initial(chain_spec, block_offset)
syncer_backends = SQLBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0:
initial_block_start = config.get('SYNCER_HISTORY_START')
initial_block_offset = block_offset
if config.get('_NO_HISTORY'):
initial_block_start = block_offset
initial_block_offset += 1
syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start))
logg.info('found no backends to resume, adding initial sync from history start {} end {}'.format(initial_block_start, initial_block_offset))
else:
for syncer_backend in syncer_backends:
logg.info('resuming sync session {}'.format(syncer_backend))
for syncer_backend in syncer_backends:
syncers.append(HistorySyncer(syncer_backend, chain_interface))
syncer_backend = SQLBackend.live(chain_spec, block_offset+1)
syncers.append(HeadSyncer(syncer_backend, chain_interface))
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if trusted_addresses_src == None:
logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
sys.exit(1)
trusted_addresses = trusted_addresses_src.split(',')
for address in trusted_addresses:
logg.info('using trusted address {}'.format(address))
erc20_transfer_filter = ERC20TransferFilter(chain_spec)
faucet_filter = FaucetFilter(chain_spec)
filters = [
erc20_transfer_filter,
faucet_filter,
]
session = SessionBase.create_session()
register_filter_tags(filters, session)
session.close()
i = 0
for syncer in syncers:
logg.debug('running syncer index {}'.format(i))
for f in filters:
syncer.add_filter(f)
r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
sys.stderr.write("sync {} done at block {}\n".format(syncer, r))
i += 1
if __name__ == '__main__':
main()

View File

@@ -1,20 +1,18 @@
# standard imports
import os
import re
import logging
import argparse
import json
import base64
# external imports
# third-party imports
import confini
# local imports
from cic_cache import BloomCache
from cic_cache.db import dsn_from_config
from cic_cache.db.models.base import SessionBase
from cic_cache.runnable.daemons.query import (
process_transactions_account_bloom,
process_transactions_all_bloom,
process_transactions_all_data,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -46,6 +44,72 @@ logg.debug('config:\n{}'.format(config))
dsn = dsn_from_config(config)
SessionBase.connect(dsn, config.true('DATABASE_DEBUG'))
re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?'
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)/?(\d+)?/?(\d+)/?'
DEFAULT_LIMIT = 100
def process_transactions_account_bloom(session, env):
r = re.match(re_transactions_account_bloom, env.get('PATH_INFO'))
if not r:
return None
address = r[1]
if r[2] == None:
address = '0x' + address
offset = DEFAULT_LIMIT
if r.lastindex > 2:
offset = r[3]
limit = 0
if r.lastindex > 3:
limit = r[4]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
def process_transactions_all_bloom(session, env):
r = re.match(re_transactions_all_bloom, env.get('PATH_INFO'))
if not r:
return None
offset = DEFAULT_LIMIT
if r.lastindex > 0:
offset = r[1]
limit = 0
if r.lastindex > 1:
limit = r[2]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
# uwsgi application
def application(env, start_response):
@@ -55,16 +119,10 @@ def application(env, start_response):
session = SessionBase.create_session()
for handler in [
process_transactions_all_data,
process_transactions_all_bloom,
process_transactions_account_bloom,
]:
r = None
try:
r = handler(session, env)
except ValueError as e:
start_response('400 {}'.format(str(e)))
return []
r = handler(session, env)
if r != None:
(mime_type, content) = r
break

View File

@@ -0,0 +1,339 @@
# standard imports
import sys
import os
import argparse
import logging
import time
import enum
import re
# third-party imports
import confini
from cic_registry import CICRegistry
from cic_registry.chain import (
ChainRegistry,
ChainSpec,
)
#from cic_registry.bancor import BancorRegistryClient
from cic_registry.token import Token
from cic_registry.error import (
UnknownContractError,
UnknownDeclarationError,
)
from cic_registry.declaration import to_token_declaration
from web3.exceptions import BlockNotFound, TransactionNotFound
from websockets.exceptions import ConnectionClosedError
from requests.exceptions import ConnectionError
import web3
from web3 import HTTPProvider, WebsocketProvider
# local imports
from cic_cache import db
from cic_cache.db.models.base import SessionBase
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL)
logging.getLogger('urllib3').setLevel(logging.CRITICAL)
logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL)
log_topics = {
'transfer': '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef',
'convert': '0x7154b38b5dd31bb3122436a96d4e09aba5b323ae1fd580025fab55074334c095',
'accountregistry_add': '0a3b0a4f4c6e53dce3dbcad5614cb2ba3a0fa7326d03c5d64b4fa2d565492737',
}
config_dir = os.path.join('/usr/local/etc/cic-cache')
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('--trust-address', default=[], type=str, dest='trust_address', action='append', help='Set address as trust')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
args = argparser.parse_args(sys.argv[1:])
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
config = confini.Config(config_dir, args.env_prefix)
config.process()
args_override = {
'ETH_ABI_DIR': getattr(args, 'abi_dir'),
'CIC_TRUST_ADDRESS': ",".join(getattr(args, 'trust_address', [])),
}
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
# connect to database
dsn = db.dsn_from_config(config)
SessionBase.connect(dsn)
re_websocket = re.compile('^wss?://')
re_http = re.compile('^https?://')
blockchain_provider = config.get('ETH_PROVIDER')
if re.match(re_websocket, blockchain_provider) != None:
blockchain_provider = WebsocketProvider(blockchain_provider)
elif re.match(re_http, blockchain_provider) != None:
blockchain_provider = HTTPProvider(blockchain_provider)
else:
raise ValueError('unknown provider url {}'.format(blockchain_provider))
def web3_constructor():
w3 = web3.Web3(blockchain_provider)
return (blockchain_provider, w3)
class RunStateEnum(enum.IntEnum):
INIT = 0
RUN = 1
TERMINATE = 9
def rubberstamp(src):
return True
class Tracker:
def __init__(self, chain_spec, trusts=[]):
self.block_height = 0
self.tx_height = 0
self.state = RunStateEnum.INIT
self.declarator_cache = {}
self.convert_enabled = False
self.trusts = trusts
self.chain_spec = chain_spec
self.declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', 'Declarator')
def __process_tx(self, w3, session, t, r, l, b):
token_value = int(l.data, 16)
token_sender = l.topics[1][-20:].hex()
token_recipient = l.topics[2][-20:].hex()
#ts = ContractRegistry.get_address(t.address)
ts = CICRegistry.get_address(self.chain_spec, t.address())
logg.info('add token transfer {} value {} from {} to {}'.format(
ts.symbol(),
token_value,
token_sender,
token_recipient,
)
)
db.add_transaction(
session,
r.transactionHash.hex(),
r.blockNumber,
r.transactionIndex,
w3.toChecksumAddress(token_sender),
w3.toChecksumAddress(token_recipient),
t.address(),
t.address(),
token_value,
token_value,
r.status == 1,
b.timestamp,
)
session.flush()
# TODO: simplify/ split up and/or comment, function is too long
def __process_convert(self, w3, session, t, r, l, b):
logg.warning('conversions are deactivated')
return
# token_source = l.topics[2][-20:].hex()
# token_source = w3.toChecksumAddress(token_source)
# token_destination = l.topics[3][-20:].hex()
# token_destination = w3.toChecksumAddress(token_destination)
# data_noox = l.data[2:]
# d = data_noox[:64]
# token_from_value = int(d, 16)
# d = data_noox[64:128]
# token_to_value = int(d, 16)
# token_trader = '0x' + data_noox[192-40:]
#
# #ts = ContractRegistry.get_address(token_source)
# ts = CICRegistry.get_address(CICRegistry.bancor_chain_spec, t.address())
# #if ts == None:
# # ts = ContractRegistry.reserves[token_source]
# td = ContractRegistry.get_address(token_destination)
# #if td == None:
# # td = ContractRegistry.reserves[token_source]
# logg.info('add token convert {} -> {} value {} -> {} trader {}'.format(
# ts.symbol(),
# td.symbol(),
# token_from_value,
# token_to_value,
# token_trader,
# )
# )
#
# db.add_transaction(
# session,
# r.transactionHash.hex(),
# r.blockNumber,
# r.transactionIndex,
# w3.toChecksumAddress(token_trader),
# w3.toChecksumAddress(token_trader),
# token_source,
# token_destination,
# r.status == 1,
# b.timestamp,
# )
# session.flush()
def check_token(self, address):
t = None
try:
t = CICRegistry.get_address(CICRegistry.default_chain_spec, address)
return t
except UnknownContractError:
logg.debug('contract {} not in registry'.format(address))
# If nothing was returned, we look up the token in the declarator
for trust in self.trusts:
logg.debug('look up declaration for contract {} with trust {}'.format(address, trust))
fn = self.declarator.function('declaration')
# TODO: cache trust in LRUcache
declaration_array = fn(trust, address).call()
try:
declaration = to_token_declaration(trust, address, declaration_array, [rubberstamp])
logg.debug('found declaration for token {} from trust address {}'.format(address, trust))
except UnknownDeclarationError:
continue
try:
c = w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=address)
t = CICRegistry.add_token(self.chain_spec, c)
break
except ValueError:
logg.error('declaration for {} validates as token, but location is not ERC20 compatible'.format(address))
return t
# TODO use input data instead of logs
def process(self, w3, session, block):
#self.refresh_registry(w3)
tx_count = w3.eth.getBlockTransactionCount(block.hash)
b = w3.eth.getBlock(block.hash)
for i in range(self.tx_height, tx_count):
tx = w3.eth.getTransactionByBlock(block.hash, i)
if tx.to == None:
logg.debug('block {} tx {} is contract creation tx, skipping'.format(block.number, i))
continue
if len(w3.eth.getCode(tx.to)) == 0:
logg.debug('block {} tx {} not a contract tx, skipping'.format(block.number, i))
continue
t = self.check_token(tx.to)
if t != None and isinstance(t, Token):
r = w3.eth.getTransactionReceipt(tx.hash)
for l in r.logs:
logg.debug('block {} tx {} {} token log {} {}'.format(block.number, i, tx.hash.hex(), l.logIndex, l.topics[0].hex()))
if l.topics[0].hex() == log_topics['transfer']:
self.__process_tx(w3, session, t, r, l, b)
# TODO: cache contracts in LRUcache
elif self.convert_enabled and tx.to == CICRegistry.get_contract(CICRegistry.default_chain_spec, 'Converter').address:
r = w3.eth.getTransactionReceipt(tx.hash)
for l in r.logs:
logg.info('block {} tx {} {} bancornetwork log {} {}'.format(block.number, i, tx.hash.hex(), l.logIndex, l.topics[0].hex()))
if l.topics[0].hex() == log_topics['convert']:
self.__process_convert(w3, session, t, r, l, b)
session.execute("UPDATE tx_sync SET tx = '{}'".format(tx.hash.hex()))
session.commit()
self.tx_height += 1
def __get_next_retry(self, backoff=False):
return 1
def loop(self):
logg.info('starting at block {} tx index {}'.format(self.block_height, self.tx_height))
self.state = RunStateEnum.RUN
while self.state == RunStateEnum.RUN:
(provider, w3) = web3_constructor()
session = SessionBase.create_session()
try:
block = w3.eth.getBlock(self.block_height)
self.process(w3, session, block)
self.block_height += 1
self.tx_height = 0
except BlockNotFound as e:
logg.debug('no block {} yet, zZzZ...'.format(self.block_height))
time.sleep(self.__get_next_retry())
except ConnectionClosedError as e:
logg.info('connection gone, retrying')
time.sleep(self.__get_next_retry(True))
except OSError as e:
logg.error('cannot connect {}'.format(e))
time.sleep(self.__get_next_retry(True))
except Exception as e:
session.close()
raise(e)
session.close()
def load(self, w3):
session = SessionBase.create_session()
r = session.execute('SELECT tx FROM tx_sync').first()
if r != None:
if r[0] == '0x{0:0{1}X}'.format(0, 64):
logg.debug('last tx was zero-address, starting from scratch')
return
t = w3.eth.getTransaction(r[0])
self.block_height = t.blockNumber
self.tx_height = t.transactionIndex+1
c = w3.eth.getBlockTransactionCount(t.blockHash.hex())
logg.debug('last tx processed {} index {} (max index {})'.format(t.blockNumber, t.transactionIndex, c-1))
if c == self.tx_height:
self.block_height += 1
self.tx_height = 0
session.close()
(provider, w3) = web3_constructor()
trust = config.get('CIC_TRUST_ADDRESS', "").split(",")
chain_spec = args.i
try:
w3.eth.chainId
except Exception as e:
logg.exception(e)
sys.stderr.write('cannot connect to evm node\n')
sys.exit(1)
def main():
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
CICRegistry.init(w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec)
CICRegistry.add_path(config.get('ETH_ABI_DIR'))
chain_registry = ChainRegistry(chain_spec)
CICRegistry.add_chain_registry(chain_registry)
t = Tracker(chain_spec, trust)
t.load(w3)
t.loop()
if __name__ == '__main__':
main()

View File

@@ -5,7 +5,7 @@ version = (
0,
2,
0,
'alpha.2',
'alpha.1',
)
version_object = semver.VersionInfo(

View File

@@ -6,4 +6,4 @@ HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=0
DEBUG=

View File

@@ -1,4 +0,0 @@
[cic]
chain_spec =
registry_address =
trust_address = 0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C

View File

@@ -6,4 +6,4 @@ HOST=localhost
PORT=63432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=0
DEBUG=1

View File

@@ -1,2 +1,3 @@
[eth]
provider = http://localhost:63545
provider = ws://localhost:63546
chain_id = 8996

View File

@@ -1,3 +0,0 @@
[syncer]
loop_interval = 1
history_start = 0

View File

@@ -1,2 +1,7 @@
[eth]
provider = ws://localhost:8545
#ttp_provider = http://localhost:8545
#provider = http://localhost:8545
gas_provider_address =
#chain_id =
abi_dir = /usr/local/share/cic/solidity/abi

View File

@@ -1,3 +0,0 @@
[syncer]
loop_interval = 5
history_start = 0

View File

@@ -1,4 +1,2 @@
[cic]
registry_address =
chain_spec =
trust_address =

View File

@@ -6,4 +6,4 @@ HOST=localhost
PORT=5432
ENGINE=sqlite
DRIVER=pysqlite
DEBUG=1
DEBUG=

View File

@@ -1,2 +0,0 @@
[syncer]
loop_interval = 1

View File

@@ -17,7 +17,7 @@ RUN apt-get update && \
# Copy shared requirements from top of mono-repo
RUN echo "copying root req file ${root_requirement_file}"
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2b9
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a44
COPY cic-cache/requirements.txt ./
COPY cic-cache/setup.cfg \
@@ -43,9 +43,10 @@ COPY cic-cache/config/ /usr/local/etc/cic-cache/
RUN git clone https://github.com/vishnubob/wait-for-it.git /usr/local/bin/wait-for-it/
COPY cic-cache/cic_cache/db/migrations/ /usr/local/share/cic-cache/alembic/
COPY cic-cache/docker/start_tracker.sh ./start_tracker.sh
COPY cic-cache/docker/db.sh ./db.sh
RUN chmod 755 ./*.sh
RUN git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \
mkdir -p /usr/local/share/cic/solidity && \
cp -R cic-contracts/abis /usr/local/share/cic/solidity/abi
# Tracker
# ENTRYPOINT ["/usr/local/bin/cic-cache-tracker", "-vv"]
# Server

View File

@@ -1,6 +0,0 @@
#!/bin/bash
set -e
>&2 echo executing database migration
python scripts/migrate.py -c /usr/local/etc/cic-cache --migrations-dir /usr/local/share/cic-cache/alembic -vv
set +e

View File

@@ -1,10 +0,0 @@
#!/bin/bash
. ./db.sh
if [ $? -ne "0" ]; then
>&2 echo db migrate fail
exit 1
fi
/usr/local/bin/cic-cache-trackerd $@

View File

@@ -1,13 +1,10 @@
cic-base==0.1.3a3+build.4aa03607
alembic==1.4.2
confini~=0.3.6rc3
confini~=0.3.6b2
uwsgi==2.0.19.1
moolb~=0.1.0
cic-eth-registry~=0.5.6a1
cic-registry~=0.5.3a4
SQLAlchemy==1.3.20
semver==2.13.0
psycopg2==2.8.6
celery==4.4.7
redis==3.5.3
chainsyncer[sql]~=0.0.3a3
erc20-faucet~=0.2.2a1

View File

@@ -2,7 +2,6 @@
import os
import argparse
import logging
import re
import alembic
from alembic.config import Config as AlembicConfig
@@ -24,8 +23,6 @@ argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--migrations-dir', dest='migrations_dir', default=migrationsdir, type=str, help='path to alembic migrations directory')
argparser.add_argument('--reset', action='store_true', help='downgrade before upgrading')
argparser.add_argument('-f', action='store_true', help='force action')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args()
@@ -56,10 +53,4 @@ ac = AlembicConfig(os.path.join(migrations_dir, 'alembic.ini'))
ac.set_main_option('sqlalchemy.url', dsn)
ac.set_main_option('script_location', migrations_dir)
if args.reset:
if not args.f:
if not re.match(r'[yY][eE]?[sS]?', input('EEK! this will DELETE the existing db. are you sure??')):
logg.error('user chickened out on requested reset, bailing')
sys.exit(1)
alembic.command.downgrade(ac, 'base')
alembic.command.upgrade(ac, 'head')

View File

@@ -29,13 +29,11 @@ packages =
cic_cache.db
cic_cache.db.models
cic_cache.runnable
cic_cache.runnable.daemons
cic_cache.runnable.daemons.filters
scripts =
./scripts/migrate.py
[options.entry_points]
console_scripts =
cic-cache-trackerd = cic_cache.runnable.daemons.tracker:main
cic-cache-serverd = cic_cache.runnable.daemons.server:main
cic-cache-taskerd = cic_cache.runnable.daemons.tasker:main
cic-cache-trackerd = cic_cache.runnable.tracker:main
cic-cache-serverd = cic_cache.runnable.server:main
cic-cache-taskerd = cic_cache.runnable.tasker:main

View File

@@ -4,7 +4,3 @@ pytest-mock==3.3.1
pysqlite3==0.4.3
sqlparse==0.4.1
pytest-celery==0.0.0a1
eth_tester==0.5.0b3
py-evm==0.3.0a20
cic_base[full]==0.1.3a3+build.4aa03607
sarafu-faucet~=0.0.4a1

View File

@@ -3,7 +3,7 @@ import os
import sys
import datetime
# external imports
# third-party imports
import pytest
# local imports
@@ -84,20 +84,3 @@ def txs(
session.commit()
return [
tx_hash_first,
tx_hash_second,
]
@pytest.fixture(scope='function')
def tag_txs(
init_database,
txs,
):
db.add_tag(init_database, 'taag', domain='test')
init_database.commit()
db.tag_transaction(init_database, txs[1], 'taag', domain='test')

View File

@@ -1,3 +0,0 @@
from chainlib.eth.pytest import *
from cic_eth_registry.pytest.fixtures_tokens import *

View File

@@ -1,69 +0,0 @@
# standard imports
import os
import datetime
import logging
import json
# external imports
import pytest
from sqlalchemy import text
from chainlib.eth.tx import Tx
from chainlib.eth.block import Block
from chainlib.chain import ChainSpec
from hexathon import (
strip_0x,
add_0x,
)
# local imports
from cic_cache.db import add_tag
from cic_cache.runnable.daemons.filters.erc20 import ERC20TransferFilter
logg = logging.getLogger()
def test_erc20_filter(
eth_rpc,
foo_token,
init_database,
list_defaults,
list_actors,
tags,
):
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
fltr = ERC20TransferFilter(chain_spec)
add_tag(init_database, fltr.tag_name, domain=fltr.tag_domain)
data = 'a9059cbb'
data += strip_0x(list_actors['alice'])
data += '1000'.ljust(64, '0')
block = Block({
'hash': os.urandom(32).hex(),
'number': 42,
'timestamp': datetime.datetime.utcnow().timestamp(),
'transactions': [],
})
tx = Tx({
'to': foo_token,
'from': list_actors['bob'],
'data': data,
'value': 0,
'hash': os.urandom(32).hex(),
'nonce': 13,
'gasPrice': 10000000,
'gas': 123456,
})
block.txs.append(tx)
tx.block = block
r = fltr.filter(eth_rpc, block, tx, db_session=init_database)
assert r
s = text("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = :a AND a.value = :b")
r = init_database.execute(s, {'a': fltr.tag_domain, 'b': fltr.tag_name}).fetchone()
assert r[0] == tx.hash

View File

@@ -1,71 +0,0 @@
# standard imports
import logging
# external imports
from chainlib.chain import ChainSpec
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.block import (
block_by_hash,
Block,
)
from chainlib.eth.tx import (
receipt,
unpack,
transaction,
Tx,
)
from hexathon import strip_0x
from erc20_faucet.faucet import SingleShotFaucet
from sqlalchemy import text
# local imports
from cic_cache.db import add_tag
from cic_cache.runnable.daemons.filters.faucet import FaucetFilter
logg = logging.getLogger()
def test_filter_faucet(
eth_rpc,
eth_signer,
foo_token,
faucet_noregistry,
init_database,
list_defaults,
contract_roles,
agent_roles,
tags,
):
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
fltr = FaucetFilter(chain_spec, contract_roles['CONTRACT_DEPLOYER'])
add_tag(init_database, fltr.tag_name, domain=fltr.tag_domain)
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
c = SingleShotFaucet(chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.give_to(faucet_noregistry, agent_roles['ALICE'], agent_roles['ALICE'])
r = eth_rpc.do(o)
tx_src = unpack(bytes.fromhex(strip_0x(o['params'][0])), chain_spec)
o = receipt(r)
r = eth_rpc.do(o)
rcpt = Tx.src_normalize(r)
assert r['status'] == 1
o = block_by_hash(r['block_hash'])
r = eth_rpc.do(o)
block_object = Block(r)
tx = Tx(tx_src, block_object)
tx.apply_receipt(rcpt)
r = fltr.filter(eth_rpc, block_object, tx, init_database)
assert r
s = text("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = :a AND a.value = :b")
r = init_database.execute(s, {'a': fltr.tag_domain, 'b': fltr.tag_name}).fetchone()
assert r[0] == tx.hash

View File

@@ -2,7 +2,7 @@
import os
import logging
# external imports
# third-party imports
import pytest
import confini
@@ -13,7 +13,7 @@ logg = logging.getLogger(__file__)
@pytest.fixture(scope='session')
def load_config():
config_dir = os.path.join(root_dir, 'config/test')
config_dir = os.path.join(root_dir, '.config/test')
conf = confini.Config(config_dir, 'CICTEST')
conf.process()
logg.debug('config {}'.format(conf))

View File

@@ -3,16 +3,13 @@ import os
import logging
import re
# external imports
# third-party imports
import pytest
import sqlparse
import alembic
from alembic.config import Config as AlembicConfig
# local imports
from cic_cache.db.models.base import SessionBase
from cic_cache.db import dsn_from_config
from cic_cache.db import add_tag
logg = logging.getLogger(__file__)
@@ -29,10 +26,11 @@ def database_engine(
except FileNotFoundError:
pass
dsn = dsn_from_config(load_config)
SessionBase.connect(dsn, debug=load_config.true('DATABASE_DEBUG'))
SessionBase.connect(dsn)
return dsn
# TODO: use alembic instead to migrate db, here we have to keep separate schema than migration script in script/migrate.py
@pytest.fixture(scope='function')
def init_database(
load_config,
@@ -40,23 +38,52 @@ def init_database(
):
rootdir = os.path.dirname(os.path.dirname(__file__))
dbdir = os.path.join(rootdir, 'cic_cache', 'db')
migrationsdir = os.path.join(dbdir, 'migrations', load_config.get('DATABASE_ENGINE'))
if not os.path.isdir(migrationsdir):
migrationsdir = os.path.join(dbdir, 'migrations', 'default')
logg.info('using migrations directory {}'.format(migrationsdir))
schemadir = os.path.join(rootdir, 'db', load_config.get('DATABASE_DRIVER'))
if load_config.get('DATABASE_ENGINE') == 'sqlite':
rconn = SessionBase.engine.raw_connection()
f = open(os.path.join(schemadir, 'db.sql'))
s = f.read()
f.close()
rconn.executescript(s)
else:
rconn = SessionBase.engine.raw_connection()
rcursor = rconn.cursor()
#rcursor.execute('DROP FUNCTION IF EXISTS public.transaction_list')
#rcursor.execute('DROP FUNCTION IF EXISTS public.balances')
f = open(os.path.join(schemadir, 'db.sql'))
s = f.read()
f.close()
r = re.compile(r'^[A-Z]', re.MULTILINE)
for l in sqlparse.parse(s):
strl = str(l)
# we need to check for empty query lines, as sqlparse doesn't do that on its own (and psycopg complains when it gets them)
if not re.search(r, strl):
logg.warning('skipping parsed query line {}'.format(strl))
continue
rcursor.execute(strl)
rconn.commit()
rcursor.execute('SET search_path TO public')
# this doesn't work when run separately, no idea why
# functions have been manually added to original schema from cic-eth
# f = open(os.path.join(schemadir, 'proc_transaction_list.sql'))
# s = f.read()
# f.close()
# rcursor.execute(s)
#
# f = open(os.path.join(schemadir, 'proc_balances.sql'))
# s = f.read()
# f.close()
# rcursor.execute(s)
rcursor.close()
session = SessionBase.create_session()
ac = AlembicConfig(os.path.join(migrationsdir, 'alembic.ini'))
ac.set_main_option('sqlalchemy.url', database_engine)
ac.set_main_option('script_location', migrationsdir)
alembic.command.downgrade(ac, 'base')
alembic.command.upgrade(ac, 'head')
session.commit()
yield session
session.commit()
session.close()
@@ -89,14 +116,3 @@ def list_defaults(
return {
'block': 420000,
}
@pytest.fixture(scope='function')
def tags(
init_database,
):
add_tag(init_database, 'foo')
add_tag(init_database, 'baz', domain='bar')
add_tag(init_database, 'xyzzy', domain='bar')
init_database.commit()

View File

@@ -1,31 +0,0 @@
# standard imports
import json
# external imports
import pytest
# local imports
from cic_cache.runnable.daemons.query import process_transactions_all_data
def test_api_all_data(
init_database,
txs,
):
env = {
'PATH_INFO': '/txa/410000/420000',
'HTTP_X_CIC_CACHE_MODE': 'all',
}
j = process_transactions_all_data(init_database, env)
o = json.loads(j[1])
assert len(o['data']) == 2
env = {
'PATH_INFO': '/txa/420000/410000',
'HTTP_X_CIC_CACHE_MODE': 'all',
}
with pytest.raises(ValueError):
j = process_transactions_all_data(init_database, env)

View File

@@ -4,12 +4,11 @@ import datetime
import logging
import json
# external imports
# third-party imports
import pytest
# local imports
from cic_cache import BloomCache
from cic_cache.cache import DataCache
logg = logging.getLogger()
@@ -34,23 +33,3 @@ def test_cache(
assert b[0] == list_defaults['block'] - 1
def test_cache_data(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
tag_txs,
):
session = init_database
c = DataCache(session)
b = c.load_transactions_with_data(410000, 420000)
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == txs[1]
assert b[2][1]['tx_type'] == 'unknown'
assert b[2][0]['tx_type'] == 'test.taag'

View File

@@ -1,37 +0,0 @@
import os
import datetime
import logging
import json
# external imports
import pytest
# local imports
from cic_cache.db import tag_transaction
logg = logging.getLogger()
def test_cache(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
tags,
):
tag_transaction(init_database, txs[0], 'foo')
tag_transaction(init_database, txs[0], 'baz', domain='bar')
tag_transaction(init_database, txs[1], 'xyzzy', domain='bar')
r = init_database.execute("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.value = 'foo'").fetchall()
assert r[0][0] == txs[0]
r = init_database.execute("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = 'bar' AND a.value = 'baz'").fetchall()
assert r[0][0] == txs[0]
r = init_database.execute("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = 'bar' AND a.value = 'xyzzy'").fetchall()
assert r[0][0] == txs[1]

View File

@@ -5,5 +5,3 @@ omit =
cic_eth/db/migrations/*
cic_eth/sync/head.py
cic_eth/sync/mempool.py
cic_eth/queue/state.py
*redis*.py

View File

@@ -5,29 +5,18 @@
.cic_eth_changes_target:
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
#changes:
#- $CONTEXT/$APP_NAME/**/*
when: always
- changes:
- $CONTEXT/$APP_NAME/*
build-mr-cic-eth:
extends:
- .cic_eth_variables
- .cic_eth_changes_target
- .py_build_target_test
test-mr-cic-eth:
extends:
- .py_build_merge_request
- .cic_eth_variables
- .cic_eth_changes_target
stage: test
image: $CI_REGISTRY_IMAGE/$APP_NAME-test:latest
script:
- cd apps/$APP_NAME/
- pytest -x --cov=cic_eth --cov-fail-under=90 --cov-report term-missing tests
needs: ["build-mr-cic-eth"]
build-push-cic-eth:
extends:
- .py_build_push
- .cic_eth_variables

View File

@@ -1,2 +0,0 @@
include *requirements.txt

View File

@@ -2,7 +2,7 @@
import datetime
import logging
# external imports
# third-party imports
import celery
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec
@@ -32,9 +32,7 @@ def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.AL
:returns: New lock state for address
:rtype: number
"""
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, flags, address=address, tx_hash=tx_hash)
logg.debug('Locked {} for {}, flag now {}'.format(flags, address, r))
return chained_input
@@ -53,9 +51,7 @@ def unlock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.
:returns: New lock state for address
:rtype: number
"""
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, flags, address=address)
logg.debug('Unlocked {} for {}, flag now {}'.format(flags, address, r))
return chained_input
@@ -131,9 +127,7 @@ def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
@celery_app.task(base=CriticalSQLAlchemyTask)
def check_lock(chained_input, chain_spec_dict, lock_flags, address=None):
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
session = SessionBase.create_session()
r = Lock.check(chain_str, lock_flags, address=ZERO_ADDRESS, session=session)
if address != None:
@@ -145,9 +139,3 @@ def check_lock(chained_input, chain_spec_dict, lock_flags, address=None):
session.flush()
session.close()
return chained_input
@celery_app.task()
def shutdown(message):
logg.critical('shutdown called: {}'.format(message))
celery_app.control.shutdown() #broadcast('shutdown')

View File

@@ -4,21 +4,12 @@ import logging
# external imports
import celery
from chainlib.chain import ChainSpec
from chainlib.connection import RPCConnection
from chainlib.eth.tx import (
unpack,
TxFactory,
)
from chainlib.eth.gas import OverrideGasOracle
from chainqueue.sql.query import get_tx
from chainqueue.sql.state import set_cancel
from chainqueue.db.models.otx import Otx
from chainqueue.db.models.tx import TxCache
from hexathon import strip_0x
from potaahto.symbols import snake_and_camel
from chainlib.eth.tx import unpack
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.nonce import Nonce
from cic_eth.admin.ctrl import (
lock_send,
@@ -26,16 +17,21 @@ from cic_eth.admin.ctrl import (
lock_queue,
unlock_queue,
)
from cic_eth.queue.tx import queue_create
from cic_eth.eth.gas import create_check_gas_task
from cic_eth.task import BaseTask
from cic_eth.queue.tx import (
get_tx,
set_cancel,
)
from cic_eth.queue.tx import create as queue_create
from cic_eth.eth.gas import (
create_check_gas_task,
)
celery_app = celery.current_app
logg = logging.getLogger()
@celery_app.task(bind=True, base=BaseTask)
def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
@celery_app.task(bind=True)
def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1):
"""Shift all transactions with nonces higher than the offset by the provided position delta.
Transactions who are replaced by transactions that move nonces will be marked as OVERRIDDEN.
@@ -46,29 +42,25 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
:type tx_hash_orig_hex: str, 0x-hex
:param delta: Amount
"""
chain_spec = ChainSpec.from_dict(chainspec_dict)
rpc = RPCConnection.connect(chain_spec, 'default')
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
queue = None
try:
queue = self.request.delivery_info.get('routing_key')
except AttributeError:
pass
session = BaseTask.session_func()
tx_brief = get_tx(chain_spec, tx_hash_orig_hex, session=session)
tx_raw = bytes.fromhex(strip_0x(tx_brief['signed_tx']))
tx = unpack(tx_raw, chain_spec)
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_brief = get_tx(tx_hash_orig_hex)
tx_raw = bytes.fromhex(tx_brief['signed_tx'][2:])
tx = unpack(tx_raw, chain_spec.chain_id())
nonce = tx_brief['nonce']
address = tx['from']
logg.debug('shifting nonce {} position(s) for address {}, offset {}, hash {}'.format(delta, address, nonce, tx['hash']))
logg.debug('shifting nonce {} position(s) for address {}, offset {}'.format(delta, address, nonce))
lock_queue(None, chain_spec.asdict(), address=address)
lock_send(None, chain_spec.asdict(), address=address)
set_cancel(chain_spec, strip_0x(tx['hash']), manual=True, session=session)
lock_queue(None, chain_str, address)
lock_send(None, chain_str, address)
session = SessionBase.create_session()
q = session.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.sender==address)
@@ -79,59 +71,51 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
tx_hashes = []
txs = []
for otx in otxs:
tx_raw = bytes.fromhex(strip_0x(otx.signed_tx))
tx_new = unpack(tx_raw, chain_spec)
tx_new = snake_and_camel(tx_new)
tx_raw = bytes.fromhex(otx.signed_tx[2:])
tx_new = unpack(tx_raw, chain_spec.chain_id())
tx_previous_hash_hex = tx_new['hash']
tx_previous_nonce = tx_new['nonce']
tx_new['gas_price'] += 1
tx_new['gasPrice'] = tx_new['gas_price']
tx_new['nonce'] -= delta
logg.debug('tx_new {}'.format(tx_new))
del(tx_new['hash'])
del(tx_new['hash_unsigned'])
del(tx_new['hashUnsigned'])
tx_new['nonce'] -= delta
gas_oracle = OverrideGasOracle(limit=tx_new['gas'], price=tx_new['gas_price'] + 1) # TODO: it should be possible to merely set this price here and if missing in the existing struct then fill it in (chainlib.eth.tx)
c = TxFactory(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx_new)
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx_new, chain_str)
logg.debug('tx {} -> {} nonce {} -> {}'.format(tx_previous_hash_hex, tx_hash_hex, tx_previous_nonce, tx_new['nonce']))
otx = Otx(
tx_new['nonce'],
tx_hash_hex,
tx_signed_raw_hex,
)
nonce=tx_new['nonce'],
address=tx_new['from'],
tx_hash=tx_hash_hex,
signed_tx=tx_signed_raw_hex,
)
session.add(otx)
session.commit()
# TODO: cancel all first, then replace. Otherwise we risk two non-locked states for two different nonces.
set_cancel(chain_spec, strip_0x(tx_previous_hash_hex), manual=True, session=session)
set_cancel(tx_previous_hash_hex, True)
TxCache.clone(tx_previous_hash_hex, tx_hash_hex, session=session)
TxCache.clone(tx_previous_hash_hex, tx_hash_hex)
tx_hashes.append(tx_hash_hex)
txs.append(tx_signed_raw_hex)
session.commit()
session.close()
s = create_check_gas_task(
s = create_check_gas_and_send_task(
txs,
chain_spec,
chain_str,
tx_new['from'],
gas=tx_new['gas'],
tx_hashes_hex=tx_hashes,
queue=queue,
tx_new['gas'],
tx_hashes,
queue,
)
s_unlock_send = celery.signature(
'cic_eth.admin.ctrl.unlock_send',
[
chain_spec.asdict(),
chain_str,
tx_new['from'],
],
queue=queue,
@@ -139,7 +123,7 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
s_unlock_direct = celery.signature(
'cic_eth.admin.ctrl.unlock_queue',
[
chain_spec.asdict(),
chain_str,
tx_new['from'],
],
queue=queue,

View File

@@ -1,21 +0,0 @@
# standard imports
import logging
# external imports
import celery
# local imports
from cic_eth.task import BaseTask
celery_app = celery.current_app
logg = logging.getLogger()
@celery_app.task(bind=True, base=BaseTask)
def default_token(self):
return {
'symbol': self.default_token_symbol,
'address': self.default_token_address,
'name': self.default_token_name,
'decimals': self.default_token_decimals,
}

View File

@@ -8,7 +8,6 @@ from chainlib.eth.constant import (
ZERO_ADDRESS,
)
from cic_eth_registry import CICRegistry
from cic_eth_registry.erc20 import ERC20Token
from cic_eth_registry.error import UnknownContractError
from chainlib.eth.address import to_checksum_address
from chainlib.eth.contract import code
@@ -23,22 +22,23 @@ from hexathon import (
add_0x,
)
from chainlib.eth.gas import balance
from chainqueue.db.enum import (
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
is_alive,
is_error_status,
status_str,
)
from chainqueue.error import TxStateChangeError
from chainqueue.sql.query import get_tx
from eth_erc20 import ERC20
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.nonce import Nonce
from cic_eth.error import InitializationError
from cic_eth.db.error import TxStateChangeError
from cic_eth.queue.tx import get_tx
app = celery.current_app
@@ -62,29 +62,6 @@ class AdminApi:
self.call_address = call_address
def proxy_do(self, chain_spec, o):
s_proxy = celery.signature(
'cic_eth.task.rpc_proxy',
[
chain_spec.asdict(),
o,
'default',
],
queue=self.queue
)
return s_proxy.apply_async()
def registry(self):
s_registry = celery.signature(
'cic_eth.task.registry',
[],
queue=self.queue
)
return s_registry.apply_async()
def unlock(self, chain_spec, address, flags=None):
s_unlock = celery.signature(
'cic_eth.admin.ctrl.unlock',
@@ -115,7 +92,7 @@ class AdminApi:
def get_lock(self):
s_lock = celery.signature(
'cic_eth.queue.lock.get_lock',
'cic_eth.queue.tx.get_lock',
[],
queue=self.queue,
)
@@ -157,13 +134,11 @@ class AdminApi:
return s_have.apply_async()
def resend(self, tx_hash_hex, chain_spec, in_place=True, unlock=False):
def resend(self, tx_hash_hex, chain_str, in_place=True, unlock=False):
logg.debug('resend {}'.format(tx_hash_hex))
s_get_tx_cache = celery.signature(
'cic_eth.queue.query.get_tx_cache',
'cic_eth.queue.tx.get_tx_cache',
[
chain_spec.asdict(),
tx_hash_hex,
],
queue=self.queue,
@@ -171,6 +146,7 @@ class AdminApi:
# TODO: This check should most likely be in resend task itself
tx_dict = s_get_tx_cache.apply_async().get()
#if tx_dict['status'] in [StatusEnum.REVERTED, StatusEnum.SUCCESS, StatusEnum.CANCELLED, StatusEnum.OBSOLETED]:
if not is_alive(getattr(StatusEnum, tx_dict['status']).value):
raise TxStateChangeError('Cannot resend mined or obsoleted transaction'.format(txold_hash_hex))
@@ -178,9 +154,9 @@ class AdminApi:
raise NotImplementedError('resend as new not yet implemented')
s = celery.signature(
'cic_eth.eth.gas.resend_with_higher_gas',
'cic_eth.eth.tx.resend_with_higher_gas',
[
chain_spec.asdict(),
chain_str,
None,
1.01,
],
@@ -188,9 +164,8 @@ class AdminApi:
)
s_manual = celery.signature(
'cic_eth.queue.state.set_manual',
'cic_eth.queue.tx.set_manual',
[
chain_spec.asdict(),
tx_hash_hex,
],
queue=self.queue,
@@ -201,7 +176,7 @@ class AdminApi:
s_gas = celery.signature(
'cic_eth.admin.ctrl.unlock_send',
[
chain_spec.asdict(),
chain_str,
tx_dict['sender'],
],
queue=self.queue,
@@ -209,13 +184,11 @@ class AdminApi:
s.link(s_gas)
return s_manual.apply_async()
def check_nonce(self, chain_spec, address):
def check_nonce(self, address):
s = celery.signature(
'cic_eth.queue.query.get_account_tx',
'cic_eth.queue.tx.get_account_tx',
[
chain_spec.asdict(),
address,
True,
False,
@@ -230,14 +203,14 @@ class AdminApi:
last_nonce = -1
for k in txs.keys():
s_get_tx = celery.signature(
'cic_eth.queue.query.get_tx',
'cic_eth.queue.tx.get_tx',
[
chain_spec.asdict(),
k,
],
queue=self.queue,
)
tx = s_get_tx.apply_async().get()
#tx = get_tx(k)
logg.debug('checking nonce {} (previous {})'.format(tx['nonce'], last_nonce))
nonce_otx = tx['nonce']
if not is_alive(tx['status']) and tx['status'] & local_fail > 0:
@@ -245,14 +218,15 @@ class AdminApi:
blocking_tx = k
blocking_nonce = nonce_otx
elif nonce_otx - last_nonce > 1:
logg.debug('tx {}'.format(tx))
tx_obj = unpack(bytes.fromhex(strip_0x(tx['signed_tx'])), chain_spec)
logg.error('nonce gap; {} followed {} for account {}'.format(nonce_otx, last_nonce, tx_obj['from']))
logg.error('nonce gap; {} followed {}'.format(nonce_otx, last_nonce))
blocking_tx = k
blocking_nonce = nonce_otx
break
last_nonce = nonce_otx
#nonce_cache = Nonce.get(address)
#nonce_w3 = self.w3.eth.getTransactionCount(address, 'pending')
return {
'nonce': {
#'network': nonce_cache,
@@ -261,17 +235,15 @@ class AdminApi:
'blocking': blocking_nonce,
},
'tx': {
'blocking': add_0x(blocking_tx),
'blocking': blocking_tx,
}
}
}
# TODO: is risky since it does not validate that there is actually a nonce problem?
def fix_nonce(self, chain_spec, address, nonce):
def fix_nonce(self, address, nonce, chain_spec):
s = celery.signature(
'cic_eth.queue.query.get_account_tx',
'cic_eth.queue.tx.get_account_tx',
[
chain_spec.asdict(),
address,
True,
False,
@@ -281,17 +253,15 @@ class AdminApi:
txs = s.apply_async().get()
tx_hash_hex = None
session = SessionBase.create_session()
for k in txs.keys():
tx_dict = get_tx(chain_spec, k, session=session)
tx_dict = get_tx(k)
if tx_dict['nonce'] == nonce:
tx_hash_hex = k
session.close()
s_nonce = celery.signature(
'cic_eth.admin.nonce.shift_nonce',
[
chain_spec.asdict(),
self.rpc.chain_spec.asdict(),
tx_hash_hex,
],
queue=self.queue
@@ -299,6 +269,20 @@ class AdminApi:
return s_nonce.apply_async()
# # TODO: this is a stub, complete all checks
# def ready(self):
# """Checks whether all required initializations have been performed.
#
# :raises cic_eth.error.InitializationError: At least one setting pre-requisite has not been met.
# :raises KeyError: An address provided for initialization is not known by the keystore.
# """
# addr = AccountRole.get_address('ETH_GAS_PROVIDER_ADDRESS')
# if addr == ZERO_ADDRESS:
# raise InitializationError('missing account ETH_GAS_PROVIDER_ADDRESS')
#
# self.w3.eth.sign(addr, text='666f6f')
def account(self, chain_spec, address, include_sender=True, include_recipient=True, renderer=None, w=sys.stdout):
"""Lists locally originated transactions for the given Ethereum address.
@@ -309,9 +293,8 @@ class AdminApi:
"""
last_nonce = -1
s = celery.signature(
'cic_eth.queue.query.get_account_tx',
'cic_eth.queue.tx.get_account_tx',
[
chain_spec.asdict(),
address,
],
queue=self.queue,
@@ -322,20 +305,17 @@ class AdminApi:
for tx_hash in txs.keys():
errors = []
s = celery.signature(
'cic_eth.queue.query.get_tx_cache',
[
chain_spec.asdict(),
tx_hash,
],
'cic_eth.queue.tx.get_tx_cache',
[tx_hash],
queue=self.queue,
)
tx_dict = s.apply_async().get()
if tx_dict['sender'] == address:
if tx_dict['nonce'] - last_nonce > 1:
logg.error('nonce gap; {} followed {} for address {} tx {}'.format(tx_dict['nonce'], last_nonce, tx_dict['sender'], tx_hash))
logg.error('nonce gap; {} followed {} for tx {}'.format(tx_dict['nonce'], last_nonce, tx_dict['hash']))
errors.append('nonce')
elif tx_dict['nonce'] == last_nonce:
logg.info('nonce {} duplicate for address {} in tx {}'.format(tx_dict['nonce'], tx_dict['sender'], tx_hash))
logg.warning('nonce {} duplicate in tx {}'.format(tx_dict['nonce'], tx_dict['hash']))
last_nonce = tx_dict['nonce']
if not include_sender:
logg.debug('skipping sender tx {}'.format(tx_dict['tx_hash']))
@@ -361,7 +341,6 @@ class AdminApi:
# TODO: Add exception upon non-existent tx aswell as invalid tx data to docstring
# TODO: This method is WAY too long
def tx(self, chain_spec, tx_hash=None, tx_raw=None, registry=None, renderer=None, w=sys.stdout):
"""Output local and network details about a given transaction with local origin.
@@ -384,99 +363,50 @@ class AdminApi:
if tx_raw != None:
tx_hash = add_0x(keccak256_hex_to_hex(tx_raw))
#tx_hash = self.w3.keccak(hexstr=tx_raw).hex()
s = celery.signature(
'cic_eth.queue.query.get_tx_cache',
[
chain_spec.asdict(),
tx_hash,
],
'cic_eth.queue.tx.get_tx_cache',
[tx_hash],
queue=self.queue,
)
t = s.apply_async()
tx = t.get()
tx = s.apply_async().get()
source_token = None
if tx['source_token'] != ZERO_ADDRESS:
source_token_declaration = None
if registry != None:
try:
source_token_declaration = registry.by_address(tx['source_token'], sender_address=self.call_address)
except UnknownContractError:
logg.warning('unknown source token contract {} (direct)'.format(tx['source_token']))
else:
s = celery.signature(
'cic_eth.task.registry_address_lookup',
[
chain_spec.asdict(),
tx['source_token'],
],
queue=self.queue
)
t = s.apply_async()
source_token_declaration = t.get()
if source_token_declaration != None:
logg.warning('found declarator record for source token {} but not checking validity'.format(tx['source_token']))
source_token = ERC20Token(chain_spec, self.rpc, tx['source_token'])
logg.debug('source token set tup {}'.format(source_token))
try:
source_token = registry.by_address(tx['source_token'])
#source_token = CICRegistry.get_address(chain_spec, tx['source_token']).contract
except UnknownContractError:
#source_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token'])
#source_token = CICRegistry.add_token(chain_spec, source_token_contract)
logg.warning('unknown source token contract {}'.format(tx['source_token']))
destination_token = None
if tx['destination_token'] != ZERO_ADDRESS:
destination_token_declaration = None
if registry != None:
try:
destination_token_declaration = registry.by_address(tx['destination_token'], sender_address=self.call_address)
except UnknownContractError:
logg.warning('unknown destination token contract {}'.format(tx['destination_token']))
else:
s = celery.signature(
'cic_eth.task.registry_address_lookup',
[
chain_spec.asdict(),
tx['destination_token'],
],
queue=self.queue
)
t = s.apply_async()
destination_token_declaration = t.get()
if destination_token_declaration != None:
logg.warning('found declarator record for destination token {} but not checking validity'.format(tx['destination_token']))
destination_token = ERC20Token(chain_spec, self.rpc, tx['destination_token'])
if tx['source_token'] != ZERO_ADDRESS:
try:
#destination_token = CICRegistry.get_address(chain_spec, tx['destination_token'])
destination_token = registry.by_address(tx['destination_token'])
except UnknownContractError:
#destination_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token'])
#destination_token = CICRegistry.add_token(chain_spec, destination_token_contract)
logg.warning('unknown destination token contract {}'.format(tx['destination_token']))
tx['sender_description'] = 'Custodial account'
tx['recipient_description'] = 'Custodial account'
o = code(tx['sender'])
t = self.proxy_do(chain_spec, o)
r = t.get()
r = self.rpc.do(o)
if len(strip_0x(r, allow_empty=True)) > 0:
if registry != None:
try:
sender_contract = registry.by_address(tx['sender'], sender_address=self.call_address)
tx['sender_description'] = 'Contract at {}'.format(tx['sender'])
except UnknownContractError:
tx['sender_description'] = 'Unknown contract'
except KeyError as e:
tx['sender_description'] = 'Unknown contract'
else:
s = celery.signature(
'cic_eth.task.registry_address_lookup',
[
chain_spec.asdict(),
tx['sender'],
],
queue=self.queue
)
t = s.apply_async()
tx['sender_description'] = t.get()
if tx['sender_description'] == None:
tx['sender_description'] = 'Unknown contract'
try:
#sender_contract = CICRegistry.get_address(chain_spec, tx['sender'])
sender_contract = registry.by_address(tx['sender'], sender_address=self.call_address)
tx['sender_description'] = 'Contract at {}'.format(tx['sender']) #sender_contract)
except UnknownContractError:
tx['sender_description'] = 'Unknown contract'
except KeyError as e:
tx['sender_description'] = 'Unknown contract'
else:
s = celery.signature(
'cic_eth.eth.account.have',
@@ -505,31 +435,16 @@ class AdminApi:
tx['sender_description'] = role
o = code(tx['recipient'])
t = self.proxy_do(chain_spec, o)
r = t.get()
r = self.rpc.do(o)
if len(strip_0x(r, allow_empty=True)) > 0:
if registry != None:
try:
recipient_contract = registry.by_address(tx['recipient'])
tx['recipient_description'] = 'Contract at {}'.format(tx['recipient'])
except UnknownContractError as e:
tx['recipient_description'] = 'Unknown contract'
except KeyError as e:
tx['recipient_description'] = 'Unknown contract'
else:
s = celery.signature(
'cic_eth.task.registry_address_lookup',
[
chain_spec.asdict(),
tx['recipient'],
],
queue=self.queue
)
t = s.apply_async()
tx['recipient_description'] = t.get()
if tx['recipient_description'] == None:
tx['recipient_description'] = 'Unknown contract'
try:
#recipient_contract = CICRegistry.by_address(tx['recipient'])
recipient_contract = registry.by_address(tx['recipient'])
tx['recipient_description'] = 'Contract at {}'.format(tx['recipient']) #recipient_contract)
except UnknownContractError as e:
tx['recipient_description'] = 'Unknown contract'
except KeyError as e:
tx['recipient_description'] = 'Unknown contract'
else:
s = celery.signature(
'cic_eth.eth.account.have',
@@ -557,38 +472,28 @@ class AdminApi:
if role != None:
tx['recipient_description'] = role
erc20_c = ERC20(chain_spec)
if source_token != None:
tx['source_token_symbol'] = source_token.symbol
o = erc20_c.balance_of(tx['source_token'], tx['sender'], sender_address=self.call_address)
r = self.rpc.do(o)
tx['sender_token_balance'] = erc20_c.parse_balance(r)
tx['source_token_symbol'] = source_token.symbol()
tx['sender_token_balance'] = source_token.function('balanceOf')(tx['sender']).call()
if destination_token != None:
tx['destination_token_symbol'] = destination_token.symbol
o = erc20_c.balance_of(tx['destination_token'], tx['recipient'], sender_address=self.call_address)
r = self.rpc.do(o)
tx['recipient_token_balance'] = erc20_c.parse_balance(r)
#tx['recipient_token_balance'] = destination_token.function('balanceOf')(tx['recipient']).call()
tx['destination_token_symbol'] = destination_token.symbol()
tx['recipient_token_balance'] = source_token.function('balanceOf')(tx['recipient']).call()
# TODO: this can mean either not subitted or culled, need to check other txs with same nonce to determine which
tx['network_status'] = 'Not in node'
tx['network_status'] = 'Not submitted'
r = None
try:
o = transaction(tx_hash)
t = self.proxy_do(chain_spec, o)
r = t.get()
if r != None:
tx['network_status'] = 'Mempool'
r = self.rpc.do(o)
except Exception as e:
logg.warning('(too permissive exception handler, please fix!) {}'.format(e))
tx['network_status'] = 'Mempool'
if r != None:
try:
o = receipt(tx_hash)
t = self.proxy_do(chain_spec, o)
r = t.get()
r = self.rpc.do(o)
logg.debug('h {} o {}'.format(tx_hash, o))
if int(strip_0x(r['status'])) == 1:
tx['network_status'] = 'Confirmed'
@@ -603,24 +508,21 @@ class AdminApi:
pass
o = balance(tx['sender'])
t = self.proxy_do(chain_spec, o)
r = t.get()
r = self.rpc.do(o)
tx['sender_gas_balance'] = r
o = balance(tx['recipient'])
t = self.proxy_do(chain_spec, o)
r = t.get()
r = self.rpc.do(o)
tx['recipient_gas_balance'] = r
tx_unpacked = unpack(bytes.fromhex(strip_0x(tx['signed_tx'])), chain_spec)
tx_unpacked = unpack(bytes.fromhex(tx['signed_tx'][2:]), chain_spec.chain_id())
tx['gas_price'] = tx_unpacked['gasPrice']
tx['gas_limit'] = tx_unpacked['gas']
tx['data'] = tx_unpacked['data']
s = celery.signature(
'cic_eth.queue.state.get_state_log',
'cic_eth.queue.tx.get_state_log',
[
chain_spec.asdict(),
tx_hash,
],
queue=self.queue,

View File

@@ -37,7 +37,7 @@ class Api:
self.callback_param = callback_param
self.callback_task = callback_task
self.queue = queue
logg.debug('api using queue {}'.format(self.queue))
logg.info('api using queue {}'.format(self.queue))
self.callback_success = None
self.callback_error = None
if callback_queue == None:
@@ -62,168 +62,29 @@ class Api:
)
def default_token(self):
s_token = celery.signature(
'cic_eth.admin.token.default_token',
[],
queue=self.queue,
)
if self.callback_param != None:
s_token.link(self.callback_success)
return s_token.apply_async()
# def convert_transfer(self, from_address, to_address, target_return, minimum_return, from_token_symbol, to_token_symbol):
# """Executes a chain of celery tasks that performs conversion between two ERC20 tokens, and transfers to a specified receipient after convert has completed.
#
# :param from_address: Ethereum address of sender
# :type from_address: str, 0x-hex
# :param to_address: Ethereum address of receipient
# :type to_address: str, 0x-hex
# :param target_return: Estimated return from conversion
# :type target_return: int
# :param minimum_return: The least value of destination token return to allow
# :type minimum_return: int
# :param from_token_symbol: ERC20 token symbol of token being converted
# :type from_token_symbol: str
# :param to_token_symbol: ERC20 token symbol of token to receive
# :type to_token_symbol: str
# :returns: uuid of root task
# :rtype: celery.Task
# """
# raise NotImplementedError('out of service until new DEX migration is done')
# s_check = celery.signature(
# 'cic_eth.admin.ctrl.check_lock',
# [
# [from_token_symbol, to_token_symbol],
# self.chain_spec.asdict(),
# LockEnum.QUEUE,
# from_address,
# ],
# queue=self.queue,
# )
# s_nonce = celery.signature(
# 'cic_eth.eth.nonce.reserve_nonce',
# [
# self.chain_spec.asdict(),
# ],
# queue=self.queue,
# )
# s_tokens = celery.signature(
# 'cic_eth.eth.erc20.resolve_tokens_by_symbol',
# [
# self.chain_str,
# ],
# queue=self.queue,
# )
# s_convert = celery.signature(
# 'cic_eth.eth.bancor.convert_with_default_reserve',
# [
# from_address,
# target_return,
# minimum_return,
# to_address,
# self.chain_spec.asdict(),
# ],
# queue=self.queue,
# )
# s_nonce.link(s_tokens)
# s_check.link(s_nonce)
# if self.callback_param != None:
# s_convert.link(self.callback_success)
# s_tokens.link(s_convert).on_error(self.callback_error)
# else:
# s_tokens.link(s_convert)
#
# t = s_check.apply_async(queue=self.queue)
# return t
#
#
# def convert(self, from_address, target_return, minimum_return, from_token_symbol, to_token_symbol):
# """Executes a chain of celery tasks that performs conversion between two ERC20 tokens.
#
# :param from_address: Ethereum address of sender
# :type from_address: str, 0x-hex
# :param target_return: Estimated return from conversion
# :type target_return: int
# :param minimum_return: The least value of destination token return to allow
# :type minimum_return: int
# :param from_token_symbol: ERC20 token symbol of token being converted
# :type from_token_symbol: str
# :param to_token_symbol: ERC20 token symbol of token to receive
# :type to_token_symbol: str
# :returns: uuid of root task
# :rtype: celery.Task
# """
# raise NotImplementedError('out of service until new DEX migration is done')
# s_check = celery.signature(
# 'cic_eth.admin.ctrl.check_lock',
# [
# [from_token_symbol, to_token_symbol],
# self.chain_spec.asdict(),
# LockEnum.QUEUE,
# from_address,
# ],
# queue=self.queue,
# )
# s_nonce = celery.signature(
# 'cic_eth.eth.nonce.reserve_nonce',
# [
# self.chain_spec.asdict(),
# ],
# queue=self.queue,
# )
# s_tokens = celery.signature(
# 'cic_eth.eth.erc20.resolve_tokens_by_symbol',
# [
# self.chain_spec.asdict(),
# ],
# queue=self.queue,
# )
# s_convert = celery.signature(
# 'cic_eth.eth.bancor.convert_with_default_reserve',
# [
# from_address,
# target_return,
# minimum_return,
# from_address,
# self.chain_spec.asdict(),
# ],
# queue=self.queue,
# )
# s_nonce.link(s_tokens)
# s_check.link(s_nonce)
# if self.callback_param != None:
# s_convert.link(self.callback_success)
# s_tokens.link(s_convert).on_error(self.callback_error)
# else:
# s_tokens.link(s_convert)
#
# t = s_check.apply_async(queue=self.queue)
# return t
def transfer_from(self, from_address, to_address, value, token_symbol, spender_address):
"""Executes a chain of celery tasks that performs a transfer of ERC20 tokens by one address on behalf of another address to a third party.
def convert_transfer(self, from_address, to_address, target_return, minimum_return, from_token_symbol, to_token_symbol):
"""Executes a chain of celery tasks that performs conversion between two ERC20 tokens, and transfers to a specified receipient after convert has completed.
:param from_address: Ethereum address of sender
:type from_address: str, 0x-hex
:param to_address: Ethereum address of recipient
:param to_address: Ethereum address of receipient
:type to_address: str, 0x-hex
:param value: Estimated return from conversion
:type value: int
:param token_symbol: ERC20 token symbol of token to send
:type token_symbol: str
:param spender_address: Ethereum address of recipient
:type spender_address: str, 0x-hex
:param target_return: Estimated return from conversion
:type target_return: int
:param minimum_return: The least value of destination token return to allow
:type minimum_return: int
:param from_token_symbol: ERC20 token symbol of token being converted
:type from_token_symbol: str
:param to_token_symbol: ERC20 token symbol of token to receive
:type to_token_symbol: str
:returns: uuid of root task
:rtype: celery.Task
"""
raise NotImplementedError('out of service until new DEX migration is done')
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
[token_symbol],
[from_token_symbol, to_token_symbol],
self.chain_spec.asdict(),
LockEnum.QUEUE,
from_address,
@@ -231,13 +92,72 @@ class Api:
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.erc20.resolve_tokens_by_symbol',
[
self.chain_str,
],
queue=self.queue,
)
s_convert = celery.signature(
'cic_eth.eth.bancor.convert_with_default_reserve',
[
from_address,
target_return,
minimum_return,
to_address,
self.chain_spec.asdict(),
],
queue=self.queue,
)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None:
s_convert.link(self.callback_success)
s_tokens.link(s_convert).on_error(self.callback_error)
else:
s_tokens.link(s_convert)
t = s_check.apply_async(queue=self.queue)
return t
def convert(self, from_address, target_return, minimum_return, from_token_symbol, to_token_symbol):
"""Executes a chain of celery tasks that performs conversion between two ERC20 tokens.
:param from_address: Ethereum address of sender
:type from_address: str, 0x-hex
:param target_return: Estimated return from conversion
:type target_return: int
:param minimum_return: The least value of destination token return to allow
:type minimum_return: int
:param from_token_symbol: ERC20 token symbol of token being converted
:type from_token_symbol: str
:param to_token_symbol: ERC20 token symbol of token to receive
:type to_token_symbol: str
:returns: uuid of root task
:rtype: celery.Task
"""
raise NotImplementedError('out of service until new DEX migration is done')
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
[from_token_symbol, to_token_symbol],
self.chain_spec.asdict(),
LockEnum.QUEUE,
from_address,
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.erc20.resolve_tokens_by_symbol',
[
@@ -245,41 +165,29 @@ class Api:
],
queue=self.queue,
)
s_allow = celery.signature(
'cic_eth.eth.erc20.check_allowance',
s_convert = celery.signature(
'cic_eth.eth.bancor.convert_with_default_reserve',
[
from_address,
value,
target_return,
minimum_return,
from_address,
self.chain_spec.asdict(),
spender_address,
],
queue=self.queue,
)
s_transfer = celery.signature(
'cic_eth.eth.erc20.transfer_from',
[
from_address,
to_address,
value,
self.chain_spec.asdict(),
spender_address,
],
queue=self.queue,
)
s_tokens.link(s_allow)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None:
s_transfer.link(self.callback_success)
s_allow.link(s_transfer).on_error(self.callback_error)
s_convert.link(self.callback_success)
s_tokens.link(s_convert).on_error(self.callback_error)
else:
s_allow.link(s_transfer)
s_tokens.link(s_convert)
t = s_check.apply_async(queue=self.queue)
return t
def transfer(self, from_address, to_address, value, token_symbol):
"""Executes a chain of celery tasks that performs a transfer of ERC20 tokens from one address to another.
@@ -305,9 +213,8 @@ class Api:
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
'cic_eth.eth.tx.reserve_nonce',
[
self.chain_spec.asdict(),
from_address,
],
queue=self.queue,
@@ -452,9 +359,8 @@ class Api:
if register:
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
'cic_eth.eth.tx.reserve_nonce',
[
self.chain_spec.asdict(),
'ACCOUNT_REGISTRY_WRITER',
],
queue=self.queue,
@@ -491,15 +397,14 @@ class Api:
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
'cic_eth.eth.tx.reserve_nonce',
[
self.chain_spec.asdict(),
'GAS_GIFTER',
],
queue=self.queue,
)
s_refill = celery.signature(
'cic_eth.eth.gas.refill_gas',
'cic_eth.eth.tx.refill_gas',
[
self.chain_spec.asdict(),
],
@@ -534,9 +439,8 @@ class Api:
"""
offset = 0
s_local = celery.signature(
'cic_eth.queue.query.get_account_tx',
'cic_eth.queue.tx.get_account_tx',
[
self.chain_spec.asdict(),
address,
],
queue=self.queue,

View File

@@ -1,8 +0,0 @@
from cic_eth.db.models.base import SessionBase
def health(*args, **kwargs):
session = SessionBase.create_session()
session.execute('SELECT count(*) from alembic_version')
session.close()
return True

View File

@@ -1,48 +0,0 @@
# standard imports
import logging
# external imports
from chainlib.connection import RPCConnection
from chainlib.chain import ChainSpec
from chainlib.eth.gas import balance
# local imports
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.base import SessionBase
from cic_eth.db.enum import LockEnum
from cic_eth.error import LockedError
from cic_eth.admin.ctrl import check_lock
logg = logging.getLogger().getChild(__name__)
def health(*args, **kwargs):
session = SessionBase.create_session()
config = kwargs['config']
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
logg.debug('check gas balance of gas gifter for chain {}'.format(chain_spec))
try:
check_lock(None, None, LockEnum.INIT)
except LockedError:
logg.warning('INIT lock is set, skipping GAS GIFTER balance check.')
return True
gas_provider = AccountRole.get_address('GAS_GIFTER', session=session)
session.close()
rpc = RPCConnection.connect(chain_spec, 'default')
o = balance(gas_provider)
r = rpc.do(o)
try:
r = int(r, 16)
except TypeError:
r = int(r)
gas_min = int(config.get('ETH_GAS_GIFTER_MINIMUM_BALANCE'))
if r < gas_min:
logg.error('EEK! gas gifter has balance {}, below minimum {}'.format(r, gas_min))
return False
return True

View File

@@ -1,18 +0,0 @@
# external imports
import redis
import os
def health(*args, **kwargs):
r = redis.Redis(
host=kwargs['config'].get('REDIS_HOST'),
port=kwargs['config'].get('REDIS_PORT'),
db=kwargs['config'].get('REDIS_DB'),
)
try:
r.set(kwargs['unit'], os.getpid())
except redis.connection.ConnectionError:
return False
except redis.connection.ResponseError:
return False
return True

View File

@@ -1,37 +0,0 @@
# standard imports
import time
import logging
from urllib.error import URLError
# external imports
from chainlib.connection import RPCConnection
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.sign import sign_message
from chainlib.error import JSONRPCException
logg = logging.getLogger().getChild(__name__)
def health(*args, **kwargs):
blocked = True
max_attempts = 5
conn = RPCConnection.connect(kwargs['config'].get('CIC_CHAIN_SPEC'), tag='signer')
for i in range(max_attempts):
idx = i + 1
logg.debug('attempt signer connection check {}/{}'.format(idx, max_attempts))
try:
conn.do(sign_message(ZERO_ADDRESS, '0x2a'))
except FileNotFoundError:
pass
except ConnectionError:
pass
except URLError:
pass
except JSONRPCException:
logg.debug('signer connection succeeded')
return True
if idx < max_attempts:
time.sleep(0.5)
return False

View File

@@ -11,6 +11,10 @@ logg = logging.getLogger()
# an Engine, which the Session will use for connection
# resources
# TODO: Remove the package exports, all models should be imported using full path
from .models.otx import Otx
from .models.convert import TxConvertTransfer
def dsn_from_config(config):
"""Generate a dsn string from the provided config dict.

View File

@@ -74,11 +74,10 @@ class LockEnum(enum.IntEnum):
QUEUE: Disable queueing new or modified transactions
"""
STICKY=1
INIT=2
CREATE=4
SEND=8
QUEUE=16
QUERY=32
CREATE=2
SEND=4
QUEUE=8
QUERY=16
ALL=int(0xfffffffffffffffe)

View File

@@ -1,29 +0,0 @@
"""Add chainqueue
Revision ID: 0ec0d6d1e785
Revises:
Create Date: 2021-04-02 18:30:55.398388
"""
from alembic import op
import sqlalchemy as sa
from chainqueue.db.migrations.sqlalchemy import (
chainqueue_upgrade,
chainqueue_downgrade,
)
# revision identifiers, used by Alembic.
revision = '0ec0d6d1e785'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
chainqueue_upgrade(0, 0, 1)
def downgrade():
chainqueue_downgrade(0, 0, 1)

View File

@@ -1,29 +0,0 @@
"""Roles
Revision ID: 1f1b3b641d08
Revises: 9c420530eeb2
Create Date: 2021-04-02 18:40:27.787631
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '1f1b3b641d08'
down_revision = '9c420530eeb2'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'account_role',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.Text, nullable=False, unique=True),
sa.Column('address_hex', sa.String(42), nullable=False),
)
def downgrade():
op.drop_table('account_role')

View File

@@ -0,0 +1,35 @@
"""Add new syncer table
Revision ID: 2a07b543335e
Revises: a2e2aab8f331
Create Date: 2020-12-27 09:35:44.017981
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '2a07b543335e'
down_revision = 'a2e2aab8f331'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'blockchain_sync',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('blockchain', sa.String, nullable=False),
sa.Column('block_start', sa.Integer, nullable=False, default=0),
sa.Column('tx_start', sa.Integer, nullable=False, default=0),
sa.Column('block_cursor', sa.Integer, nullable=False, default=0),
sa.Column('tx_cursor', sa.Integer, nullable=False, default=0),
sa.Column('block_target', sa.Integer, nullable=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime),
)
def downgrade():
op.drop_table('blockchain_sync')

View File

@@ -1,8 +1,8 @@
"""Nonce
"""Nonce reservation
Revision ID: 9c420530eeb2
Revises: b125cbf81e32
Create Date: 2021-04-02 18:38:56.459334
Revision ID: 3b693afd526a
Revises: f738d9962fdf
Create Date: 2021-03-05 07:09:50.898728
"""
from alembic import op
@@ -10,22 +10,15 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9c420530eeb2'
down_revision = 'b125cbf81e32'
revision = '3b693afd526a'
down_revision = 'f738d9962fdf'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False, unique=True),
sa.Column('nonce', sa.Integer, nullable=False),
)
op.create_table(
'nonce_task_reservation',
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False),
sa.Column('nonce', sa.Integer, nullable=False),
@@ -36,4 +29,3 @@ def upgrade():
def downgrade():
op.drop_table('nonce_task_reservation')
op.drop_table('nonce')

View File

@@ -0,0 +1,29 @@
"""Add nonce index
Revision ID: 49b348246d70
Revises: 52c7c59cd0b1
Create Date: 2020-12-19 09:45:36.186446
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '49b348246d70'
down_revision = '52c7c59cd0b1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False, unique=True),
sa.Column('nonce', sa.Integer, nullable=False),
)
def downgrade():
op.drop_table('nonce')

View File

@@ -0,0 +1,31 @@
"""Add account roles
Revision ID: 52c7c59cd0b1
Revises: 9c4bd7491015
Create Date: 2020-12-19 07:21:38.249237
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '52c7c59cd0b1'
down_revision = '9c4bd7491015'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'account_role',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.Text, nullable=False, unique=True),
sa.Column('address_hex', sa.String(42), nullable=False),
)
pass
def downgrade():
op.drop_table('account_role')
pass

View File

@@ -0,0 +1,30 @@
"""Add otx state log
Revision ID: 6ac7a1dadc46
Revises: 89e1e9baa53c
Create Date: 2021-01-30 13:59:49.022373
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '6ac7a1dadc46'
down_revision = '89e1e9baa53c'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx_state_log',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False),
sa.Column('date', sa.DateTime, nullable=False),
sa.Column('status', sa.Integer, nullable=False),
)
def downgrade():
op.drop_table('otx_state_log')

View File

@@ -0,0 +1,31 @@
"""Add attempts and version log for otx
Revision ID: 71708e943dbd
Revises: 7e8d7626e38f
Create Date: 2020-09-26 14:41:19.298651
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '71708e943dbd'
down_revision = '7e8d7626e38f'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx_attempts',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False),
sa.Column('date', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('otx_attempts')
pass

View File

@@ -0,0 +1,31 @@
"""add blocknumber pointer
Revision ID: 7cb65b893934
Revises: 8593fa1ca0f4
Create Date: 2020-09-24 19:29:13.543648
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '7cb65b893934'
down_revision = '8593fa1ca0f4'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'watcher_state',
sa.Column('block_number', sa.Integer)
)
conn = op.get_bind()
conn.execute('INSERT INTO watcher_state (block_number) VALUES (0);')
pass
def downgrade():
op.drop_table('watcher_state')
pass

View File

@@ -0,0 +1,45 @@
"""Add block sync
Revision ID: 7e8d7626e38f
Revises: cd2052be6db2
Create Date: 2020-09-26 11:12:27.818524
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '7e8d7626e38f'
down_revision = 'cd2052be6db2'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'block_sync',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('blockchain', sa.String, nullable=False, unique=True),
sa.Column('block_height_backlog', sa.Integer, nullable=False, default=0),
sa.Column('tx_height_backlog', sa.Integer, nullable=False, default=0),
sa.Column('block_height_session', sa.Integer, nullable=False, default=0),
sa.Column('tx_height_session', sa.Integer, nullable=False, default=0),
sa.Column('block_height_head', sa.Integer, nullable=False, default=0),
sa.Column('tx_height_head', sa.Integer, nullable=False, default=0),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime),
)
op.drop_table('watcher_state')
pass
def downgrade():
op.drop_table('block_sync')
op.create_table(
'watcher_state',
sa.Column('block_number', sa.Integer)
)
conn = op.get_bind()
conn.execute('INSERT INTO watcher_state (block_number) VALUES (0);')
pass

View File

@@ -0,0 +1,35 @@
"""Add transaction queue
Revision ID: 8593fa1ca0f4
Revises:
Create Date: 2020-09-22 21:56:42.117047
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '8593fa1ca0f4'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('tx_hash', sa.String(66), nullable=False),
sa.Column('signed_tx', sa.Text, nullable=False),
sa.Column('status', sa.Integer, nullable=False, default=-9),
sa.Column('block', sa.Integer),
)
op.create_index('idx_otx_tx', 'otx', ['tx_hash'], unique=True)
def downgrade():
op.drop_index('idx_otx_tx')
op.drop_table('otx')

View File

@@ -1,20 +1,17 @@
"""Lock
"""Add account lock
Revision ID: 75d4767b3031
Revises: 1f1b3b641d08
Create Date: 2021-04-02 18:41:20.864265
Revision ID: 89e1e9baa53c
Revises: 2a07b543335e
Create Date: 2021-01-27 19:57:36.793882
"""
import datetime
from alembic import op
import sqlalchemy as sa
from chainlib.eth.constant import ZERO_ADDRESS
from cic_eth.db.enum import LockEnum
# revision identifiers, used by Alembic.
revision = '75d4767b3031'
down_revision = '1f1b3b641d08'
revision = '89e1e9baa53c'
down_revision = '2a07b543335e'
branch_labels = None
depends_on = None
@@ -26,12 +23,10 @@ def upgrade():
sa.Column("address", sa.String(42), nullable=True),
sa.Column('blockchain', sa.String),
sa.Column("flags", sa.BIGINT(), nullable=False, default=0),
sa.Column("date_created", sa.DateTime, nullable=False, default=datetime.datetime.utcnow),
sa.Column("otx_id", sa.Integer, sa.ForeignKey('otx.id'), nullable=True),
sa.Column("date_created", sa.DateTime, nullable=False),
sa.Column("otx_id", sa.Integer, nullable=True),
)
op.create_index('idx_chain_address', 'lock', ['blockchain', 'address'], unique=True)
op.execute("INSERT INTO lock (address, date_created, blockchain, flags) VALUES('{}', '{}', '::', {})".format(ZERO_ADDRESS, datetime.datetime.utcnow(), LockEnum.INIT | LockEnum.SEND | LockEnum.QUEUE))
def downgrade():
op.drop_index('idx_chain_address')

View File

@@ -0,0 +1,26 @@
"""Rename block sync table
Revision ID: 9c4bd7491015
Revises: 9daa16518a91
Create Date: 2020-10-15 23:45:56.306898
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9c4bd7491015'
down_revision = '9daa16518a91'
branch_labels = None
depends_on = None
def upgrade():
op.rename_table('block_sync', 'otx_sync')
pass
def downgrade():
op.rename_table('otx_sync', 'block_sync')
pass

View File

@@ -0,0 +1,30 @@
"""add tx sync state
Revision ID: 9daa16518a91
Revises: e3b5330ee71c
Create Date: 2020-10-10 14:43:18.699276
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9daa16518a91'
down_revision = 'e3b5330ee71c'
branch_labels = None
depends_on = None
def upgrade():
# op.create_table(
# 'tx_sync',
# sa.Column('tx', sa.String(66), nullable=False),
# )
# op.execute("INSERT INTO tx_sync VALUES('0x0000000000000000000000000000000000000000000000000000000000000000')")
pass
def downgrade():
# op.drop_table('tx_sync')
pass

View File

@@ -0,0 +1,34 @@
"""Add date accessed to txcache
Revision ID: a2e2aab8f331
Revises: 49b348246d70
Create Date: 2020-12-24 18:58:06.137812
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'a2e2aab8f331'
down_revision = '49b348246d70'
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
'tx_cache',
sa.Column(
'date_checked',
sa.DateTime,
nullable=False
)
)
pass
def downgrade():
# drop does not work withs qlite
#op.drop_column('tx_cache', 'date_checked')
pass

View File

@@ -1,8 +1,8 @@
"""Convert
"""convert tx index
Revision ID: aee12aeb47ec
Revises: 5ca4b77ce205
Create Date: 2021-04-02 18:42:45.233356
Revision ID: cd2052be6db2
Revises: 7cb65b893934
Create Date: 2020-09-24 21:20:51.580500
"""
from alembic import op
@@ -10,8 +10,8 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'aee12aeb47ec'
down_revision = '5ca4b77ce205'
revision = 'cd2052be6db2'
down_revision = '7cb65b893934'
branch_labels = None
depends_on = None
@@ -20,8 +20,10 @@ def upgrade():
op.create_table(
'tx_convert_transfer',
sa.Column('id', sa.Integer, primary_key=True),
#sa.Column('approve_tx_hash', sa.String(66), nullable=False, unique=True),
sa.Column('convert_tx_hash', sa.String(66), nullable=False, unique=True),
sa.Column('transfer_tx_hash', sa.String(66), unique=True),
# sa.Column('holder_address', sa.String(42), nullable=False),
sa.Column('recipient_address', sa.String(42), nullable=False),
)
op.create_index('idx_tx_convert_address', 'tx_convert_transfer', ['recipient_address'])

View File

@@ -0,0 +1,31 @@
"""Add tx tracker record
Revision ID: df19f4e69676
Revises: 71708e943dbd
Create Date: 2020-10-09 23:31:44.563498
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'df19f4e69676'
down_revision = '71708e943dbd'
branch_labels = None
depends_on = None
def upgrade():
# op.create_table(
# 'tx',
# sa.Column('id', sa.Integer, primary_key=True),
# sa.Column('date_added', sa.DateTime, nullable=False),
# sa.Column('tx_hash', sa.String(66), nullable=False, unique=True),
# sa.Column('success', sa.Boolean(), nullable=False),
# )
pass
def downgrade():
# op.drop_table('tx')
pass

View File

@@ -0,0 +1,38 @@
"""Add cached values for tx
Revision ID: e3b5330ee71c
Revises: df19f4e69676
Create Date: 2020-10-10 00:17:07.094893
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'e3b5330ee71c'
down_revision = 'df19f4e69676'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'tx_cache',
sa.Column('id', sa.Integer, primary_key=True),
# sa.Column('tx_id', sa.Integer, sa.ForeignKey('tx.id'), nullable=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime, nullable=False),
sa.Column('source_token_address', sa.String(42), nullable=False),
sa.Column('destination_token_address', sa.String(42), nullable=False),
sa.Column('sender', sa.String(42), nullable=False),
sa.Column('recipient', sa.String(42), nullable=False),
sa.Column('from_value', sa.NUMERIC(), nullable=False),
sa.Column('to_value', sa.NUMERIC(), nullable=True),
sa.Column('block_number', sa.BIGINT(), nullable=True),
sa.Column('tx_index', sa.Integer, nullable=True),
)
def downgrade():
op.drop_table('tx_cache')
pass

View File

@@ -1,8 +1,8 @@
"""Add chain syncer
Revision ID: 6604de4203e2
Revises: 63b629f14a85
Create Date: 2021-04-01 08:10:29.156243
Revision ID: ec40ac0974c1
Revises: 6ac7a1dadc46
Create Date: 2021-02-23 06:10:19.246304
"""
from alembic import op
@@ -14,15 +14,15 @@ from chainsyncer.db.migrations.sqlalchemy import (
# revision identifiers, used by Alembic.
revision = '6604de4203e2'
down_revision = '63b629f14a85'
revision = 'ec40ac0974c1'
down_revision = '6ac7a1dadc46'
branch_labels = None
depends_on = None
def upgrade():
chainsyncer_upgrade(0, 0, 1)
def downgrade():
chainsyncer_downgrade(0, 0, 1)

View File

@@ -1,8 +1,8 @@
"""DEbug
"""debug output
Revision ID: 5ca4b77ce205
Revises: 75d4767b3031
Create Date: 2021-04-02 18:42:12.257244
Revision ID: f738d9962fdf
Revises: ec40ac0974c1
Create Date: 2021-03-04 08:32:43.281214
"""
from alembic import op
@@ -10,8 +10,8 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '5ca4b77ce205'
down_revision = '75d4767b3031'
revision = 'f738d9962fdf'
down_revision = 'ec40ac0974c1'
branch_labels = None
depends_on = None
@@ -24,7 +24,9 @@ def upgrade():
sa.Column('description', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('debug')
pass

View File

@@ -0,0 +1,85 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = .
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat migrations/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
#sqlalchemy.url = driver://user:pass@localhost/dbname
sqlalchemy.url = postgresql+psycopg2://postgres@localhost:5432/cic-eth
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks=black
# black.type=console_scripts
# black.entrypoint=black
# black.options=-l 79
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@@ -0,0 +1,77 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1,35 @@
"""Add new syncer table
Revision ID: 2a07b543335e
Revises: a2e2aab8f331
Create Date: 2020-12-27 09:35:44.017981
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '2a07b543335e'
down_revision = 'a2e2aab8f331'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'blockchain_sync',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('blockchain', sa.String, nullable=False),
sa.Column('block_start', sa.Integer, nullable=False, default=0),
sa.Column('tx_start', sa.Integer, nullable=False, default=0),
sa.Column('block_cursor', sa.Integer, nullable=False, default=0),
sa.Column('tx_cursor', sa.Integer, nullable=False, default=0),
sa.Column('block_target', sa.Integer, nullable=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime),
)
def downgrade():
op.drop_table('blockchain_sync')

View File

@@ -0,0 +1,31 @@
"""Nonce reservation
Revision ID: 3b693afd526a
Revises: f738d9962fdf
Create Date: 2021-03-05 07:09:50.898728
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3b693afd526a'
down_revision = 'f738d9962fdf'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('key', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
def downgrade():
op.drop_table('nonce_task_reservation')

View File

@@ -0,0 +1,29 @@
"""Add nonce index
Revision ID: 49b348246d70
Revises: 52c7c59cd0b1
Create Date: 2020-12-19 09:45:36.186446
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '49b348246d70'
down_revision = '52c7c59cd0b1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False, unique=True),
sa.Column('nonce', sa.Integer, nullable=False),
)
def downgrade():
op.drop_table('nonce')

View File

@@ -0,0 +1,31 @@
"""Add account roles
Revision ID: 52c7c59cd0b1
Revises: 9c4bd7491015
Create Date: 2020-12-19 07:21:38.249237
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '52c7c59cd0b1'
down_revision = '9c4bd7491015'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'account_role',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.Text, nullable=False, unique=True),
sa.Column('address_hex', sa.String(42), nullable=False),
)
pass
def downgrade():
op.drop_table('account_role')
pass

View File

@@ -0,0 +1,30 @@
"""Add otx state log
Revision ID: 6ac7a1dadc46
Revises: 89e1e9baa53c
Create Date: 2021-01-30 13:59:49.022373
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '6ac7a1dadc46'
down_revision = '89e1e9baa53c'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx_state_log',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False),
sa.Column('date', sa.DateTime, nullable=False),
sa.Column('status', sa.Integer, nullable=False),
)
def downgrade():
op.drop_table('otx_state_log')

View File

@@ -0,0 +1,31 @@
"""Add attempts and version log for otx
Revision ID: 71708e943dbd
Revises: 7e8d7626e38f
Create Date: 2020-09-26 14:41:19.298651
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '71708e943dbd'
down_revision = '7e8d7626e38f'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx_attempts',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False),
sa.Column('date', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('otx_attempts')
pass

View File

@@ -0,0 +1,31 @@
"""add blocknumber pointer
Revision ID: 7cb65b893934
Revises: 8593fa1ca0f4
Create Date: 2020-09-24 19:29:13.543648
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '7cb65b893934'
down_revision = '8593fa1ca0f4'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'watcher_state',
sa.Column('block_number', sa.Integer)
)
conn = op.get_bind()
conn.execute('INSERT INTO watcher_state (block_number) VALUES (0);')
pass
def downgrade():
op.drop_table('watcher_state')
pass

View File

@@ -0,0 +1,42 @@
"""Add block sync
Revision ID: 7e8d7626e38f
Revises: cd2052be6db2
Create Date: 2020-09-26 11:12:27.818524
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '7e8d7626e38f'
down_revision = 'cd2052be6db2'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'block_sync',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('blockchain', sa.String, nullable=False, unique=True),
sa.Column('height_backlog', sa.Integer, nullable=False, default=0),
sa.Column('height_session', sa.Integer, nullable=False, default=0),
sa.Column('height_head', sa.Integer, nullable=False, default=0),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime),
)
op.drop_table('watcher_state')
pass
def downgrade():
op.drop_table('block_sync')
op.create_table(
'watcher_state',
sa.Column('block_number', sa.Integer)
)
conn = op.get_bind()
conn.execute('INSERT INTO watcher_state (block_number) VALUES (0);')
pass

View File

@@ -0,0 +1,35 @@
"""Add transaction queue
Revision ID: 8593fa1ca0f4
Revises:
Create Date: 2020-09-22 21:56:42.117047
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '8593fa1ca0f4'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('tx_hash', sa.String(66), nullable=False),
sa.Column('signed_tx', sa.Text, nullable=False),
sa.Column('status', sa.Integer, nullable=False, default=-9),
sa.Column('block', sa.Integer),
)
op.create_index('idx_otx_tx', 'otx', ['tx_hash'], unique=True)
def downgrade():
op.drop_index('idx_otx_tx')
op.drop_table('otx')

View File

@@ -0,0 +1,33 @@
"""Add account lock
Revision ID: 89e1e9baa53c
Revises: 2a07b543335e
Create Date: 2021-01-27 19:57:36.793882
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '89e1e9baa53c'
down_revision = '2a07b543335e'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'lock',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column("address", sa.String(42), nullable=True),
sa.Column('blockchain', sa.String),
sa.Column("flags", sa.BIGINT(), nullable=False, default=0),
sa.Column("date_created", sa.DateTime, nullable=False),
sa.Column("otx_id", sa.Integer, nullable=True),
)
op.create_index('idx_chain_address', 'lock', ['blockchain', 'address'], unique=True)
def downgrade():
op.drop_index('idx_chain_address')
op.drop_table('lock')

View File

@@ -0,0 +1,26 @@
"""Rename block sync table
Revision ID: 9c4bd7491015
Revises: 9daa16518a91
Create Date: 2020-10-15 23:45:56.306898
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9c4bd7491015'
down_revision = '9daa16518a91'
branch_labels = None
depends_on = None
def upgrade():
op.rename_table('block_sync', 'otx_sync')
pass
def downgrade():
op.rename_table('otx_sync', 'block_sync')
pass

View File

@@ -0,0 +1,30 @@
"""add tx sync state
Revision ID: 9daa16518a91
Revises: e3b5330ee71c
Create Date: 2020-10-10 14:43:18.699276
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9daa16518a91'
down_revision = 'e3b5330ee71c'
branch_labels = None
depends_on = None
def upgrade():
# op.create_table(
# 'tx_sync',
# sa.Column('tx', sa.String(66), nullable=False),
# )
# op.execute("INSERT INTO tx_sync VALUES('0x0000000000000000000000000000000000000000000000000000000000000000')")
pass
def downgrade():
# op.drop_table('tx_sync')
pass

View File

@@ -0,0 +1,33 @@
"""Add date accessed to txcache
Revision ID: a2e2aab8f331
Revises: 49b348246d70
Create Date: 2020-12-24 18:58:06.137812
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'a2e2aab8f331'
down_revision = '49b348246d70'
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
'tx_cache',
sa.Column(
'date_checked',
sa.DateTime,
nullable=False
)
)
pass
def downgrade():
op.drop_column('tx_cache', 'date_checked')
pass

View File

@@ -0,0 +1,34 @@
"""convert tx index
Revision ID: cd2052be6db2
Revises: 7cb65b893934
Create Date: 2020-09-24 21:20:51.580500
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'cd2052be6db2'
down_revision = '7cb65b893934'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'tx_convert_transfer',
sa.Column('id', sa.Integer, primary_key=True),
#sa.Column('approve_tx_hash', sa.String(66), nullable=False, unique=True),
sa.Column('convert_tx_hash', sa.String(66), nullable=False, unique=True),
sa.Column('transfer_tx_hash', sa.String(66), unique=True),
# sa.Column('holder_address', sa.String(42), nullable=False),
sa.Column('recipient_address', sa.String(42), nullable=False),
)
op.create_index('idx_tx_convert_address', 'tx_convert_transfer', ['recipient_address'])
def downgrade():
op.drop_index('idx_tx_convert_address')
op.drop_table('tx_convert_transfer')

View File

@@ -0,0 +1,31 @@
"""Add tx tracker record
Revision ID: df19f4e69676
Revises: 71708e943dbd
Create Date: 2020-10-09 23:31:44.563498
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'df19f4e69676'
down_revision = '71708e943dbd'
branch_labels = None
depends_on = None
def upgrade():
# op.create_table(
# 'tx',
# sa.Column('id', sa.Integer, primary_key=True),
# sa.Column('date_added', sa.DateTime, nullable=False),
# sa.Column('tx_hash', sa.String(66), nullable=False, unique=True),
# sa.Column('success', sa.Boolean(), nullable=False),
# )
pass
def downgrade():
# op.drop_table('tx')
pass

View File

@@ -0,0 +1,37 @@
"""Add cached values for tx
Revision ID: e3b5330ee71c
Revises: df19f4e69676
Create Date: 2020-10-10 00:17:07.094893
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'e3b5330ee71c'
down_revision = 'df19f4e69676'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'tx_cache',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime, nullable=False),
sa.Column('source_token_address', sa.String(42), nullable=False),
sa.Column('destination_token_address', sa.String(42), nullable=False),
sa.Column('sender', sa.String(42), nullable=False),
sa.Column('recipient', sa.String(42), nullable=False),
sa.Column('from_value', sa.NUMERIC(), nullable=False),
sa.Column('to_value', sa.NUMERIC(), nullable=True),
sa.Column('block_number', sa.BIGINT(), nullable=True),
sa.Column('tx_index', sa.Integer, nullable=True),
)
def downgrade():
op.drop_table('tx_cache')
pass

View File

@@ -1,13 +1,12 @@
"""Add chain syncer
Revision ID: b125cbf81e32
Revises: 0ec0d6d1e785
Create Date: 2021-04-02 18:36:44.459603
Revision ID: ec40ac0974c1
Revises: 6ac7a1dadc46
Create Date: 2021-02-23 06:10:19.246304
"""
from alembic import op
import sqlalchemy as sa
from chainsyncer.db.migrations.sqlalchemy import (
chainsyncer_upgrade,
chainsyncer_downgrade,
@@ -15,15 +14,15 @@ from chainsyncer.db.migrations.sqlalchemy import (
# revision identifiers, used by Alembic.
revision = 'b125cbf81e32'
down_revision = '0ec0d6d1e785'
revision = 'ec40ac0974c1'
down_revision = '6ac7a1dadc46'
branch_labels = None
depends_on = None
def upgrade():
chainsyncer_upgrade(0, 0, 1)
def downgrade():
chainsyncer_downgrade(0, 0, 1)

View File

@@ -0,0 +1,32 @@
"""debug output
Revision ID: f738d9962fdf
Revises: ec40ac0974c1
Create Date: 2021-03-04 08:32:43.281214
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'f738d9962fdf'
down_revision = 'ec40ac0974c1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'debug',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.String, nullable=False),
sa.Column('description', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('debug')
pass

Some files were not shown because too many files have changed in this diff Show More