Compare commits

..

16 Commits

16 changed files with 32 additions and 176 deletions

View File

@@ -100,4 +100,3 @@ class SessionBase(Model):
logg.debug('destroying session {}'.format(session_key))
session.commit()
session.close()
del SessionBase.localsessions[session_key]

View File

@@ -1,4 +1,4 @@
cic-base~=0.2.0a4
cic-base~=0.2.0a2
alembic==1.4.2
confini>=0.3.6rc3,<0.5.0
uwsgi==2.0.19.1
@@ -9,5 +9,5 @@ semver==2.13.0
psycopg2==2.8.6
celery==4.4.7
redis==3.5.3
chainsyncer[sql]~=0.0.3a5
chainsyncer[sql]~=0.0.3a4
erc20-faucet~=0.2.2a2

View File

@@ -126,4 +126,3 @@ class SessionBase(Model):
logg.debug('commit and destroy session {}'.format(session_key))
session.commit()
session.close()
del SessionBase.localsessions[session_key]

View File

@@ -14,11 +14,9 @@ from chainlib.eth.tx import (
)
from chainlib.eth.block import block_by_number
from chainlib.eth.contract import abi_decode_single
from chainlib.eth.constant import ZERO_ADDRESS
from hexathon import strip_0x
from cic_eth_registry import CICRegistry
from cic_eth_registry.erc20 import ERC20Token
from cic_eth_registry.error import UnknownContractError
from chainqueue.db.models.otx import Otx
from chainqueue.db.enum import StatusEnum
from chainqueue.sql.query import get_tx_cache
@@ -116,6 +114,9 @@ def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict):
# TODO: pass through registry to validate declarator entry of token
#token = registry.by_address(tx['to'], sender_address=self.call_address)
token = ERC20Token(chain_spec, rpc, tx['to'])
token_symbol = token.symbol
token_decimals = token.decimals
times = tx_times(tx['hash'], chain_spec)
tx_r = {
'hash': tx['hash'],
@@ -125,6 +126,12 @@ def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict):
'destination_value': tx_token_value,
'source_token': tx['to'],
'destination_token': tx['to'],
'source_token_symbol': token_symbol,
'destination_token_symbol': token_symbol,
'source_token_decimals': token_decimals,
'destination_token_decimals': token_decimals,
'source_token_chain': chain_str,
'destination_token_chain': chain_str,
'nonce': tx['nonce'],
}
if times['queue'] != None:
@@ -139,8 +146,8 @@ def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict):
# TODO: Surely it must be possible to optimize this
# TODO: DRY this with callback filter in cic_eth/runnable/manager
# TODO: Remove redundant fields from end representation (timestamp, tx_hash)
@celery_app.task(bind=True, base=BaseTask)
def tx_collate(self, tx_batches, chain_spec_dict, offset, limit, newest_first=True, verify_contracts=True):
@celery_app.task()
def tx_collate(tx_batches, chain_spec_dict, offset, limit, newest_first=True):
"""Merges transaction data from multiple sources and sorts them in chronological order.
:param tx_batches: Transaction data inputs
@@ -189,32 +196,6 @@ def tx_collate(self, tx_batches, chain_spec_dict, offset, limit, newest_first=Tr
if newest_first:
ks.reverse()
for k in ks:
tx = txs_by_block[k]
if verify_contracts:
try:
tx = verify_and_expand(tx, chain_spec, sender_address=BaseTask.call_address)
except UnknownContractError:
logg.error('verify failed on tx {}, skipping'.format(tx['hash']))
continue
txs.append(tx)
txs.append(txs_by_block[k])
return txs
def verify_and_expand(tx, chain_spec, sender_address=ZERO_ADDRESS):
rpc = RPCConnection.connect(chain_spec, 'default')
registry = CICRegistry(chain_spec, rpc)
if tx.get('source_token_symbol') == None and tx['source_token'] != ZERO_ADDRESS:
r = registry.by_address(tx['source_token'], sender_address=sender_address)
token = ERC20Token(chain_spec, rpc, tx['source_token'])
tx['source_token_symbol'] = token.symbol
tx['source_token_decimals'] = token.decimals
if tx.get('destination_token_symbol') == None and tx['destination_token'] != ZERO_ADDRESS:
r = registry.by_address(tx['destination_token'], sender_address=sender_address)
token = ERC20Token(chain_spec, rpc, tx['destination_token'])
tx['destination_token_symbol'] = token.symbol
tx['destination_token_decimals'] = token.decimals
return tx

View File

@@ -27,7 +27,7 @@ def database_engine(
SessionBase.poolable = False
dsn = dsn_from_config(load_config)
#SessionBase.connect(dsn, True)
SessionBase.connect(dsn, debug=load_config.true('DATABASE_DEBUG'))
SessionBase.connect(dsn, debug=load_config.get('DATABASE_DEBUG') != None)
return dsn

View File

@@ -100,7 +100,6 @@ def get_upcoming_tx(chain_spec, status=StatusEnum.READYSEND, not_status=None, re
q_outer = q_outer.join(Lock, isouter=True)
q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0))
if not is_alive(status):
SessionBase.release_session(session)
raise ValueError('not a valid non-final tx value: {}'.format(status))

View File

@@ -90,7 +90,6 @@ class DispatchSyncer:
def __init__(self, chain_spec):
self.chain_spec = chain_spec
self.session = None
def chain(self):
@@ -101,18 +100,16 @@ class DispatchSyncer:
c = len(txs.keys())
logg.debug('processing {} txs {}'.format(c, list(txs.keys())))
chain_str = str(self.chain_spec)
self.session = SessionBase.create_session()
session = SessionBase.create_session()
for k in txs.keys():
tx_raw = txs[k]
tx_raw_bytes = bytes.fromhex(strip_0x(tx_raw))
tx = unpack(tx_raw_bytes, self.chain_spec)
try:
set_reserved(self.chain_spec, tx['hash'], session=self.session)
self.session.commit()
set_reserved(self.chain_spec, tx['hash'], session=session)
except NotLocalTxError as e:
logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash']))
self.session.rollback()
continue
s_check = celery.signature(
@@ -135,25 +132,16 @@ class DispatchSyncer:
s_check.link(s_send)
t = s_check.apply_async()
logg.info('processed {}'.format(k))
self.session.close()
self.session = None
def loop(self, interval):
def loop(self, w3, interval):
while run:
txs = {}
typ = StatusBits.QUEUED
utxs = get_upcoming_tx(self.chain_spec, typ)
for k in utxs.keys():
txs[k] = utxs[k]
try:
conn = RPCConnection.connect(self.chain_spec, 'default')
self.process(conn, txs)
except ConnectionError as e:
if self.session != None:
self.session.close()
self.session = None
logg.error('connection to node failed: {}'.format(e))
self.process(w3, txs)
if len(utxs) > 0:
time.sleep(self.yield_delay)
@@ -163,7 +151,8 @@ class DispatchSyncer:
def main():
syncer = DispatchSyncer(chain_spec)
syncer.loop(float(config.get('DISPATCHER_LOOP_INTERVAL')))
conn = RPCConnection.connect(chain_spec, 'default')
syncer.loop(conn, float(config.get('DISPATCHER_LOOP_INTERVAL')))
sys.exit(0)

View File

@@ -9,8 +9,8 @@ import semver
version = (
0,
12,
1,
'alpha.2',
0,
'alpha.3',
)
version_object = semver.VersionInfo(

View File

@@ -6,4 +6,4 @@ HOST=localhost
PORT=5432
ENGINE=sqlite
DRIVER=pysqlite
DEBUG=0
DEBUG=

View File

@@ -1,5 +1,5 @@
chainsyncer[sql]~=0.0.3a5
chainqueue~=0.0.2b7
chainsyncer[sql]~=0.0.3a4
chainqueue~=0.0.2b6
alembic==1.4.2
confini>=0.3.6rc4,<0.5.0
redis==3.5.3

View File

@@ -1,92 +0,0 @@
# external imports
import celery
import pytest
from chainlib.connection import RPCConnection
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.gas import (
RPCGasOracle,
)
from chainlib.eth.tx import (
TxFormat,
unpack,
)
from chainlib.eth.nonce import RPCNonceOracle
from eth_erc20 import ERC20
from hexathon import (
add_0x,
strip_0x,
)
from chainqueue.db.models.tx import TxCache
from chainqueue.db.models.otx import Otx
def test_ext_tx_collate(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
custodial_roles,
agent_roles,
foo_token,
bar_token,
register_tokens,
cic_registry,
register_lookups,
init_celery_tasks,
celery_session_worker,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = RPCNonceOracle(custodial_roles['FOO_TOKEN_GIFTER'], eth_rpc)
gas_oracle = RPCGasOracle(eth_rpc)
c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
transfer_value_foo = 1000
transfer_value_bar = 1024
(tx_hash_hex, tx_signed_raw_hex) = c.transfer(foo_token, custodial_roles['FOO_TOKEN_GIFTER'], agent_roles['ALICE'], transfer_value_foo, tx_format=TxFormat.RLP_SIGNED)
tx = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), default_chain_spec)
otx = Otx(
tx['nonce'],
tx_hash_hex,
tx_signed_raw_hex,
)
init_database.add(otx)
init_database.commit()
txc = TxCache(
tx_hash_hex,
tx['from'],
tx['to'],
foo_token,
bar_token,
transfer_value_foo,
transfer_value_bar,
666,
13,
session=init_database,
)
init_database.add(txc)
init_database.commit()
s = celery.signature(
'cic_eth.ext.tx.tx_collate',
[
{tx_hash_hex: tx_signed_raw_hex},
default_chain_spec.asdict(),
0,
100,
],
queue=None,
)
t = s.apply_async()
r = t.get_leaf()
assert t.successful()
assert len(r) == 1
tx = r[0]
assert tx['source_token_symbol'] == 'FOO'
assert tx['source_token_decimals'] == 6
assert tx['destination_token_symbol'] == 'BAR'
assert tx['destination_token_decimals'] == 9

View File

@@ -1,7 +1,6 @@
# extended imports
# third-party imports
import pytest
import uuid
import unittest
# local imports
from cic_eth.db.models.nonce import (
@@ -56,7 +55,7 @@ def test_nonce_reserve(
o = q.first()
assert o.nonce == 43
nonce = NonceReservation.release(eth_empty_accounts[0], str(uu), session=init_database)
nonce = NonceReservation.release(eth_empty_accounts[0], str(uu))
init_database.commit()
assert nonce == (str(uu), 42)

View File

@@ -1,5 +1,4 @@
# external imports
import celery
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.tx import (
receipt,
@@ -21,7 +20,6 @@ def test_translate(
cic_registry,
init_celery_tasks,
register_lookups,
celery_session_worker,
):
nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], eth_rpc)
@@ -48,20 +46,6 @@ def test_translate(
'recipient': agent_roles['BOB'],
'recipient_label': None,
}
#tx = translate_tx_addresses(tx, [contract_roles['CONTRACT_DEPLOYER']], default_chain_spec.asdict())
s = celery.signature(
'cic_eth.ext.address.translate_tx_addresses',
[
tx,
[contract_roles['CONTRACT_DEPLOYER']],
default_chain_spec.asdict(),
],
queue=None,
)
t = s.apply_async()
r = t.get_leaf()
assert t.successful()
assert r['sender_label'] == 'alice'
assert r['recipient_label'] == 'bob'
tx = translate_tx_addresses(tx, [contract_roles['CONTRACT_DEPLOYER']], default_chain_spec.asdict())
assert tx['sender_label'] == 'alice'
assert tx['recipient_label'] == 'bob'

View File

@@ -127,4 +127,3 @@ class SessionBase(Model):
logg.debug('commit and destroy session {}'.format(session_key))
session.commit()
session.close()
del SessionBase.localsessions[session_key]

View File

@@ -148,7 +148,6 @@ else
fi
mkdir -p $CIC_DATA_DIR
>&2 echo using data dir $CIC_DATA_DIR for environment variable dump
# this is consumed in downstream services to set environment variables
cat << EOF > $CIC_DATA_DIR/.env

View File

@@ -206,7 +206,7 @@ def gen():
# fake.local_latitude()
p.location['latitude'] = (random.random() * 180) - 90
# fake.local_latitude()
p.location['longitude'] = (random.random() * 360) - 180
p.location['longitude'] = (random.random() * 360) - 179
return (old_blockchain_checksum_address, phone, p)