Compare commits

...

3 Commits

Author SHA1 Message Date
lash
0c9b42d086
Prepare integration test 2022-03-12 13:48:40 +00:00
lash
69ad3711cd
Add embedded normalization to filter, cache tx 2022-03-12 12:49:38 +00:00
lash
ed75502f46
Add network token value to core cache tx object 2022-03-12 12:02:45 +00:00
9 changed files with 261 additions and 168 deletions

View File

@ -1,117 +0,0 @@
# standard imports
import datetime
# local imports
from chainqueue.enum import StatusBits
class Adapter:
"""Base class defining interface to be implemented by chainqueue adapters.
The chainqueue adapter collects the following actions:
- add: add a transaction to the queue
- upcoming: get queued transactions ready to be sent to network
- dispatch: send a queued transaction to the network
- translate: decode details of a transaction
- create_session, release_session: session management to control queue state integrity
:param backend: Chainqueue backend
:type backend: TODO - abstract backend class. Must implement get, create_session, release_session
:param pending_retry_threshold: seconds delay before retrying a transaction stalled in the newtork
:type pending_retry_threshold: int
:param error_retry_threshold: seconds delay before retrying a transaction that incurred a recoverable error state
:type error_retry_threshold: int
"""
def __init__(self, backend, pending_retry_threshold=0, error_retry_threshold=0):
self.backend = backend
self.pending_retry_threshold = datetime.timedelta(pending_retry_threshold)
self.error_retry_threshold = datetime.timedelta(error_retry_threshold)
def add(self, bytecode, chain_spec, session=None):
"""Add a transaction to the queue.
:param bytecode: Transaction wire format bytecode, in hex
:type bytecode: str
:param chain_spec: Chain spec to use for transaction decode
:type chain_spec: chainlib.chain.ChainSpec
:param session: Backend state integrity session
:type session: varies
"""
raise NotImplementedError()
def translate(self, bytecode, chain_spec):
"""Decode details of a transaction.
:param bytecode: Transaction wire format bytecode, in hex
:type bytecode: str
:param chain_spec: Chain spec to use for transaction decode
:type chain_spec: chainlib.chain.ChainSpec
"""
raise NotImplementedError()
def get(self, tx_hash, chain_spec, session=None):
"""Retrieve serialized transaction represented by the given transaction hash.
:param chain_spec: Chain spec to use for transaction decode
:type chain_spec: chainlib.chain.ChainSpec
:param tx_hash: Transaction hash, in hex
:type tx_hash: str
:param session: Backend state integrity session
:type session: varies
"""
raise NotImplementedError()
def dispatch(self, chain_spec, rpc, tx_hash, signed_tx, session=None):
"""Send a queued transaction to the network.
:param chain_spec: Chain spec to use to identify the transaction network
:type chain_spec: chainlib.chain.ChainSpec
:param rpc: RPC connection to use for transaction send
:type rpc: chainlib.connection.RPCConnection
:param tx_hash: Transaction hash (checksum of transaction), in hex
:type tx_hash: str
:param signed_tx: Transaction wire format bytecode, in hex
:type signed_tx: str
:param session: Backend state integrity session
:type session: varies
"""
raise NotImplementedError()
def upcoming(self, chain_spec, session=None):
"""Get queued transactions ready to be sent to the network.
The transactions will be a combination of newly submitted transactions, previously sent but stalled transactions, and transactions that could temporarily not be submitted.
:param chain_spec: Chain spec to use to identify the transaction network
:type chain_spec: chainlib.chain.ChainSpec
:param session: Backend state integrity session
:type session: varies
"""
raise NotImplementedError()
def create_session(self, session=None):
"""Create a session context to guarantee atomic state change in backend.
:param session: If specified, session will be used instead of creating a new one
:type session: varies
"""
return self.backend.create_session(session)
def release_session(self, session=None):
"""Release a session context created by create_session.
If session parameter is defined, final session destruction will be deferred to the initial provider of the session. In other words; if create_session was called with a session, release_session should symmetrically be called with the same session.
:param session: Session context.
:type session: varies
"""
return self.backend.release_session(session)

View File

@ -1,15 +1,35 @@
# standard imports
import enum
import logging
logg = logging.getLogger(__name__)
class NoopNormalizer:
def __init__(self):
self.address = self.noop
self.hash = self.noop
self.value = self.noop
def noop(self, v):
return v
noop_normalizer = NoopNormalizer()
class CacheTx:
def __init__(self):
self.v_sender = None
self.v_recipient = None
self.v_nonce = None
self.v_value = None
def __init__(self, normalizer=noop_normalizer):
self.normalizer = normalizer
self.sender = None
self.recipient = None
self.nonce = None
self.value = None
self.tx_hash = None
self.block_number = None
self.tx_index = None
self.timestamp = None
@ -21,11 +41,12 @@ class CacheTx:
self.timestamp = timestamp
def init(self, nonce, sender, recipient, value):
self.v_sender = sender
self.v_recipient = recipient
self.v_nonce = nonce
self.v_value = value
def init(self, tx_hash, nonce, sender, recipient, value):
self.tx_hash = self.normalizer.hash(tx_hash)
self.sender = self.normalizer.address(sender)
self.recipient = self.normalizer.address(recipient)
self.nonce = nonce
self.value = self.normalizer.value(value)
def deserialize(self, signed_tx):
@ -38,14 +59,14 @@ class CacheTx:
def __str__(self):
return '{} -> {} : {}'.format(self.v_sender, self.v_recipient, self.v_value)
return '{}: {} ({}) -> {} = {}'.format(self.tx_hash, self.sender, self.nonce, self.recipient, self.value)
class CacheTokenTx(CacheTx):
def __init__(self): #, nonce, sender, recipient, src_token, dst_token, src_value, dst_value):
super(CacheTokenTx, self).__init__()
def __init__(self, normalizer=noop_normalizer):
super(CacheTokenTx, self).__init__(normalizer=normalizer)
self.v_src_token = None
self.v_src_value = None
self.v_dst_token = None
@ -59,9 +80,10 @@ class CacheSort(enum.Enum):
class CacheFilter:
def __init__(self, senders=None, recipients=None, nonce=None, before=None, after=None, sort=CacheSort.DATE, reverse=False):
self.senders = senders
self.recipients = recipients
def __init__(self, normalizer=noop_normalizer, nonce=None, before=None, after=None, sort=CacheSort.DATE, reverse=False):
self.normalizer = normalizer
self.senders = None
self.recipients = None
self.nonce = nonce
self.before = before
self.after = after
@ -69,6 +91,28 @@ class CacheFilter:
self.reverse = reverse
def add_senders(self, senders):
if self.senders == None:
self.senders = []
if isinstance(senders, str):
senders = [senders]
for sender in senders:
if self.normalizer != None:
sender = self.normalizer.address(sender)
self.senders.append(sender)
def add_recipients(self, recipients):
if self.recipients == None:
self.recipients = []
if isinstance(recipients, str):
recipients = [recipients]
for recipient in recipients:
if self.normalizer != None:
recipient = self.normalizer.address(recipient)
self.recipients.append(recipient)
class Cache:
def put(self, chain_spec, cache_tx):

View File

@ -3,6 +3,10 @@ import logging
import re
import datetime
# local imports
from chainqueue.cache import CacheTx
logg = logging.getLogger(__name__)
@ -18,7 +22,8 @@ def from_key(k):
re_u = r'^[^_][_A-Z]+$'
class Store:
def __init__(self, state_store, index_store, counter, cache=None):
def __init__(self, chain_spec, state_store, index_store, counter, cache=None):
self.chain_spec = chain_spec
self.cache = cache
self.state_store = state_store
self.index_store = index_store
@ -32,14 +37,16 @@ class Store:
setattr(self, v, getattr(self.state_store, v))
def put(self, k, v):
def put(self, k, v, cache_adapter=CacheTx()):
n = self.counter.next()
t = datetime.datetime.now().timestamp()
s = to_key(t, n, k)
self.state_store.put(s, v)
self.index_store.put(k, s)
if self.cache != None:
self.cache.put_serialized(v)
tx = cache_adapter()
tx.deserialize(v)
self.cache.put(self.chain_spec, tx)
def get(self, k):
@ -48,7 +55,7 @@ class Store:
return (s, v,)
def list(self, state=0, limit=4096, strict=False):
def by_state(self, state=0, limit=4096, strict=False):
hashes = []
i = 0
@ -68,3 +75,7 @@ class Store:
pair = from_key(h)
hashes_out.append(pair[1])
return hashes_out
def upcoming(self, limit=4096):
return self.by_state(state=self.QUEUED, limit=limit)

View File

@ -4,6 +4,7 @@ import unittest
# external imports
from shep.store.file import SimpleFileStoreFactory
from chainlib.chain import ChainSpec
# local imports
from chainqueue import (
@ -11,6 +12,10 @@ from chainqueue import (
Status,
)
# test imports
from tests.common import MockCounter
class MockContentStore:
@ -26,18 +31,6 @@ class MockContentStore:
return self.store.get(k)
class MockCounter:
def __init__(self):
self.c = 0
def next(self):
c = self.c
self.c += 1
return c
class TestShepBase(unittest.TestCase):
def setUp(self):
@ -46,4 +39,5 @@ class TestShepBase(unittest.TestCase):
self.state = Status(factory)
content_store = MockContentStore()
counter = MockCounter()
self.store = Store(self.state, content_store, counter)
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
self.store = Store(chain_spec, self.state, content_store, counter)

40
tests/common.py Normal file
View File

@ -0,0 +1,40 @@
# local imports
from chainqueue.cache import Cache
class MockCounter:
def __init__(self):
self.c = 0
def next(self):
c = self.c
self.c += 1
return c
class MockTokenCache(Cache):
def __init__(self):
self.db = {}
self.last_filter = None
def put(self, chain_spec, cache_tx):
self.db[cache_tx.tx_hash] = cache_tx
def get(self, chain_spec, tx_hash):
return self.db[tx_hash]
def by_nonce(self, cache_filter):
self.last_filter = cache_filter
def by_date(self, cache_filter=None):
self.last_filter = cache_filter
def count(self, cache_filter):
self.last_filter = cache_filter

View File

@ -3,18 +3,22 @@ import os
import logging
import unittest
import hashlib
import math
# external imports
from hexathon import add_0x
from chainlib.chain import ChainSpec
# local imports
from chainqueue import QueueEntry
from chainqueue.cache import (
CacheTokenTx,
CacheFilter,
)
# test imports
from tests.base_shep import TestShepBase
from tests.common import MockTokenCache
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
@ -45,26 +49,85 @@ class MockCacheTokenTx(CacheTokenTx):
z = h.digest()
token = z.hex()
tx = CacheTokenTx()
tx.init(nonce, sender, recipient, value)
tx.set('src_token', token)
tx.set('dst_token', token)
tx.set('src_value', token_value)
tx.set('dst_value', token_value)
h = hashlib.sha256()
h.update(z)
z = h.digest()
tx_hash = z.hex()
return tx
#tx = CacheTokenTx(normalizer=self.normalizer)
self.init(tx_hash, nonce, sender, recipient, value)
self.set('src_token', token)
self.set('dst_token', token)
self.set('src_value', token_value)
self.set('dst_value', token_value)
self.confirm(42, 13, 1024000)
return self
class MockNormalizer:
def address(self, v):
return 'address' + v
def value(self, v):
dv = int(math.log10(v) + 1)
return float(v / (10 ** dv))
def hash(self, v):
return 'ashbashhash' + v
class TestCache(TestShepBase):
def setUp(self):
super(TestCache, self).setUp()
self.tx = MockCacheTokenTx()
self.chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
self.cache = MockTokenCache()
def test_basic_translator(self):
def test_cache_instance(self):
normalizer = MockNormalizer()
a = b'foo'
tx = self.tx.deserialize(a)
print(tx)
tx = MockCacheTokenTx(normalizer=normalizer)
tx.deserialize(a)
self.assertTrue(isinstance(tx.value, float))
self.assertEqual(tx.sender[:4], 'addr')
self.assertEqual(tx.recipient[:4], 'addr')
self.assertEqual(tx.tx_hash[:11], 'ashbashhash')
def test_cache_putget(self):
a = b'foo'
tx = MockCacheTokenTx()
tx.deserialize(a)
self.cache.put(self.chain_spec, tx)
tx_retrieved = self.cache.get(self.chain_spec, tx.tx_hash)
self.assertEqual(tx, tx_retrieved)
def test_cache_filter(self):
normalizer = MockNormalizer()
fltr = CacheFilter(normalizer=normalizer)
sender = os.urandom(20).hex()
fltr.add_senders(sender)
recipient_one = os.urandom(20).hex()
recipient_two = os.urandom(20).hex()
fltr.add_recipients([recipient_one, recipient_two])
self.assertEqual(fltr.senders[0][:4], 'addr')
self.assertEqual(fltr.recipients[1][:4], 'addr')
def test_cache_query(self):
a = os.urandom(20).hex()
fltr = CacheFilter(nonce=42)
self.cache.count(fltr)
self.assertEqual(self.cache.last_filter, fltr)
if __name__ == '__main__':

View File

@ -31,31 +31,31 @@ class TestEntry(TestShepBase):
entry = QueueEntry(self.store, tx_hash_two)
entry.create(signed_tx)
txs = self.store.list()
txs = self.store.by_state()
self.assertEqual(len(txs), 2)
entry = QueueEntry(self.store, tx_hash_one)
entry.load()
entry.sent()
txs = self.store.list()
txs = self.store.by_state()
self.assertEqual(len(txs), 1)
txs = self.store.list(state=self.store.IN_NETWORK)
txs = self.store.by_state(state=self.store.IN_NETWORK)
self.assertEqual(len(txs), 1)
entry.succeed(0)
txs = self.store.list()
txs = self.store.by_state()
self.assertEqual(len(txs), 1)
entry = QueueEntry(self.store, tx_hash_two)
entry.load()
entry.sent()
txs = self.store.list(state=self.store.IN_NETWORK)
txs = self.store.by_state(state=self.store.IN_NETWORK)
self.assertEqual(len(txs), 2)
txs = self.store.list(state=self.store.IN_NETWORK, strict=True)
txs = self.store.by_state(state=self.store.IN_NETWORK, strict=True)
self.assertEqual(len(txs), 1)

54
tests/test_integrate.py Normal file
View File

@ -0,0 +1,54 @@
# standard imports
import tempfile
import unittest
# external imports
from shep.store.file import SimpleFileStoreFactory
from chainlib.chain import ChainSpec
# local imports
from chainqueue import (
Store,
Status,
)
# test imports
from tests.common import (
MockCounter,
MockTokenCache
)
class MockContentStore:
def __init__(self):
self.store = {}
def put(self, k, v):
self.store[k] = v
def get(self, k):
return self.store.get(k)
class TestShepBase(unittest.TestCase):
def setUp(self):
self.path = tempfile.mkdtemp()
factory = SimpleFileStoreFactory(self.path).add
self.state = Status(factory)
content_store = MockContentStore()
counter = MockCounter()
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
self.cache = MockTokenCache()
self.store = Store(chain_spec, self.state, content_store, counter, cache=self.cache)
def test_basic(self):
pass
if __name__ == '__main__':
unittest.main()

View File

@ -51,5 +51,9 @@ class TestShep(TestShepBase):
self.state.move('foo', self.state.INSUFFICIENT_FUNDS)
def test_shep_cache(self):
self.store.put('foo', 'bar')
if __name__ == '__main__':
unittest.main()