Merge branch 'lash/cic-eth-cic-cache-txs' into 'master'
cic-eth: Integrate transaction list queries See merge request grassrootseconomics/cic-internal-integration!30
This commit is contained in:
commit
c6bcda8832
@ -267,7 +267,9 @@ class AdminApi:
|
|||||||
"""
|
"""
|
||||||
s = celery.signature(
|
s = celery.signature(
|
||||||
'cic_eth.queue.tx.get_account_tx',
|
'cic_eth.queue.tx.get_account_tx',
|
||||||
[address],
|
[
|
||||||
|
address,
|
||||||
|
],
|
||||||
queue=self.queue,
|
queue=self.queue,
|
||||||
)
|
)
|
||||||
txs = s.apply_async().get()
|
txs = s.apply_async().get()
|
||||||
|
@ -417,6 +417,73 @@ class Api:
|
|||||||
return t
|
return t
|
||||||
|
|
||||||
|
|
||||||
|
def list(self, address, limit=10, external_task=None, external_queue=None):
|
||||||
|
"""Retrieve an aggregate list of latest transactions of internal and (optionally) external origin in reverse chronological order.
|
||||||
|
|
||||||
|
The array of transactions returned have the same dict layout as those passed by the callback filter in cic_eth/runnable/manager
|
||||||
|
|
||||||
|
If the external task is defined, this task will be used to query external transactions. If this is not defined, no external transactions will be included. The task must accept (offset, limit, address) as input parameters, and return a bloom filter that will be used to retrieve transaction data for the matching transactions. See cic_eth.ext.tx.list_tx_by_bloom for details on the bloom filter dat format.
|
||||||
|
|
||||||
|
:param address: Ethereum address to list transactions for
|
||||||
|
:type address: str, 0x-hex
|
||||||
|
:param limit: Amount of results to return
|
||||||
|
:type limit: number
|
||||||
|
:param external_task: Celery task providing external transactions
|
||||||
|
:type external_task: str
|
||||||
|
:param external_queue: Celery task queue providing exernal transactions task
|
||||||
|
:type external_queue: str
|
||||||
|
:returns: List of transactions
|
||||||
|
:rtype: list of dict
|
||||||
|
"""
|
||||||
|
offset = 0
|
||||||
|
s_local = celery.signature(
|
||||||
|
'cic_eth.queue.tx.get_account_tx',
|
||||||
|
[
|
||||||
|
address,
|
||||||
|
],
|
||||||
|
queue=self.queue,
|
||||||
|
)
|
||||||
|
|
||||||
|
s_brief = celery.signature(
|
||||||
|
'cic_eth.ext.tx.tx_collate',
|
||||||
|
[
|
||||||
|
self.chain_str,
|
||||||
|
offset,
|
||||||
|
limit
|
||||||
|
],
|
||||||
|
queue=self.queue,
|
||||||
|
)
|
||||||
|
if self.callback_param != None:
|
||||||
|
s_assemble.link(self.callback_success).on_error(self.callback_error)
|
||||||
|
|
||||||
|
t = None
|
||||||
|
if external_task != None:
|
||||||
|
s_external_get = celery.signature(
|
||||||
|
external_task,
|
||||||
|
[
|
||||||
|
address,
|
||||||
|
offset,
|
||||||
|
limit,
|
||||||
|
],
|
||||||
|
queue=external_queue,
|
||||||
|
)
|
||||||
|
|
||||||
|
s_external_process = celery.signature(
|
||||||
|
'cic_eth.ext.tx.list_tx_by_bloom',
|
||||||
|
[
|
||||||
|
address,
|
||||||
|
self.chain_str,
|
||||||
|
],
|
||||||
|
queue=self.queue,
|
||||||
|
)
|
||||||
|
c = celery.chain(s_external_get, s_external_process)
|
||||||
|
t = celery.chord([s_local, c])(s_brief)
|
||||||
|
else:
|
||||||
|
t = s_local.apply_sync()
|
||||||
|
|
||||||
|
return t
|
||||||
|
|
||||||
|
|
||||||
def ping(self, r):
|
def ping(self, r):
|
||||||
"""A noop callback ping for testing purposes.
|
"""A noop callback ping for testing purposes.
|
||||||
|
|
||||||
|
@ -422,7 +422,7 @@ class Otx(SessionBase):
|
|||||||
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get(status=0, limit=4096, status_exact=True):
|
def get(status=0, limit=4096, status_exact=True, session=None):
|
||||||
"""Returns outgoing transaction lists by status.
|
"""Returns outgoing transaction lists by status.
|
||||||
|
|
||||||
Status may either be matched exactly, or be an upper bound of the integer value of the status enum.
|
Status may either be matched exactly, or be an upper bound of the integer value of the status enum.
|
||||||
@ -437,26 +437,32 @@ class Otx(SessionBase):
|
|||||||
:rtype: tuple, where first element is transaction hash
|
:rtype: tuple, where first element is transaction hash
|
||||||
"""
|
"""
|
||||||
e = None
|
e = None
|
||||||
session = Otx.create_session()
|
|
||||||
|
session = SessionBase.bind_session(session)
|
||||||
|
|
||||||
if status_exact:
|
if status_exact:
|
||||||
e = session.query(Otx.tx_hash).filter(Otx.status==status).order_by(Otx.date_created.asc()).limit(limit).all()
|
e = session.query(Otx.tx_hash).filter(Otx.status==status).order_by(Otx.date_created.asc()).limit(limit).all()
|
||||||
else:
|
else:
|
||||||
e = session.query(Otx.tx_hash).filter(Otx.status<=status).order_by(Otx.date_created.asc()).limit(limit).all()
|
e = session.query(Otx.tx_hash).filter(Otx.status<=status).order_by(Otx.date_created.asc()).limit(limit).all()
|
||||||
session.close()
|
|
||||||
|
SessionBase.release_session(session)
|
||||||
return e
|
return e
|
||||||
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def load(tx_hash):
|
def load(tx_hash, session=None):
|
||||||
"""Retrieves the outgoing transaction record by transaction hash.
|
"""Retrieves the outgoing transaction record by transaction hash.
|
||||||
|
|
||||||
:param tx_hash: Transaction hash
|
:param tx_hash: Transaction hash
|
||||||
:type tx_hash: str, 0x-hex
|
:type tx_hash: str, 0x-hex
|
||||||
"""
|
"""
|
||||||
session = Otx.create_session()
|
session = SessionBase.bind_session(session)
|
||||||
|
|
||||||
q = session.query(Otx)
|
q = session.query(Otx)
|
||||||
q = q.filter(Otx.tx_hash==tx_hash)
|
q = q.filter(Otx.tx_hash==tx_hash)
|
||||||
session.close()
|
|
||||||
|
SessionBase.release_session(session)
|
||||||
|
|
||||||
return q.first()
|
return q.first()
|
||||||
|
|
||||||
|
|
||||||
|
176
apps/cic-eth/cic_eth/ext/tx.py
Normal file
176
apps/cic-eth/cic_eth/ext/tx.py
Normal file
@ -0,0 +1,176 @@
|
|||||||
|
# standard imports
|
||||||
|
import logging
|
||||||
|
import math
|
||||||
|
|
||||||
|
# third-pary imports
|
||||||
|
import web3
|
||||||
|
import celery
|
||||||
|
import moolb
|
||||||
|
from cic_registry.chain import ChainSpec
|
||||||
|
from cic_registry.registry import CICRegistry
|
||||||
|
from hexathon import strip_0x
|
||||||
|
|
||||||
|
# local imports
|
||||||
|
from cic_eth.eth.rpc import RpcClient
|
||||||
|
from cic_eth.db.models.otx import Otx
|
||||||
|
from cic_eth.eth.util import unpack_signed_raw_tx
|
||||||
|
from cic_eth.db.enum import StatusEnum
|
||||||
|
from cic_eth.eth.token import unpack_transfer
|
||||||
|
from cic_eth.queue.tx import get_tx_cache
|
||||||
|
from cic_eth.queue.time import tx_times
|
||||||
|
|
||||||
|
celery_app = celery.current_app
|
||||||
|
logg = logging.getLogger()
|
||||||
|
|
||||||
|
MAX_BLOCK_TX = 250
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: Make this method easier to read
|
||||||
|
@celery_app.task()
|
||||||
|
def list_tx_by_bloom(bloomspec, address, chain_str):
|
||||||
|
"""Retrieve external transaction data matching the provided filter
|
||||||
|
|
||||||
|
The bloom filter representation with the following structure (the size of the filter will be inferred from the size of the provided filter data):
|
||||||
|
{
|
||||||
|
'alg': <str; hashing algorithm, currently only "sha256" is understood>,
|
||||||
|
'high': <number; highest block number in matched set>,
|
||||||
|
'low': <number; lowest block number in matched set>,
|
||||||
|
'filter_rounds': <number; hashing rounds used to generate filter entry>,
|
||||||
|
'block_filter': <hex; bloom filter data with block matches>,
|
||||||
|
'blocktx_filter': <hex; bloom filter data with block+tx matches>,
|
||||||
|
}
|
||||||
|
|
||||||
|
:param bloomspec: Bloom filter data
|
||||||
|
:type bloomspec: dict (see description above)
|
||||||
|
:param address: Recipient address to use in matching
|
||||||
|
:type address: str, 0x-hex
|
||||||
|
:param chain_str: Chain spec string representation
|
||||||
|
:type chain_str: str
|
||||||
|
:returns: dict of transaction data as dict, keyed by transaction hash
|
||||||
|
:rtype: dict of dict
|
||||||
|
"""
|
||||||
|
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||||
|
c = RpcClient(chain_spec)
|
||||||
|
block_filter_data = bytes.fromhex(bloomspec['block_filter'])
|
||||||
|
tx_filter_data = bytes.fromhex(bloomspec['blocktx_filter'])
|
||||||
|
databitlen = len(block_filter_data)*8
|
||||||
|
block_filter = moolb.Bloom(databitlen, bloomspec['filter_rounds'], default_data=block_filter_data)
|
||||||
|
tx_filter = moolb.Bloom(databitlen, bloomspec['filter_rounds'], default_data=tx_filter_data)
|
||||||
|
|
||||||
|
txs = {}
|
||||||
|
for block_height in range(bloomspec['low'], bloomspec['high']):
|
||||||
|
block_height_bytes = block_height.to_bytes(4, 'big')
|
||||||
|
if block_filter.check(block_height_bytes):
|
||||||
|
logg.debug('filter matched block {}'.format(block_height))
|
||||||
|
block = c.w3.eth.getBlock(block_height, True)
|
||||||
|
|
||||||
|
for tx_index in range(0, len(block.transactions)):
|
||||||
|
composite = tx_index + block_height
|
||||||
|
tx_index_bytes = composite.to_bytes(4, 'big')
|
||||||
|
if tx_filter.check(tx_index_bytes):
|
||||||
|
logg.debug('filter matched block {} tx {}'.format(block_height, tx_index))
|
||||||
|
|
||||||
|
try:
|
||||||
|
tx = c.w3.eth.getTransactionByBlock(block_height, tx_index)
|
||||||
|
except web3.exceptions.TransactionNotFound:
|
||||||
|
logg.debug('false positive on block {} tx {}'.format(block_height, tx_index))
|
||||||
|
continue
|
||||||
|
tx_address = None
|
||||||
|
tx_token_value = 0
|
||||||
|
try:
|
||||||
|
transfer_data = unpack_transfer(tx['data'])
|
||||||
|
tx_address = transfer_data['to']
|
||||||
|
tx_token_value = transfer_data['amount']
|
||||||
|
except ValueError:
|
||||||
|
logg.debug('not a transfer transaction, skipping {}'.format(tx))
|
||||||
|
continue
|
||||||
|
if address == tx_address:
|
||||||
|
status = StatusEnum.SENT
|
||||||
|
try:
|
||||||
|
rcpt = c.w3.eth.getTransactionReceipt(tx.hash)
|
||||||
|
if rcpt['status'] == 0:
|
||||||
|
pending = StatusEnum.REVERTED
|
||||||
|
else:
|
||||||
|
pending = StatusEnum.SUCCESS
|
||||||
|
except web3.exceptions.TransactionNotFound:
|
||||||
|
pass
|
||||||
|
|
||||||
|
tx_hash_hex = tx['hash'].hex()
|
||||||
|
|
||||||
|
token = CICRegistry.get_address(chain_spec, tx['to'])
|
||||||
|
token_symbol = token.symbol()
|
||||||
|
token_decimals = token.decimals()
|
||||||
|
times = tx_times(tx_hash_hex, chain_str)
|
||||||
|
tx_r = {
|
||||||
|
'hash': tx_hash_hex,
|
||||||
|
'sender': tx['from'],
|
||||||
|
'recipient': tx_address,
|
||||||
|
'source_value': tx_token_value,
|
||||||
|
'destination_value': tx_token_value,
|
||||||
|
'source_token': tx['to'],
|
||||||
|
'destination_token': tx['to'],
|
||||||
|
'source_token_symbol': token_symbol,
|
||||||
|
'destination_token_symbol': token_symbol,
|
||||||
|
'source_token_decimals': token_decimals,
|
||||||
|
'destination_token_decimals': token_decimals,
|
||||||
|
'source_token_chain': chain_str,
|
||||||
|
'destination_token_chain': chain_str,
|
||||||
|
'nonce': tx['nonce'],
|
||||||
|
}
|
||||||
|
if times['queue'] != None:
|
||||||
|
tx_r['date_created'] = times['queue']
|
||||||
|
else:
|
||||||
|
tx_r['date_created'] = times['network']
|
||||||
|
txs[tx_hash_hex] = tx_r
|
||||||
|
break
|
||||||
|
return txs
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: Surely it must be possible to optimize this
|
||||||
|
# TODO: DRY this with callback filter in cic_eth/runnable/manager
|
||||||
|
# TODO: Remove redundant fields from end representation (timestamp, tx_hash)
|
||||||
|
@celery_app.task()
|
||||||
|
def tx_collate(tx_batches, chain_str, offset, limit, newest_first=True):
|
||||||
|
"""Merges transaction data from multiple sources and sorts them in chronological order.
|
||||||
|
|
||||||
|
:param tx_batches: Transaction data inputs
|
||||||
|
:type tx_batches: lists of lists of transaction data
|
||||||
|
:param chain_str: Chain spec string representation
|
||||||
|
:type chain_str: str
|
||||||
|
:param offset: Number of sorted results to skip (not yet implemented)
|
||||||
|
:type offset: number
|
||||||
|
:param limit: Maximum number of results to return (not yet implemented)
|
||||||
|
:type limit: number
|
||||||
|
:param newest_first: If True, returns results in reverse chronological order
|
||||||
|
:type newest_first: bool
|
||||||
|
:returns: Transactions
|
||||||
|
:rtype: list
|
||||||
|
"""
|
||||||
|
txs_by_block = {}
|
||||||
|
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||||
|
|
||||||
|
for b in tx_batches:
|
||||||
|
for v in b.values():
|
||||||
|
tx = None
|
||||||
|
k = None
|
||||||
|
try:
|
||||||
|
hx = strip_0x(v)
|
||||||
|
tx = unpack_signed_raw_tx(bytes.fromhex(hx), chain_spec.chain_id())
|
||||||
|
txc = get_tx_cache(tx['hash'])
|
||||||
|
txc['timestamp'] = int(txc['date_created'].timestamp())
|
||||||
|
txc['hash'] = txc['tx_hash']
|
||||||
|
tx = txc
|
||||||
|
except TypeError:
|
||||||
|
tx = v
|
||||||
|
tx['timestamp'] = tx['date_created']
|
||||||
|
k = '{}.{}.{}'.format(tx['timestamp'], tx['sender'], tx['nonce'])
|
||||||
|
txs_by_block[k] = tx
|
||||||
|
|
||||||
|
txs = []
|
||||||
|
ks = list(txs_by_block.keys())
|
||||||
|
ks.sort()
|
||||||
|
if newest_first:
|
||||||
|
ks.reverse()
|
||||||
|
for k in ks:
|
||||||
|
txs.append(txs_by_block[k])
|
||||||
|
return txs
|
40
apps/cic-eth/cic_eth/queue/time.py
Normal file
40
apps/cic-eth/cic_eth/queue/time.py
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
# standard imports
|
||||||
|
import logging
|
||||||
|
|
||||||
|
# third-party imports
|
||||||
|
import web3
|
||||||
|
import celery
|
||||||
|
from cic_registry.chain import ChainSpec
|
||||||
|
|
||||||
|
# local imports
|
||||||
|
from cic_eth.eth.rpc import RpcClient
|
||||||
|
from cic_eth.db.models.otx import Otx
|
||||||
|
from cic_eth.error import NotLocalTxError
|
||||||
|
|
||||||
|
celery_app = celery.current_app
|
||||||
|
|
||||||
|
logg = logging.getLogger()
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: This method does not belong in the _queue_ module, it operates across queue and network
|
||||||
|
@celery_app.task()
|
||||||
|
def tx_times(tx_hash, chain_str):
|
||||||
|
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||||
|
c = RpcClient(chain_spec)
|
||||||
|
time_pair = {
|
||||||
|
'network': None,
|
||||||
|
'queue': None,
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
rcpt = c.w3.eth.getTransactionReceipt(tx_hash)
|
||||||
|
block = c.w3.eth.getBlock(rcpt['blockHash'])
|
||||||
|
logg.debug('rcpt {}'.format(block))
|
||||||
|
time_pair['network'] = block['timestamp']
|
||||||
|
except web3.exceptions.TransactionNotFound:
|
||||||
|
pass
|
||||||
|
|
||||||
|
otx = Otx.load(tx_hash)
|
||||||
|
if otx != None:
|
||||||
|
time_pair['queue'] = int(otx['date_created'].timestamp())
|
||||||
|
|
||||||
|
return time_pair
|
@ -386,6 +386,8 @@ def get_tx_cache(tx_hash):
|
|||||||
'status_code': otx.status,
|
'status_code': otx.status,
|
||||||
'source_token': txc.source_token_address,
|
'source_token': txc.source_token_address,
|
||||||
'destination_token': txc.destination_token_address,
|
'destination_token': txc.destination_token_address,
|
||||||
|
'block_number': txc.block_number,
|
||||||
|
'tx_index': txc.tx_index,
|
||||||
'sender': txc.sender,
|
'sender': txc.sender,
|
||||||
'recipient': txc.recipient,
|
'recipient': txc.recipient,
|
||||||
'from_value': int(txc.from_value),
|
'from_value': int(txc.from_value),
|
||||||
@ -661,6 +663,7 @@ def get_account_tx(address, as_sender=True, as_recipient=True, counterpart=None)
|
|||||||
"""
|
"""
|
||||||
if not as_sender and not as_recipient:
|
if not as_sender and not as_recipient:
|
||||||
raise ValueError('at least one of as_sender and as_recipient must be True')
|
raise ValueError('at least one of as_sender and as_recipient must be True')
|
||||||
|
|
||||||
txs = {}
|
txs = {}
|
||||||
|
|
||||||
session = SessionBase.create_session()
|
session = SessionBase.create_session()
|
||||||
@ -676,6 +679,9 @@ def get_account_tx(address, as_sender=True, as_recipient=True, counterpart=None)
|
|||||||
|
|
||||||
results = q.all()
|
results = q.all()
|
||||||
for r in results:
|
for r in results:
|
||||||
|
if txs.get(r.tx_hash) != None:
|
||||||
|
logg.debug('tx {} already recorded'.format(r.tx_hash))
|
||||||
|
continue
|
||||||
txs[r.tx_hash] = r.signed_tx
|
txs[r.tx_hash] = r.signed_tx
|
||||||
session.close()
|
session.close()
|
||||||
|
|
||||||
|
@ -16,5 +16,7 @@ uWSGI==2.0.19.1
|
|||||||
semver==2.13.0
|
semver==2.13.0
|
||||||
eth-gas-proxy==0.0.1a4
|
eth-gas-proxy==0.0.1a4
|
||||||
websocket-client==0.57.0
|
websocket-client==0.57.0
|
||||||
|
moolb~=0.1.1b2
|
||||||
eth-address-index~=0.1.0a8
|
eth-address-index~=0.1.0a8
|
||||||
chainlib~=0.0.1a11
|
chainlib~=0.0.1a12
|
||||||
|
hexathon~=0.0.1a3
|
||||||
|
@ -15,12 +15,14 @@ def celery_includes():
|
|||||||
'cic_eth.eth.token',
|
'cic_eth.eth.token',
|
||||||
'cic_eth.eth.request',
|
'cic_eth.eth.request',
|
||||||
'cic_eth.eth.tx',
|
'cic_eth.eth.tx',
|
||||||
|
'cic_eth.ext.tx',
|
||||||
'cic_eth.queue.tx',
|
'cic_eth.queue.tx',
|
||||||
'cic_eth.admin.ctrl',
|
'cic_eth.admin.ctrl',
|
||||||
'cic_eth.admin.nonce',
|
'cic_eth.admin.nonce',
|
||||||
'cic_eth.eth.account',
|
'cic_eth.eth.account',
|
||||||
'cic_eth.callbacks.noop',
|
'cic_eth.callbacks.noop',
|
||||||
'cic_eth.callbacks.http',
|
'cic_eth.callbacks.http',
|
||||||
|
'tests.mock.filter',
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
92
apps/cic-eth/tests/functional/test_list.py
Normal file
92
apps/cic-eth/tests/functional/test_list.py
Normal file
@ -0,0 +1,92 @@
|
|||||||
|
# standard imports
|
||||||
|
import logging
|
||||||
|
|
||||||
|
# local imports
|
||||||
|
from cic_eth.api.api_task import Api
|
||||||
|
from cic_eth.eth.token import TokenTxFactory
|
||||||
|
from cic_eth.eth.task import sign_tx
|
||||||
|
from tests.mock.filter import (
|
||||||
|
block_filter,
|
||||||
|
tx_filter,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
logg = logging.getLogger()
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_tx(
|
||||||
|
default_chain_spec,
|
||||||
|
default_chain_registry,
|
||||||
|
init_database,
|
||||||
|
init_rpc,
|
||||||
|
init_w3,
|
||||||
|
init_eth_tester,
|
||||||
|
dummy_token_gifted,
|
||||||
|
cic_registry,
|
||||||
|
celery_session_worker,
|
||||||
|
):
|
||||||
|
|
||||||
|
tx_hashes = []
|
||||||
|
# external tx
|
||||||
|
init_eth_tester.mine_blocks(13)
|
||||||
|
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
|
||||||
|
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 3000, default_chain_spec)
|
||||||
|
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
|
||||||
|
tx_hashes.append(tx_hash_hex)
|
||||||
|
init_w3.eth.sendRawTransaction(tx_signed_raw_hex)
|
||||||
|
# add to filter
|
||||||
|
rcpt = init_w3.eth.getTransactionReceipt(tx_hash_hex)
|
||||||
|
a = rcpt['blockNumber']
|
||||||
|
block_filter.add(a.to_bytes(4, 'big'))
|
||||||
|
a = rcpt['blockNumber'] + rcpt['transactionIndex']
|
||||||
|
tx_filter.add(a.to_bytes(4, 'big'))
|
||||||
|
|
||||||
|
# external tx
|
||||||
|
init_eth_tester.mine_blocks(28)
|
||||||
|
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
|
||||||
|
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 4000, default_chain_spec)
|
||||||
|
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
|
||||||
|
tx_hashes.append(tx_hash_hex)
|
||||||
|
init_w3.eth.sendRawTransaction(tx_signed_raw_hex)
|
||||||
|
# add to filter
|
||||||
|
rcpt = init_w3.eth.getTransactionReceipt(tx_hash_hex)
|
||||||
|
a = rcpt['blockNumber']
|
||||||
|
block_filter.add(a.to_bytes(4, 'big'))
|
||||||
|
a = rcpt['blockNumber'] + rcpt['transactionIndex']
|
||||||
|
tx_filter.add(a.to_bytes(4, 'big'))
|
||||||
|
|
||||||
|
# custodial tx
|
||||||
|
init_eth_tester.mine_blocks(3)
|
||||||
|
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
|
||||||
|
api = Api(str(default_chain_spec), queue=None)
|
||||||
|
t = api.transfer(init_w3.eth.accounts[0], init_w3.eth.accounts[1], 1000, 'DUM')
|
||||||
|
t.get()
|
||||||
|
tx_hash_hex = None
|
||||||
|
for c in t.collect():
|
||||||
|
tx_hash_hex = c[1]
|
||||||
|
assert t.successful()
|
||||||
|
tx_hashes.append(tx_hash_hex)
|
||||||
|
|
||||||
|
# custodial tx
|
||||||
|
init_eth_tester.mine_blocks(6)
|
||||||
|
api = Api(str(default_chain_spec), queue=None)
|
||||||
|
t = api.transfer(init_w3.eth.accounts[0], init_w3.eth.accounts[1], 2000, 'DUM')
|
||||||
|
t.get()
|
||||||
|
tx_hash_hex = None
|
||||||
|
for c in t.collect():
|
||||||
|
tx_hash_hex = c[1]
|
||||||
|
assert t.successful()
|
||||||
|
tx_hashes.append(tx_hash_hex)
|
||||||
|
|
||||||
|
# test the api
|
||||||
|
t = api.list(init_w3.eth.accounts[1], external_task='tests.mock.filter.filter')
|
||||||
|
r = t.get()
|
||||||
|
for c in t.collect():
|
||||||
|
r = c[1]
|
||||||
|
assert t.successful()
|
||||||
|
|
||||||
|
assert len(r) == 4
|
||||||
|
for tx in r:
|
||||||
|
logg.debug('have tx {}'.format(r))
|
||||||
|
tx_hashes.remove(tx['hash'])
|
||||||
|
assert len(tx_hashes) == 0
|
1
apps/cic-eth/tests/mock/__init__.py
Normal file
1
apps/cic-eth/tests/mock/__init__.py
Normal file
@ -0,0 +1 @@
|
|||||||
|
from .filter import *
|
22
apps/cic-eth/tests/mock/filter.py
Normal file
22
apps/cic-eth/tests/mock/filter.py
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
# third-party imports
|
||||||
|
import celery
|
||||||
|
import moolb
|
||||||
|
|
||||||
|
celery_app = celery.current_app
|
||||||
|
|
||||||
|
block_filter = moolb.Bloom(1024, 3)
|
||||||
|
tx_filter = moolb.Bloom(1024, 3)
|
||||||
|
lo = 0
|
||||||
|
hi = 100
|
||||||
|
|
||||||
|
|
||||||
|
@celery_app.task()
|
||||||
|
def filter(address, offset, limit):
|
||||||
|
return {
|
||||||
|
'alg': 'sha256',
|
||||||
|
'high': hi,
|
||||||
|
'low': lo,
|
||||||
|
'block_filter': block_filter.to_bytes().hex(),
|
||||||
|
'blocktx_filter': tx_filter.to_bytes().hex(),
|
||||||
|
'filter_rounds': 3,
|
||||||
|
}
|
109
apps/cic-eth/tests/unit/ext/test_ext_tx.py
Normal file
109
apps/cic-eth/tests/unit/ext/test_ext_tx.py
Normal file
@ -0,0 +1,109 @@
|
|||||||
|
# standard imports
|
||||||
|
import logging
|
||||||
|
|
||||||
|
# third-party imports
|
||||||
|
import celery
|
||||||
|
import moolb
|
||||||
|
|
||||||
|
# local imports
|
||||||
|
from cic_eth.eth.token import TokenTxFactory
|
||||||
|
from cic_eth.eth.task import sign_tx
|
||||||
|
|
||||||
|
logg = logging.getLogger()
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: This test fails when not run alone. Identify which fixture leaves a dirty state
|
||||||
|
def test_filter_process(
|
||||||
|
init_rpc,
|
||||||
|
default_chain_spec,
|
||||||
|
default_chain_registry,
|
||||||
|
celery_session_worker,
|
||||||
|
init_eth_tester,
|
||||||
|
init_w3,
|
||||||
|
dummy_token_gifted,
|
||||||
|
cic_registry,
|
||||||
|
):
|
||||||
|
|
||||||
|
b = moolb.Bloom(1024, 3)
|
||||||
|
t = moolb.Bloom(1024, 3)
|
||||||
|
|
||||||
|
tx_hashes = []
|
||||||
|
# external tx
|
||||||
|
init_eth_tester.mine_blocks(13)
|
||||||
|
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
|
||||||
|
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 3000, default_chain_spec)
|
||||||
|
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
|
||||||
|
tx_hashes.append(tx_hash_hex)
|
||||||
|
init_w3.eth.sendRawTransaction(tx_signed_raw_hex)
|
||||||
|
# add to filter
|
||||||
|
rcpt = init_w3.eth.getTransactionReceipt(tx_hash_hex)
|
||||||
|
a = rcpt['blockNumber']
|
||||||
|
b.add(a.to_bytes(4, 'big'))
|
||||||
|
a = rcpt['blockNumber'] + rcpt['transactionIndex']
|
||||||
|
t.add(a.to_bytes(4, 'big'))
|
||||||
|
|
||||||
|
# external tx
|
||||||
|
init_eth_tester.mine_blocks(28)
|
||||||
|
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
|
||||||
|
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 4000, default_chain_spec)
|
||||||
|
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
|
||||||
|
tx_hashes.append(tx_hash_hex)
|
||||||
|
init_w3.eth.sendRawTransaction(tx_signed_raw_hex)
|
||||||
|
# add to filter
|
||||||
|
rcpt = init_w3.eth.getTransactionReceipt(tx_hash_hex)
|
||||||
|
a = rcpt['blockNumber']
|
||||||
|
b.add(a.to_bytes(4, 'big'))
|
||||||
|
a = rcpt['blockNumber'] + rcpt['transactionIndex']
|
||||||
|
t.add(a.to_bytes(4, 'big'))
|
||||||
|
|
||||||
|
# init_eth_tester.mine_blocks(13)
|
||||||
|
# tx_hash_one = init_w3.eth.sendTransaction({
|
||||||
|
# 'from': init_w3.eth.accounts[2],
|
||||||
|
# 'to': init_w3.eth.accounts[1],
|
||||||
|
# 'value': 1024,
|
||||||
|
# })
|
||||||
|
# rcpt = init_w3.eth.getTransactionReceipt(tx_hash_one)
|
||||||
|
# a = rcpt['blockNumber']
|
||||||
|
# b.add(a.to_bytes(4, 'big'))
|
||||||
|
# a = rcpt['blockNumber'] + rcpt['transactionIndex']
|
||||||
|
# t.add(a.to_bytes(4, 'big'))
|
||||||
|
#
|
||||||
|
# init_eth_tester.mine_blocks(28)
|
||||||
|
# tx_hash_two = init_w3.eth.sendTransaction({
|
||||||
|
# 'from': init_w3.eth.accounts[3],
|
||||||
|
# 'to': init_w3.eth.accounts[1],
|
||||||
|
# 'value': 2048,
|
||||||
|
# })
|
||||||
|
# rcpt = init_w3.eth.getTransactionReceipt(tx_hash_two)
|
||||||
|
# a = rcpt['blockNumber']
|
||||||
|
# b.add(a.to_bytes(4, 'big'))
|
||||||
|
# a = rcpt['blockNumber'] + rcpt['transactionIndex']
|
||||||
|
# t.add(a.to_bytes(4, 'big'))
|
||||||
|
|
||||||
|
init_eth_tester.mine_blocks(10)
|
||||||
|
|
||||||
|
o = {
|
||||||
|
'alg': 'sha256',
|
||||||
|
'filter_rounds': 3,
|
||||||
|
'low': 0,
|
||||||
|
'high': 50,
|
||||||
|
'block_filter': b.to_bytes().hex(),
|
||||||
|
'blocktx_filter': t.to_bytes().hex(),
|
||||||
|
}
|
||||||
|
|
||||||
|
s = celery.signature(
|
||||||
|
'cic_eth.ext.tx.list_tx_by_bloom',
|
||||||
|
[
|
||||||
|
o,
|
||||||
|
init_w3.eth.accounts[1],
|
||||||
|
str(default_chain_spec),
|
||||||
|
],
|
||||||
|
queue=None
|
||||||
|
)
|
||||||
|
t = s.apply_async()
|
||||||
|
r = t.get()
|
||||||
|
|
||||||
|
assert len(r) == 2
|
||||||
|
for tx_hash in r.keys():
|
||||||
|
tx_hashes.remove(tx_hash)
|
||||||
|
assert len(tx_hashes) == 0
|
@ -41,3 +41,5 @@ yaml-acl==0.0.1
|
|||||||
rlp==2.0.1
|
rlp==2.0.1
|
||||||
cryptocurrency-cli-tools==0.0.4
|
cryptocurrency-cli-tools==0.0.4
|
||||||
giftable-erc20-token==0.0.7b7
|
giftable-erc20-token==0.0.7b7
|
||||||
|
hexathon==0.0.1a3
|
||||||
|
chainlib==0.0.1a12
|
||||||
|
@ -126,56 +126,71 @@ services:
|
|||||||
- contract-config:/tmp/cic/config
|
- contract-config:/tmp/cic/config
|
||||||
|
|
||||||
|
|
||||||
# cic-cache-tracker:
|
cic-cache-tracker:
|
||||||
# # image: registry.gitlab.com/grassrootseconomics/cic-cache:master-latest
|
build:
|
||||||
# build: apps/cic-cache
|
context: apps/cic-cache/
|
||||||
# environment:
|
dockerfile: docker/Dockerfile
|
||||||
# CIC_REGISTRY_ADDRESS: $CIC_REGISTRY_ADDRESS # supplied at contract-config after contract provisioning
|
environment:
|
||||||
# ETH_PROVIDER: ${ETH_PROVIDER:-http://eth:8545}
|
CIC_REGISTRY_ADDRESS: $CIC_REGISTRY_ADDRESS # supplied at contract-config after contract provisioning
|
||||||
# BANCOR_DIR: ${BANCOR_DIR:-/usr/local/share/cic/bancor}
|
ETH_PROVIDER: ${ETH_PROVIDER:-http://eth:8545}
|
||||||
# DATABASE_USER: ${DATABASE_USER:-grassroots}
|
DATABASE_USER: ${DATABASE_USER:-grassroots}
|
||||||
# DATABASE_PASSWORD: ${DATABASE_PASSWORD:-tralala} # this is is set at initdb see: postgres/initdb/create_db.sql
|
DATABASE_PASSWORD: ${DATABASE_PASSWORD:-tralala} # this is is set at initdb see: postgres/initdb/create_db.sql
|
||||||
# DATABASE_HOST: ${DATABASE_HOST:-postgres}
|
DATABASE_HOST: ${DATABASE_HOST:-postgres}
|
||||||
# DATABASE_PORT: ${DATABASE_PORT:-5432}
|
DATABASE_PORT: ${DATABASE_PORT:-5432}
|
||||||
# DATABASE_NAME: ${DATABASE_NAME_CIC_CACHE:-cic_cache}
|
DATABASE_NAME: ${DATABASE_NAME_CIC_CACHE:-cic_cache}
|
||||||
# DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres}
|
DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres}
|
||||||
# DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2}
|
DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2}
|
||||||
# ETH_ABI_DIR: ${ETH_ABI_DIR:-/usr/local/share/cic/solidity/abi}
|
DATABASE_DEBUG: 1
|
||||||
# deploy:
|
ETH_ABI_DIR: ${ETH_ABI_DIR:-/usr/local/share/cic/solidity/abi}
|
||||||
# restart_policy:
|
CIC_TRUST_ADDRESS: ${DEV_ETH_ACCOUNT_CONTRACT_DEPLOYER:-0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C}
|
||||||
# condition: on-failure
|
CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-Bloxberg:8996}
|
||||||
# depends_on:
|
CELERY_BROKER_URL: redis://redis:6379
|
||||||
# - postgres
|
CELERY_RESULT_URL: redis://redis:6379
|
||||||
# - eth
|
deploy:
|
||||||
# command:
|
restart_policy:
|
||||||
# - /bin/sh
|
condition: on-failure
|
||||||
# - -c
|
depends_on:
|
||||||
# - |
|
- redis
|
||||||
# if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
|
- postgres
|
||||||
# /usr/local/bin/cic-cache-tracker -vv
|
- eth
|
||||||
# volumes:
|
command:
|
||||||
# - contract-config:/tmp/cic/config/:ro
|
- /bin/bash
|
||||||
# entrypoint: ["/usr/local/bin/cic-cache-tracker", "-vv"]
|
- -c
|
||||||
# command: "/usr/local/bin/cic-cache-tracker -vv"
|
- |
|
||||||
|
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
|
||||||
|
/usr/local/bin/cic-cache-tracker -vv
|
||||||
|
volumes:
|
||||||
|
- contract-config:/tmp/cic/config/:ro
|
||||||
|
|
||||||
# cic-cache-server:
|
cic-cache-server:
|
||||||
# image: grassrootseconomics:cic-cache-uwsgi
|
build:
|
||||||
# environment:
|
context: apps/cic-cache/
|
||||||
# DATABASE_USER: $DATABASE_USER
|
dockerfile: docker/Dockerfile
|
||||||
# DATABASE_HOST: $DATABASE_HOST
|
environment:
|
||||||
# DATABASE_PORT: $DATABASE_PORT
|
DATABASE_USER: $DATABASE_USER
|
||||||
# DATABASE_PASSWORD: $DATABASE_PASSWORD
|
DATABASE_HOST: $DATABASE_HOST
|
||||||
# DATABASE_NAME: $DATABASE_NAME_CIC_CACHE
|
DATABASE_PORT: $DATABASE_PORT
|
||||||
# PGPASSWORD: $DATABASE_PASSWORD
|
DATABASE_PASSWORD: $DATABASE_PASSWORD
|
||||||
# SERVER_PORT: 80
|
DATABASE_NAME: $DATABASE_NAME_CIC_CACHE
|
||||||
# ports:
|
DATABASE_DEBUG: 1
|
||||||
# - ${HTTP_PORT_CIC_CACHE}:80
|
PGPASSWORD: $DATABASE_PASSWORD
|
||||||
# depends_on:
|
SERVER_PORT: 8000
|
||||||
# - postgres
|
ports:
|
||||||
# deploy:
|
- ${HTTP_PORT_CIC_CACHE:-63313}:8000
|
||||||
# restart_policy:
|
depends_on:
|
||||||
# condition: on-failure
|
- postgres
|
||||||
# command: "/root/start_uwsgi.sh"
|
deploy:
|
||||||
|
restart_policy:
|
||||||
|
condition: on-failure
|
||||||
|
command:
|
||||||
|
- /bin/bash
|
||||||
|
- -c
|
||||||
|
- |
|
||||||
|
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
|
||||||
|
"/usr/local/bin/uwsgi" \
|
||||||
|
--wsgi-file /usr/src/cic-cache/cic_cache/runnable/server.py \
|
||||||
|
--http :80 \
|
||||||
|
--pyargv -vv
|
||||||
|
|
||||||
cic-eth-tasker:
|
cic-eth-tasker:
|
||||||
# image: grassrootseconomics:cic-eth-service
|
# image: grassrootseconomics:cic-eth-service
|
||||||
@ -370,42 +385,42 @@ services:
|
|||||||
# command: "/root/start_retry.sh -q cic-eth -vv"
|
# command: "/root/start_retry.sh -q cic-eth -vv"
|
||||||
|
|
||||||
|
|
||||||
cic-eth-server:
|
# cic-eth-server:
|
||||||
build:
|
# build:
|
||||||
context: apps/
|
# context: apps/
|
||||||
dockerfile: cic-eth/docker/Dockerfile
|
# dockerfile: cic-eth/docker/Dockerfile
|
||||||
environment:
|
# environment:
|
||||||
CIC_CHAIN_SPEC: $CIC_CHAIN_SPEC
|
# CIC_CHAIN_SPEC: $CIC_CHAIN_SPEC
|
||||||
CELERY_BROKER_URL: $CELERY_BROKER_URL
|
# CELERY_BROKER_URL: $CELERY_BROKER_URL
|
||||||
CELERY_RESULT_URL: $CELERY_RESULT_URL
|
# CELERY_RESULT_URL: $CELERY_RESULT_URL
|
||||||
SERVER_PORT: 8000
|
# SERVER_PORT: 8000
|
||||||
depends_on:
|
# depends_on:
|
||||||
- eth
|
# - eth
|
||||||
- postgres
|
# - postgres
|
||||||
- redis
|
# - redis
|
||||||
ports:
|
# ports:
|
||||||
- ${HTTP_PORT_CIC_ETH:-63314}:8000
|
# - ${HTTP_PORT_CIC_ETH:-63314}:8000
|
||||||
deploy:
|
# deploy:
|
||||||
restart_policy:
|
# restart_policy:
|
||||||
condition: on-failure
|
# condition: on-failure
|
||||||
volumes:
|
# volumes:
|
||||||
- contract-config:/tmp/cic/config/:ro
|
# - contract-config:/tmp/cic/config/:ro
|
||||||
command:
|
# command:
|
||||||
- /bin/bash
|
# - /bin/bash
|
||||||
- -c
|
# - -c
|
||||||
- |
|
# - |
|
||||||
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
|
# if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
|
||||||
"/usr/local/bin/uwsgi" \
|
# "/usr/local/bin/uwsgi" \
|
||||||
--wsgi-file /usr/src/cic-eth/cic_eth/runnable/server_agent.py \
|
# --wsgi-file /usr/src/cic-eth/cic_eth/runnable/server_agent.py \
|
||||||
--http :80 \
|
# --http :80 \
|
||||||
--pyargv -vv
|
# --pyargv -vv
|
||||||
# entrypoint:
|
## entrypoint:
|
||||||
# - "/usr/local/bin/uwsgi"
|
## - "/usr/local/bin/uwsgi"
|
||||||
# - "--wsgi-file"
|
## - "--wsgi-file"
|
||||||
# - "/usr/src/cic-eth/cic_eth/runnable/server_agent.py"
|
## - "/usr/src/cic-eth/cic_eth/runnable/server_agent.py"
|
||||||
# - "--http"
|
## - "--http"
|
||||||
# - ":80"
|
## - ":80"
|
||||||
# command: "--pyargv -vv"
|
# # command: "--pyargv -vv"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user