Compare commits

...

64 Commits

Author SHA1 Message Date
lash
5cd52cbb97 Change license to APL3, waive copyright 2022-11-14 07:29:18 +00:00
lash
1b24fd5439 Implement shep 0.3.x, chainlib 0.4.x 2022-11-06 15:17:55 +00:00
lash
b7a84acdb4 Bump minor version 2022-05-14 16:29:02 +00:00
lash
b7953cbd0f Add data module 2022-05-14 12:38:10 +00:00
lash
c5bd4aad3a Update cli module for chainlib 0.3.0, fix remaining settings processing 2022-05-13 13:46:30 +00:00
lash
c45e6e3310 Implement settings on chainlib 0.3.0 structure 2022-05-13 09:29:54 +00:00
lash
97fbec6477 Rehabilitate tests after by state signature change 2022-05-09 19:40:41 +00:00
lash
2a9bf41cf5 Deduplicate multiple matches in by-state list 2022-05-06 14:00:05 +00:00
lash
c54661d39e Add prerelease version 2022-05-06 08:47:51 +00:00
lash
ddeae91611 Add state parser cli tool 2022-05-06 08:47:29 +00:00
lash
d94cf08719 Rehabilitate list tool, enhance settings and config 2022-05-06 08:30:14 +00:00
lash
a0f8960643 Update changelog 2022-05-05 17:06:30 +00:00
lash
ce0f29d982 Upgrade shep, omit sync on persist set 2022-05-05 17:03:19 +00:00
lash
263d4df300 Update changelog 2022-05-05 15:42:08 +00:00
lash
029deead75 Upgrade shep to handle exception in filestore list 2022-05-05 15:37:19 +00:00
lash
da9fb5925d Upgrade shep 2022-05-05 15:11:01 +00:00
lash
cbf00281c6 Remove sync for each get 2022-05-05 14:22:21 +00:00
lash
01ad409077 Raise correct error in index store exists check 2022-05-04 18:37:02 +00:00
lash
3a8ec01588 Allow for sync skip in queue store instantiation 2022-05-04 05:44:47 +00:00
lash
b63793fd9b Add purge to chainqueue store object 2022-05-02 20:21:51 +00:00
lash
84b8eb10e6 Remove spam logline 2022-05-01 07:40:32 +00:00
lash
532ff230b4 Remove race waits (defer to client layer) 2022-05-01 06:44:33 +00:00
lash
f7c09acfe2 Add race delay 2022-05-01 06:27:52 +00:00
lash
04d9901f0d Allow backend objects to move between sync and get 2022-04-30 18:31:02 +00:00
lash
b8c2b1b86a Sort statewise results 2022-04-30 16:43:55 +00:00
lash
c94b291d39 Add upcoming tests, event callback pass to shep 2022-04-30 05:42:44 +00:00
lash
6c360ca2e5 Add reserved check method 2022-04-29 06:28:01 +00:00
lash
ff74679de8 Remove unneeded deps 2022-04-28 15:37:06 +00:00
lash
94bd5c8cdf Add cli handling and settings 2022-04-28 12:37:08 +00:00
lash
ccbbcc2157 Sync chainqueue state store on get 2022-04-27 06:23:58 +00:00
lash
57191ea378 Move outputter module to explicit module path in cli 2022-04-26 21:35:20 +00:00
lash
e646edecca Upgrade shep 2022-04-26 09:21:30 +00:00
lash
95930ef7de Handle duplicate tx attempts 2022-04-12 13:44:29 +00:00
lash
c22fafad53 Update setup 2022-04-10 15:31:58 +00:00
lash
b5f513b63a Ignore missing txs, sync store on start 2022-04-10 15:30:08 +00:00
lash
01b674d09e Add change test for chainqueue entry 2022-04-10 14:00:01 +00:00
lash
0fa12adfa1 Add tx src member 2022-03-15 09:32:41 +00:00
lash
c094ca2198 Implement chainspec in entry, cache tx 2022-03-15 09:00:15 +00:00
lash
e4cc7061f0 Force hashing of tx inside puts 2022-03-15 08:06:39 +00:00
lash
92cb5d1978 Add state finalizers 2022-03-14 21:17:00 +00:00
lash
f8b256b51b Add reserve, send enclosure 2022-03-14 19:53:54 +00:00
lash
1f7ca28647 Remove log spam 2022-03-13 17:24:25 +00:00
lash
d19fbf005e Add date modified to state dirs 2022-03-13 17:22:39 +00:00
lash
485b33866b Harden query tests 2022-03-13 16:10:40 +00:00
lash
04dfb185ce Implement upcoming query on store 2022-03-13 15:45:48 +00:00
lash
a6e48d93a8 Implement upcoming query on store 2022-03-13 15:40:45 +00:00
lash
51c8124a28 Add store test, move store to subdir module 2022-03-13 14:58:26 +00:00
lash
bdebeb6010 Add missing provision in cache test 2022-03-12 14:22:55 +00:00
lash
d5f19248da handle strings in tx inputs in test token cache tx object 2022-03-12 14:19:56 +00:00
lash
e457275128 WIP crossroads on hex vs bytes interpretation 2022-03-12 14:12:02 +00:00
lash
0c9b42d086 Prepare integration test 2022-03-12 13:48:40 +00:00
lash
69ad3711cd Add embedded normalization to filter, cache tx 2022-03-12 12:49:38 +00:00
lash
ed75502f46 Add network token value to core cache tx object 2022-03-12 12:02:45 +00:00
lash
68f50246d2 Add cache interface methods, move old tests 2022-03-12 09:14:23 +00:00
lash
bd77706d1a Add cache handling 2022-03-12 08:48:19 +00:00
lash
b763d11eff Simplify queueentry 2022-03-11 21:49:23 +00:00
lash
790c9ddf13 Normalize hex in queueentry 2022-03-11 20:16:42 +00:00
lash
3880249683 Implement strict get match 2022-03-11 19:43:00 +00:00
lash
c4caab6a3a implement get tx by state 2022-03-11 19:38:12 +00:00
lash
539d3384a6 Add addrses to nonce index on new tx object 2022-03-11 14:10:16 +00:00
lash
38aae6f8c0 WIP Implement transition setters on shep 2022-03-11 12:15:44 +00:00
lash
0682dd8ed3 Rename gas state name 2022-03-11 11:05:56 +00:00
lash
fce9bce6fc Initial shep provisions 2022-03-11 11:03:05 +00:00
lash
5a92058e74 Bump version 2022-03-11 07:35:18 +00:00
42 changed files with 1706 additions and 206 deletions

View File

@@ -1,3 +1,50 @@
- 0.2.2
* Change license to AGPL3 and copyright waived to public domain
- 0.2.1
* Implement shep 0.3.0
* Implement chainlib 0.4.x
- 0.2.0
* Implement chainlib 0.3.0
- 0.1.16
* Queue list cli tool
* State parser cli tool
* Provide pluggable renderer capability for queue list cli tool
* Move path and state query parsing to settings module
* Add queue path and digest parameters to base config
- 0.1.15
* Upgrade shep to avoid sync in persist set
- 0.1.14
* Upgrade shep to handle exception in filestore list
- 0.1.13
* Remove sync on each get
* Upgrade shep to guarantee atomic state lock state
- 0.1.12
* Raise correct exception from index store exists check
- 0.1.11
* Allow for sync skip in store instantiation
- 0.1.10
* Improve logging
- 0.1.9
* Upgrade deps
- 0.1.8
* Upgrade deps
- 0.1.7
* Improve logging
- 0.1.6
* Sort upcoming queue item chronologically
* Add unit testing for upcoming query method
- 0.1.5
* Add reserved state check method
- 0.1.4
* Dependency cleanups
- 0.1.3
* Add CLI args and config handling, settings object
- 0.1.2
* Add CLI inspection tools
- 0.1.1
*
- 0.1.0
* Replace state transitions with shep
- 0.0.3
* cli tool for listing queue by address
* ensure lowercase hex input in db

View File

@@ -1 +1 @@
include *requirements.txt LICENSE chainqueue/db/migrations/default/* chainqueue/db/migrations/default/versions/* chainqueue/db/migrations/default/versions/src/* chainqueue/data/config/*
include *requirements.txt LICENSE WAIVER WAIVER.asc CHANGELOG chainqueue/db/migrations/default/* chainqueue/db/migrations/default/versions/* chainqueue/db/migrations/default/versions/src/* chainqueue/data/config/*

17
WAIVER Normal file
View File

@@ -0,0 +1,17 @@
# Copyright waiver for the python package "chainqueue"
I dedicate any and all copyright interest in this software to the
public domain. I make this dedication for the benefit of the public at
large and to the detriment of my heirs and successors. I intend this
dedication to be an overt act of relinquishment in perpetuity of all
present and future rights to this software under copyright law.
To the best of my knowledge and belief, my contributions are either
originally authored by me or are derived from prior works which I have
verified are also in the public domain and are not subject to claims
of copyright by other parties.
To the best of my knowledge and belief, no individual, business,
organization, government, or other entity has any copyright interest
in my contributions, and I affirm that I will not make contributions
that are otherwise encumbered.

29
WAIVER.asc Normal file
View File

@@ -0,0 +1,29 @@
-----BEGIN PGP MESSAGE-----
owGVU3tQFHUcvztPxAW1KZNp4vETfJUXaCgRoiQIeGkEiIBOJrt7v7v7cXe7yz64
LhDGfPAoKmi0A8dHmRQlA4xnIeTENT2YkhzIkEc1XlHkCIeTSo+5iX67B2r1V3/c
zO3+vt/P87evzpulClQv7hvtrPz47TR18yyeCsjbqM9NzaaLxj+KAiks5+CRySwC
O4mKIQ+MLA9EMwScQzSzDOBI2kKaIIikzSRiiiQowUiC0AMDNCCaFCEgGQf+GQBp
tQL6NhhiRMhDQf6D0ZAABNYo2kkeApGV4QlOoqyIBgbWhmGjgR7YSAv0j05DI8w+
I4aCDDQiEbBGvzb/MikSVpI3QYXfj4uXRR7ZIKPM2hzADBEvKAOCRNNQEFhekOlk
gfIWJiTuIsQolIwHWJyFCEhaweGhVfaOBLOCjD1xkOegKCHRIZ9j7wSH7cqHMpVR
EiVsVYlC8Cu7OwKJMeCg74RlJe3RBJHDTlsVZrRbGNZuhYZpgxQWAY06+YBmGeyS
kmTJ2ByGhAjv8gSLARGD5eBOJNwfD/GeA9ggwEHKc5gYt4wV8qwNcDzCr+0sbxGA
3YxoM87FTBZDAntHRoTH5BXSKrD+Gm8H72/NXzueYFgRp0sVQpwWNktbSWQTCOzh
jkUsgpUV4vvEiwgK/8MvI7MbUDEySKRVByhJQAzuUYfNmkgGPa8UpwMmuTFG7kcn
m/Wz4Se5IjMpKPf0v/eTwDb+HahOodcD0mhEvA2LJEX8ZEf4gstOlYv6jwVCGZGT
UFjtSMCFMLRkozCHIZqo0sRqVepAVdiixdrAZeNvcI+EV3z2Q0XWzFc5WyN/iypi
7j0zb5JjFkxtzw3Ze2SFIdU192jctk7fS5u7J7qbvloaVRHS8XXt2Cde32+Tq4eu
tpz0ZI6csJwrG6gY/X54cLT0/l+butJ8aOLyqZjBd9Z+9+k6b15NeOm16JWxwZ09
Ne9rD6lfpi62ORtLhOX1I5q2xvFvvnhlfuLCa+VrHo59ZkDzRKj7mHt9YnN2lfYM
ashqWpLyQHlvxvzE9qSmi5k7+9eeKtBcab1x4nq8yRtZPRaz0FVTsOWpnHlPX1ab
g1Z9uyW2uC3GVUVpbzyaUP5LfVJyxINHJnoGrfp2XWj2a40FtYdu9oTteiG10t36
Yq73w9VLb1aMhs1putosUYf7un5ekORtpPdtvbD1fF9HUG2n7UpR+gr1+fVs1s7D
hVROiNsUc4kuOdDb8lCE1Du4YZORI/S7XXXtUZungj0NjZXHb3mmPIk/Hs3fVVA2
4HTOOTu748uGeovxOjdS9+Zf5fUB3SX3habmfw4SawMdnu4z1X0qX+dbf9hsaVmu
jncDjxlTTqdb9jjiziE3uTx+E1Aty3iv9INWV513X7hTuLUoklRFJE/sWDnslvqT
ug6M7n/8bMNkW0vc7xt0zu2PbQvYfVz75HA6CNpzYSiljA2wcoMZYxt/sqw6+OcO
Pj5p6PXM59Ytydib0H8pqNdXWVm9ZlKjPjl177O+BE9ATiGfuz+4KD3f2SflVZWn
nuaEvwE=
=lGEV
-----END PGP MESSAGE-----

3
chainqueue/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from .state import Status
from .entry import QueueEntry
from .store import Store

View File

@@ -1,117 +0,0 @@
# standard imports
import datetime
# local imports
from chainqueue.enum import StatusBits
class Adapter:
"""Base class defining interface to be implemented by chainqueue adapters.
The chainqueue adapter collects the following actions:
- add: add a transaction to the queue
- upcoming: get queued transactions ready to be sent to network
- dispatch: send a queued transaction to the network
- translate: decode details of a transaction
- create_session, release_session: session management to control queue state integrity
:param backend: Chainqueue backend
:type backend: TODO - abstract backend class. Must implement get, create_session, release_session
:param pending_retry_threshold: seconds delay before retrying a transaction stalled in the newtork
:type pending_retry_threshold: int
:param error_retry_threshold: seconds delay before retrying a transaction that incurred a recoverable error state
:type error_retry_threshold: int
"""
def __init__(self, backend, pending_retry_threshold=0, error_retry_threshold=0):
self.backend = backend
self.pending_retry_threshold = datetime.timedelta(pending_retry_threshold)
self.error_retry_threshold = datetime.timedelta(error_retry_threshold)
def add(self, bytecode, chain_spec, session=None):
"""Add a transaction to the queue.
:param bytecode: Transaction wire format bytecode, in hex
:type bytecode: str
:param chain_spec: Chain spec to use for transaction decode
:type chain_spec: chainlib.chain.ChainSpec
:param session: Backend state integrity session
:type session: varies
"""
raise NotImplementedError()
def translate(self, bytecode, chain_spec):
"""Decode details of a transaction.
:param bytecode: Transaction wire format bytecode, in hex
:type bytecode: str
:param chain_spec: Chain spec to use for transaction decode
:type chain_spec: chainlib.chain.ChainSpec
"""
raise NotImplementedError()
def get(self, tx_hash, chain_spec, session=None):
"""Retrieve serialized transaction represented by the given transaction hash.
:param chain_spec: Chain spec to use for transaction decode
:type chain_spec: chainlib.chain.ChainSpec
:param tx_hash: Transaction hash, in hex
:type tx_hash: str
:param session: Backend state integrity session
:type session: varies
"""
raise NotImplementedError()
def dispatch(self, chain_spec, rpc, tx_hash, signed_tx, session=None):
"""Send a queued transaction to the network.
:param chain_spec: Chain spec to use to identify the transaction network
:type chain_spec: chainlib.chain.ChainSpec
:param rpc: RPC connection to use for transaction send
:type rpc: chainlib.connection.RPCConnection
:param tx_hash: Transaction hash (checksum of transaction), in hex
:type tx_hash: str
:param signed_tx: Transaction wire format bytecode, in hex
:type signed_tx: str
:param session: Backend state integrity session
:type session: varies
"""
raise NotImplementedError()
def upcoming(self, chain_spec, session=None):
"""Get queued transactions ready to be sent to the network.
The transactions will be a combination of newly submitted transactions, previously sent but stalled transactions, and transactions that could temporarily not be submitted.
:param chain_spec: Chain spec to use to identify the transaction network
:type chain_spec: chainlib.chain.ChainSpec
:param session: Backend state integrity session
:type session: varies
"""
raise NotImplementedError()
def create_session(self, session=None):
"""Create a session context to guarantee atomic state change in backend.
:param session: If specified, session will be used instead of creating a new one
:type session: varies
"""
return self.backend.create_session(session)
def release_session(self, session=None):
"""Release a session context created by create_session.
If session parameter is defined, final session destruction will be deferred to the initial provider of the session. In other words; if create_session was called with a session, release_session should symmetrically be called with the same session.
:param session: Session context.
:type session: varies
"""
return self.backend.release_session(session)

1
chainqueue/cache/__init__.py vendored Normal file
View File

@@ -0,0 +1 @@
from .base import *

141
chainqueue/cache/base.py vendored Normal file
View File

@@ -0,0 +1,141 @@
# standard imports
import enum
import logging
logg = logging.getLogger(__name__)
class NoopNormalizer:
def __init__(self):
self.address = self.noop
self.hash = self.noop
self.value = self.noop
def noop(self, v):
return v
noop_normalizer = NoopNormalizer()
class CacheTx:
def __init__(self, chain_spec, normalizer=noop_normalizer):
self.normalizer = normalizer
self.sender = None
self.recipient = None
self.nonce = None
self.value = None
self.hash = None
self.block_number = None
self.tx_index = None
self.timestamp = None
self.src = None
self.chain_spec = chain_spec
def confirm(self, block_number, tx_index, timestamp):
self.block_number = block_number
self.tx_index = tx_index
self.timestamp = timestamp
def init(self, tx_hash, nonce, sender, recipient, value):
self.hash = self.normalizer.hash(tx_hash)
self.sender = self.normalizer.address(sender)
self.recipient = self.normalizer.address(recipient)
self.nonce = nonce
self.value = self.normalizer.value(value)
def deserialize(self, signed_tx):
raise NotImplementedError()
def set(self, k, v):
k = 'v_' + k
setattr(self, k, v)
def __str__(self):
return '{}: {} ({}) -> {} = {}'.format(self.hash, self.sender, self.nonce, self.recipient, self.value)
class CacheTokenTx(CacheTx):
def __init__(self, chain_spec, normalizer=noop_normalizer):
super(CacheTokenTx, self).__init__(chain_spec, normalizer=normalizer)
self.v_src_token = None
self.v_src_value = None
self.v_dst_token = None
self.v_dst_value = None
class CacheSort(enum.Enum):
DATE = 1
NONCE = 2
class CacheFilter:
def __init__(self, normalizer=noop_normalizer, nonce=None, before=None, after=None, sort=CacheSort.DATE, reverse=False):
self.normalizer = normalizer
self.senders = None
self.recipients = None
self.nonce = nonce
self.before = before
self.after = after
self.sort = sort
self.reverse = reverse
def add_senders(self, senders):
if self.senders == None:
self.senders = []
if isinstance(senders, str):
senders = [senders]
for sender in senders:
if self.normalizer != None:
sender = self.normalizer.address(sender)
self.senders.append(sender)
def add_recipients(self, recipients):
if self.recipients == None:
self.recipients = []
if isinstance(recipients, str):
recipients = [recipients]
for recipient in recipients:
if self.normalizer != None:
recipient = self.normalizer.address(recipient)
self.recipients.append(recipient)
class Cache:
def put(self, chain_spec, cache_tx):
raise NotImplementedError()
def get(self, chain_spec, tx_hash):
raise NotImplementedError()
def by_nonce(self, cache_filter):
raise NotImplementedError()
def by_date(self, cache_filter=None):
raise NotImplementedError()
def count(self, cache_filter=None):
raise NotImplementedError()
def set_block(self, block, tx):
raise NotImplementedError()

10
chainqueue/cache/fs.py vendored Normal file
View File

@@ -0,0 +1,10 @@
# local imports
from .base import Cache
class FsCache(Cache):
def __init__(self, path):
self.path = path

8
chainqueue/cli/arg.py Normal file
View File

@@ -0,0 +1,8 @@
def apply_flag(flag):
flag.add('queue')
return flag
def apply_arg(arg):
arg.add_long('tx-digest-size', 'queue', type=int, help='Size of transaction hash in bytes')
return arg

10
chainqueue/cli/config.py Normal file
View File

@@ -0,0 +1,10 @@
def process_config(config, args, flags):
args_override = {}
args_override['QUEUE_BACKEND'] = getattr(args, 'backend')
args_override['TX_DIGEST_SIZE'] = getattr(args, 'tx_digest_size')
args_override['QUEUE_STATE_PATH'] = getattr(args, 'state_dir')
config.dict_override(args_override, 'local cli args')
return config

View File

@@ -1,9 +1,8 @@
[database]
name =
engine =
driver =
host =
port =
user =
password =
debug = 0
[queue]
backend = mem
state_path =
index_path =
counter_path =
[tx]
digest_size = 32

151
chainqueue/entry.py Normal file
View File

@@ -0,0 +1,151 @@
# standard imports
import logging
# external imports
from hexathon import (
add_0x,
strip_0x,
uniform,
)
# local imports
from chainqueue.cache import CacheTx
logg = logging.getLogger(__name__)
def normalize_hex(k):
k = strip_0x(k)
return uniform(k)
class QueueEntry:
def __init__(self, store, tx_hash=None, cache_adapter=CacheTx):
self.store = store
#self.tx_hash = normalize_hex(tx_hash)
self.tx_hash = tx_hash
self.signed_tx = None
self.seq = None
self.k = None
self.synced = False
self.cache_adapter = cache_adapter
def serialize(self):
return self.signed_tx
def create(self, signed_tx):
signed_tx = normalize_hex(signed_tx)
(s, tx_hash) = self.store.put(signed_tx, cache_adapter=self.cache_adapter)
self.k = s
self.synced = True
return tx_hash
def local_state(self):
state = self.store.state(self.k)
state_str = self.store.name(state)
return (state, state_str,)
def load(self):
(self.k, self.signed_tx) = self.store.get(self.tx_hash)
self.synced = True
def __match_state(self, state):
return bool(self.store.state(self.k) & state)
def waitforfunds(self):
if self.__match_state(self.store.INSUFFICIENT_FUNDS):
return
self.store.move(self.k, self.store.INSUFFICIENT_FUNDS)
def fubar(self):
if self.__match_state(self.store.UNKNOWN_ERROR):
return
self.store.set(self.k, self.store.UNKNOWN_ERROR)
def reject(self):
if self.__match_state(self.store.NODE_ERROR):
return
self.store.set(self.k, self.store.NODE_ERROR)
def override(self, manual=False):
if manual:
self.store.set(self.k, self.store.OBSOLETE | self.store.MANUAL)
else:
self.store.set(self.k, self.store.OBSOLETE)
def manual(self):
self.store.set(self.k, self.store.MANUAL)
def retry(self):
if self.__match_state(self.store.QUEUED):
return
self.store.change(self.k, self.store.QUEUED, self.store.INSUFFICIENT_FUNDS)
def readysend(self):
if self.__match_state(self.store.QUEUED):
return
self.store.change(self.k, self.store.QUEUED, self.store.INSUFFICIENT_FUNDS)
def sent(self):
if self.__match_state(self.store.IN_NETWORK):
return
self.store.change(self.k, self.store.IN_NETWORK, self.store.RESERVED | self.store.DEFERRED | self.store.QUEUED | self.store.LOCAL_ERROR | self.store.NODE_ERROR)
def sendfail(self):
if self.__match_state(self.store.NODE_ERROR):
return
self.store.change(self.k, self.store.LOCAL_ERROR | self.store.DEFERRED, self.store.RESERVED | self.store.QUEUED | self.store.INSUFFICIENT_FUNDS)
def reserve(self):
if self.__match_state(self.store.RESERVED):
return
self.store.change(self.k, self.store.RESERVED, self.store.QUEUED)
def fail(self, block, tx):
if self.__match_state(self.store.NETWORK_ERROR):
return
v = self.store.state(self.k)
self.store.change(self.k, v | self.store.NETWORK_ERROR, self.store.QUEUED)
if self.store.cache:
self.store.cache.set_block(self.tx_hash, block, tx)
def cancel(self, confirmed=False):
if confirmed:
self.store.change(self.k, self.store.OBSOLETE | self.store.FINAL, self.store.RESERVED | self.store.QUEUED)
else:
self.store.change(self.k, self.store.OBSOLETE, self.store.RESERVED | self.store.QUEUED)
def succeed(self, block, tx):
self.store.set(self.k, self.store.FINAL)
if self.store.cache:
self.store.cache.set_block(self.tx_hash, block, tx)
def test(self, state):
return self.__match_state(state)
def __str__(self):
v = self.store.get(self.tx_hash)
n = self.store.state(v[0])
s = self.store.name(n)
return '{}: {} ({})'.format(self.k, s, n)

View File

@@ -42,7 +42,6 @@ class StatusEnum(enum.IntEnum):
"""
PENDING = 0
"""Transaction has been added but no processing has been performed"""
SENDFAIL = StatusBits.DEFERRED | StatusBits.LOCAL_ERROR
"""Temporary error occurred when sending transaction to node"""
RETRY = StatusBits.QUEUED | StatusBits.DEFERRED

View File

@@ -24,8 +24,7 @@ class CacheIntegrityError(ChainQueueException):
pass
class BackendIntegrityError(ChainQueueException):
"""Raised when queue backend has invalid state
class DuplicateTxError(ChainQueueException):
"""Backend already knows transaction
"""
pass

View File

@@ -4,15 +4,19 @@
import os
import logging
import sys
import importlib
# external imports
from hexathon import add_0x
import chainlib.cli
from chainlib.chain import ChainSpec
from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer
# local imports
from chainqueue.cli import Outputter
import chainqueue.cli
#from chainqueue.cli.output import Outputter
from chainqueue.settings import ChainqueueSettings
from chainqueue.store import Store
from chainqueue.entry import QueueEntry
logging.basicConfig(level=logging.WARNING)
@@ -21,30 +25,31 @@ logg = logging.getLogger()
script_dir = os.path.dirname(os.path.realpath(__file__))
config_dir = os.path.join(script_dir, '..', 'data', 'config')
arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC
arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC | chainlib.cli.Flag.UNSAFE
argparser = chainlib.cli.ArgumentParser(arg_flags)
argparser.add_argument('--backend', type=str, default='sql', help='Backend to use (currently only "sql")')
argparser.add_argument('--start', type=str, help='Oldest transaction hash to include in results')
argparser.add_argument('--end', type=str, help='Newest transaction hash to include in results')
argparser.add_argument('--backend', type=str, default='sql', help='Backend to use')
argparser.add_argument('--error', action='store_true', help='Only show transactions which have error state')
argparser.add_argument('--pending', action='store_true', help='Omit finalized transactions')
argparser.add_argument('--status-mask', type=int, dest='status_mask', help='Manually specify status bitmask value to match (overrides --error and --pending)')
argparser.add_argument('--summary', action='store_true', help='output summary for each status category')
argparser.add_argument('-o', '--column', dest='column', action='append', type=str, help='add a column to display')
argparser.add_positional('address', type=str, help='Ethereum address of recipient')
argparser.add_argument('--no-final', action='store_true', dest='no_final', help='Omit finalized transactions')
argparser.add_argument('--status-mask', type=str, dest='status_mask', action='append', default=[], help='Manually specify status bitmask value to match (overrides --error and --pending)')
argparser.add_argument('--exact', action='store_true', help='Match status exact')
argparser.add_argument('--include-pending', action='store_true', dest='include_pending', help='Include transactions in unprocessed state (pending)')
argparser.add_argument('--renderer', type=str, default=[], action='append', help='Transaction renderer for output')
argparser.add_positional('address', required=False, type=str, help='Ethereum address of recipient')
args = argparser.parse_args()
extra_args = {
'address': None,
'backend': None,
'start': None,
'end': None,
'state_dir': None,
'exact': None,
'error': None,
'pending': None,
'include_pending': '_PENDING',
'status_mask': None,
'column': None,
'summary': None,
'no_final': None,
'renderer': None,
}
config = chainlib.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir)
config = chainqueue.cli.config.process_config(config, args, 0)
logg.debug('config loaded:\n{}'.format(config))
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
@@ -58,36 +63,40 @@ if status_mask == None:
tx_getter = None
tx_lister = None
session_method = None
if config.get('_BACKEND') == 'sql':
from chainqueue.sql.query import get_account_tx as tx_lister
from chainqueue.sql.query import get_tx_cache as tx_getter
from chainqueue.runnable.sql import setup_backend
from chainqueue.db.models.base import SessionBase
setup_backend(config, debug=config.true('DATABASE_DEBUG'))
session_method = SessionBase.create_session
else:
raise NotImplementedError('backend {} not implemented'.format(config.get('_BACKEND')))
#output_cols = config.get('_COLUMN')
output_cols = config.get('_COLUMN')
renderers_mods = []
for renderer in config.get('_RENDERER'):
m = importlib.import_module(renderer)
renderers_mods.append(m)
logg.info('using renderer module {}'.format(renderer))
settings = ChainqueueSettings()
settings.process(config)
logg.debug('settings:\n{}'.format(settings))
def main():
since = config.get('_START', None)
if since != None:
since = add_0x(since)
until = config.get('_END', None)
if until != None:
until = add_0x(until)
txs = tx_lister(chain_spec, config.get('_ADDRESS'), since=since, until=until, status=status_mask, not_status=not_status_mask)
outputter = Outputter(chain_spec, sys.stdout, tx_getter, session_method=session_method, decode_status=config.true('_RAW'), cols=output_cols)
if config.get('_SUMMARY'):
for k in txs.keys():
outputter.add(k)
outputter.decode_summary()
else:
for k in txs.keys():
outputter.decode_single(k)
# since = config.get('_START', None)
# if since != None:
# since = add_0x(since)
# until = config.get('_END', None)
# if until != None:
# until = add_0x(until)
# txs = tx_lister(chain_spec, config.get('_ADDRESS'), since=since, until=until, status=status_mask, not_status=not_status_mask)
txs = settings.get('QUEUE_STORE').by_state(state=settings.get('QUEUE_STATUS_FILTER'), strict=config.get('_EXACT'), include_pending=config.get('_PENDING'))
for i, tx_hash in enumerate(txs):
entry = QueueEntry(settings.get('QUEUE_STORE'), tx_hash)
entry.load()
v = None
if len(renderers_mods) == 0:
v = str(entry)
else:
for m in renderers_mods:
v = m.apply(i, settings, v, settings.get('CHAIN_SPEC'), entry)
print(v)
if __name__ == '__main__':
main()

View File

@@ -1,14 +0,0 @@
# standard imports
import logging
# local imports
from chainqueue.db.models.base import SessionBase
from chainqueue.db import dsn_from_config
logg = logging.getLogger(__name__)
def setup_backend(config, debug=False):
dsn = dsn_from_config(config)
logg.debug('dsn {}'.format(dsn))
SessionBase.connect(dsn, debug=debug)

View File

@@ -0,0 +1,51 @@
# SPDX-License-Identifier: GPL-3.0-or-later
# standard imports
import os
import logging
import argparse
# local imports
from chainqueue.state import Status
argparser = argparse.ArgumentParser()
argparser.add_argument('-r', '--raw', dest='r', action='store_true', help='Always print pure state element strings')
argparser.add_argument('state', type=str, help='State to interpret')
args = argparser.parse_args()
status_interpreter = Status(None, allow_invalid=True)
def handle_numeric(v, elements=False):
if elements:
if not status_interpreter.is_pure(v):
return status_interpreter.elements(v)
return status_interpreter.name(v)
def handle_string(v):
try:
return status_interpreter.from_name(v)
except AttributeError:
return status_interpreter.from_elements(v)
def main():
v = None
numeric = False
try:
v = int(args.state)
numeric = True
except:
v = args.state
r = None
if numeric:
r = handle_numeric(v, elements=args.r)
else:
r = handle_string(v)
print(r)
if __name__ == '__main__':
main()

99
chainqueue/settings.py Normal file
View File

@@ -0,0 +1,99 @@
# standard imports
import os
import logging
# external imports
from chainlib.settings import ChainSettings
from chainqueue.state import Status
from chainqueue.store import Store
logg = logging.getLogger(__name__)
def process_queue_tx(settings, config):
settings.set('TX_DIGEST_SIZE', config.get('TX_DIGEST_SIZE'))
return settings
def process_queue_store(settings, config):
status = Status(settings.get('QUEUE_STORE_FACTORY'), allow_invalid=True)
settings.set('QUEUE_STATE_STORE', status)
store = Store(
settings.get('CHAIN_SPEC'),
settings.get('QUEUE_STATE_STORE'),
settings.get('QUEUE_INDEX_STORE'),
settings.get('QUEUE_COUNTER_STORE'),
sync=True,
)
settings.set('QUEUE_STORE', store)
return settings
def process_queue_paths(settings, config):
index_dir = config.get('QUEUE_INDEX_PATH')
if index_dir == None:
index_dir = os.path.join(config.get('STATE_PATH'), 'tx')
counter_dir = config.get('QUEUE_COUNTER_PATH')
if counter_dir == None:
counter_dir = os.path.join(config.get('STATE_PATH'))
settings.set('QUEUE_STATE_PATH', config.get('STATE_PATH'))
settings.set('QUEUE_INDEX_PATH', index_dir)
settings.set('QUEUE_COUNTER_PATH', counter_dir)
return settings
def process_queue_backend_fs(settings, config):
from chainqueue.store.fs import IndexStore
from chainqueue.store.fs import CounterStore
from shep.store.file import SimpleFileStoreFactory
index_store = IndexStore(settings.o['QUEUE_INDEX_PATH'], digest_bytes=int(settings.o['TX_DIGEST_SIZE']))
counter_store = CounterStore(settings.o['QUEUE_COUNTER_PATH'])
factory = SimpleFileStoreFactory(settings.o['QUEUE_STATE_PATH'], use_lock=True).add
settings.set('QUEUE_INDEX_STORE', index_store)
settings.set('QUEUE_COUNTER_STORE', counter_store)
settings.set('QUEUE_STORE_FACTORY', factory)
return settings
def process_queue_status_filter(settings, config):
states = 0
store = settings.get('QUEUE_STATE_STORE')
if len(config.get('_STATUS_MASK')) == 0:
for v in store.all(numeric=True):
states |= v
logg.debug('state store {}'.format(states))
else:
for v in config.get('_STATUS_MASK'):
try:
states |= int(v)
continue
except ValueError:
pass
state = store.from_name(v)
logg.debug('resolved state argument {} to numeric state {}'.format(v, state))
states |= state
settings.set('QUEUE_STATUS_FILTER', states)
return settings
def process_queue(settings, config):
settings = process_queue_tx(settings, config)
settings = process_queue_paths(settings, config)
if config.get('QUEUE_BACKEND') == 'fs':
settings = process_queue_backend_fs(settings, config)
settings = process_queue_backend(settings, config)
settings = process_queue_store(settings, config)
settings = process_queue_status_filter(settings, config)
return settings
def process_settings(settings, config):
super(ChainqueueSettings, settings).process(config)
settings = settings.process_queue(settings, config)
return settings

140
chainqueue/state.py Normal file
View File

@@ -0,0 +1,140 @@
# standard imports
import logging
# external imports
import shep.persist
logg = logging.getLogger(__name__)
class Verify:
def verify(self, state_store, key, from_state, to_state):
to_state_name = state_store.name(to_state)
m = None
try:
m = getattr(self, to_state_name)
except AttributeError:
return None
r = m(state_store, from_state)
if r != None:
from_state_name = state_store.name(from_state)
r = '{} -> {}: {}'.format(from_state_name, to_state_name, r)
return r
def INSUFFICIENT_FUNDS(self, state_store, from_state):
if from_state & state_store.FINAL:
return 'already finalized'
if from_state & state_store.IN_NETWORK:
return 'already in network'
def UNKNOWN_ERROR(self, state_store, from_state):
if from_state & state_store.FINAL:
return 'already finalized'
if from_state & state_store.RESERVED:
return 'not reserved'
if from_state & state_store.mask_error:
return 'already in error state'
def NODE_ERROR(self, state_store, from_state):
if from_state & state_store.FINAL:
return 'already finalized'
if from_state & state_store.IN_NETWORK:
return 'already in network'
if not from_state & state_store.RESERVED:
return 'not reserved'
if from_state & state_store.mask_error:
return 'already in error state'
def NETWORK_ERROR(self, state_store, from_state):
if from_state & state_store.FINAL:
return 'already finalized'
if from_state & state_store.IN_NETWORK:
return 'already in network'
def OBSOLETE(self, state_store, from_state):
if from_state & state_store.FINAL:
return 'already finalized'
if from_state & state_store.IN_NETWORK:
return 'already in network'
if from_state & state_store.OBSOLETE:
return 'already obsolete'
def MANUAL(self, state_store, from_state):
if from_state & state_store.FINAL:
return 'already finalized'
def QUEUED(self, state_store, from_state):
if from_state & state_store.FINAL:
return 'already finalized'
if from_state & state_store.IN_NETWORK:
if not from_state & state_store.mask_error:
return 'not in error state'
elif from_state & state_store.mask_error:
return 'no first send on error state'
def SENDFAIL(self, state_store, from_state):
return self.NODE_ERROR(state_store, from_state)
def FINAL(self, state_store, from_state):
if from_state & state_store.FINAL:
return 'already finalized'
def _MINEFAIL(self, state_store, from_state):
return self.NETWORK_ERROR(state_store, from_state)
def _CANCEL(self, state_store, from_state):
if from_state:
if from_state & state_store.FINAL:
return 'already finalized'
if not from_state & (state_store.OBSOLETE | state_store.IN_NETWORK):
return 'can only cancel state having OBSOLETE and/or IN_NETWORK'
class Status(shep.persist.PersistedState):
bits = 12
def __init__(self, store_factory, allow_invalid=False, event_callback=None):
verify = Verify().verify
self.set_default_state('PENDING')
super(Status, self).__init__(store_factory, self.bits, verifier=verify, check_alias=not allow_invalid, event_callback=event_callback)
self.add('QUEUED')
self.add('RESERVED')
self.add('IN_NETWORK')
self.add('DEFERRED')
self.add('INSUFFICIENT_FUNDS')
self.add('LOCAL_ERROR')
self.add('NODE_ERROR')
self.add('NETWORK_ERROR')
self.add('UNKNOWN_ERROR')
self.add('FINAL')
self.add('OBSOLETE')
self.add('MANUAL')
self.alias('SENDFAIL', self.DEFERRED | self.LOCAL_ERROR)
self.alias('RETRY', self.DEFERRED | self.QUEUED)
self.alias('OBSOLETED', self.OBSOLETE | self.IN_NETWORK)
self.alias('FUBAR', self.FINAL | self.UNKNOWN_ERROR)
self.alias('CANCELLED', self.IN_NETWORK | self.FINAL | self.OBSOLETE)
self.alias('OVERRIDDEN', self.FINAL | self.OBSOLETE | self.MANUAL)
self.alias('REJECTED', self.NODE_ERROR | self.FINAL)
self.alias('REVERTED', self.IN_NETWORK | self.FINAL | self.NETWORK_ERROR)
self.alias('SUCCESS', self.IN_NETWORK | self.FINAL)
self.alias('_MINEFAIL', self.FINAL | self.NETWORK_ERROR)
self.alias('_CANCEL', self.FINAL | self.OBSOLETE)
self.mask_error = self.LOCAL_ERROR | self.NODE_ERROR | self.NETWORK_ERROR | self.UNKNOWN_ERROR

View File

@@ -0,0 +1,5 @@
from .base import (
to_key,
from_key,
Store,
)

218
chainqueue/store/base.py Normal file
View File

@@ -0,0 +1,218 @@
# standard imports
import re
import datetime
import logging
import time
# local imports
from chainqueue.cache import CacheTx
from chainqueue.entry import QueueEntry
from chainqueue.error import NotLocalTxError
from chainqueue.enum import (
StatusBits,
all_errors,
)
logg = logging.getLogger(__name__)
def to_key(t, n, k):
return '{}_{}_{}'.format(t, n, k)
def from_key(k):
(ts_str, seq_str, tx_hash) = k.split('_')
return (float(ts_str), int(seq_str), tx_hash, )
all_local_errors = all_errors() - StatusBits.NETWORK_ERROR
re_u = r'^[^_][_A-Z]+$'
class Store:
def __init__(self, chain_spec, state_store, index_store, counter, cache=None, sync=True):
self.chain_spec = chain_spec
self.cache = cache
self.state_store = state_store
self.index_store = index_store
self.counter = counter
for s in dir(self.state_store):
if not re.match(re_u, s):
continue
v = self.state_store.from_name(s)
setattr(self, s, v)
for v in [
'state',
'change',
'set',
'unset',
'name',
'modified',
'purge',
]:
setattr(self, v, getattr(self.state_store, v))
if not sync:
return
sync_err = None
try:
self.state_store.sync()
except Exception as e:
sync_err = e
if sync_err != None:
raise FileNotFoundError(sync_err)
def put(self, v, cache_adapter=CacheTx):
tx = cache_adapter(self.chain_spec)
tx.deserialize(v)
k = tx.hash
n = self.counter.next()
t = datetime.datetime.now().timestamp()
s = to_key(t, n, k)
self.index_store.put(k, s)
self.state_store.put(s, v)
if self.cache != None:
self.cache.put(self.chain_spec, tx)
return (s, k,)
def get(self, k):
v = None
s = self.index_store.get(k)
err = None
try:
v = self.state_store.get(s)
except FileNotFoundError as e:
err = e
if v == None:
raise NotLocalTxError('could not find tx {}: {}'.format(k, err))
return (s, v,)
def by_state(self, state=0, not_state=0, include_pending=False, limit=4096, strict=False, threshold=None):
hashes = []
i = 0
refs_state = []
if state > 0:
if self.state_store.is_pure(state):
refs_state = self.state_store.list(state)
elif strict:
refs_state = self.state_store.list(state)
else:
for v in self.state_store.elements(state, numeric=True):
refs_state += self.state_store.list(v)
refs_state = list(set(refs_state))
if include_pending:
refs_state += self.state_store.list(0)
refs_state.sort()
for ref in refs_state:
v = from_key(ref)
hsh = v[2]
item_state = self.state_store.state(ref)
if strict:
if item_state & state != item_state:
continue
if item_state & not_state > 0:
continue
item_state_str = self.state_store.name(item_state)
logg.info('state {} {} ({})'.format(ref, item_state_str, item_state))
if threshold != None:
v = self.state_store.modified(ref)
if v > threshold:
continue
hashes.append(hsh)
i += 1
if limit > 0 and i == limit:
break
#hashes.sort()
return hashes
def upcoming(self, limit=4096):
return self.by_state(state=self.QUEUED, limit=limit)
def deferred(self, limit=4096, threshold=None):
return self.by_state(state=self.DEFERRED, limit=limit, threshold=threshold)
def failed(self, limit=4096):
#return self.by_state(state=all_local_errors, limit=limit)
r = []
r += self.by_state(state=self.LOCAL_ERROR, limit=limit)
r += self.by_state(state=self.NODE_ERROR, limit=limit)
r.sort()
if len(r) > limit:
r = r[:limit]
return r
def pending(self, limit=4096):
return self.by_state(include_pending=True, limit=limit, strict=True)
def reserve(self, k):
entry = QueueEntry(self, k)
entry.load()
entry.reserve()
def enqueue(self, k):
entry = QueueEntry(self, k)
entry.load()
try:
entry.retry()
except StateTransitionInvalid:
entry.readysend()
def fail(self, k):
entry = QueueEntry(self, k)
entry.load()
logg.debug('fail {}'.format(k))
entry.sendfail()
def final(self, k, block, tx, error=False):
entry = QueueEntry(self, k)
entry.load()
if error:
entry.fail(block, tx)
else:
entry.succeed(block, tx)
def send_start(self, k):
entry = QueueEntry(self, k)
entry.load()
entry.reserve()
return entry
def send_end(self, k):
entry = QueueEntry(self, k)
entry.load()
entry.sent()
def is_reserved(self, k):
entry = QueueEntry(self, k)
entry.load()
return entry.test(self.RESERVED)
def sync(self):
self.state_store.sync()

94
chainqueue/store/fs.py Normal file
View File

@@ -0,0 +1,94 @@
# standard imports
import os
import logging
# external imports
from leveldir.hex import HexDir
# local imports
from chainqueue.error import (
DuplicateTxError,
NotLocalTxError,
)
logg = logging.getLogger(__name__)
class IndexStore(HexDir):
def __init__(self, root_path, digest_bytes=32):
os.path.join(root_path, 'contents')
self.store = HexDir(root_path, digest_bytes)
def __exists(self, k):
existing = None
try:
existing = self.get(k)
except NotLocalTxError:
pass
return existing != None
def put(self, k, v):
kb = bytes.fromhex(k)
vb = v.encode('utf-8')
if self.__exists(k):
raise DuplicateTxError(k)
self.store.add(kb, vb)
def get(self, k):
fp = self.store.to_filepath(k)
f = None
err = None
try:
f = open(fp, 'rb')
except FileNotFoundError as e:
err = e
if err != None:
raise NotLocalTxError(err)
v = f.read()
f.close()
return v.decode('utf-8')
class CounterStore:
def __init__(self, root_path):
try:
os.stat(root_path)
except FileNotFoundError:
os.makedirs(root_path)
fp = os.path.join(root_path, '.counter')
f = None
try:
f = open(fp, 'rb+')
except FileNotFoundError:
logg.debug('counter not found, creating new in {}'.format(fp))
f = open(fp, 'wb+')
f.write(b'\x00' * 8)
f.close()
f = open(fp, 'rb+')
v = f.read(8)
self.count = int.from_bytes(v, byteorder='big')
logg.debug('counter starts at {}'.format(self.count))
f.seek(0)
self.f = f
def __del__(self):
self.f.close()
def next(self):
c = self.count
self.count += 1
v = self.count.to_bytes(8, 'big')
self.f.write(v)
self.f.seek(0)
return c

View File

@@ -1,8 +1,5 @@
pysha3==1.0.2
hexathon~=0.1.0
hexathon~=0.1.7
leveldir~=0.3.0
alembic==1.4.2
SQLAlchemy==1.3.20
confini~=0.5.1
pyxdg~=0.27
chainlib~=0.0.12
confini~=0.6.1
chainlib~=0.4.0
shep~=0.3.0

View File

@@ -1,16 +1,14 @@
[metadata]
name = chainqueue
version = 0.0.6rc3
version = 0.2.2
description = Generic blockchain transaction queue control
author = Louis Holbrook
author_email = dev@holbrook.no
url = https://gitlab.com/chaintools/chainqueue
url = https://git.defalslfy.org/chainqueue,git
keywords =
cic
dlt
cryptocurrency
ethereum
solidarity
mutual_credit
classifiers =
Programming Language :: Python :: 3
Operating System :: OS Independent
@@ -20,22 +18,23 @@ classifiers =
License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)
Topic :: Internet
# Topic :: Blockchain :: EVM
license = GPL3
license = OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)
licence_files =
LICENSE.txt
LICENSE
[options]
python_requires = >= 3.6
python_requires = >= 3.7
include_package_data = True
packages =
chainqueue
chainqueue.db
chainqueue.db.models
chainqueue.sql
chainqueue.adapters
chainqueue.cache
chainqueue.unittest
chainqueue.store
chainqueue.runnable
chainqueue.cli
chainqueue.data
[options.entry_points]
console_scripts =
chainqueue-list = chainqueue.runnable.list:main
chainqueue-state = chainqueue.runnable.state:main

39
tests/base_shep.py Normal file
View File

@@ -0,0 +1,39 @@
# standard imports
import tempfile
import unittest
import shutil
import logging
# external imports
from shep.store.file import SimpleFileStoreFactory
from chainlib.chain import ChainSpec
# local imports
from chainqueue import (
Store,
Status,
)
# test imports
from tests.common import (
MockCounter,
MockContentStore,
)
logg = logging.getLogger(__name__)
class TestShepBase(unittest.TestCase):
def setUp(self):
self.path = tempfile.mkdtemp()
factory = SimpleFileStoreFactory(self.path).add
self.state = Status(factory)
content_store = MockContentStore()
counter = MockCounter()
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
self.store = Store(chain_spec, self.state, content_store, counter)
logg.debug('using path {}'.format(self.path))
def tearDown(self):
shutil.rmtree(self.path)

103
tests/common.py Normal file
View File

@@ -0,0 +1,103 @@
# standard imports
import hashlib
# local imports
from chainqueue.cache import (
Cache,
CacheTokenTx,
)
class MockCounter:
def __init__(self):
self.c = 0
def next(self):
c = self.c
self.c += 1
return c
class MockTokenCache(Cache):
def __init__(self):
self.db = {}
self.last_filter = None
def put(self, chain_spec, cache_tx):
self.db[cache_tx.hash] = cache_tx
def get(self, chain_spec, tx_hash):
return self.db[tx_hash]
def by_nonce(self, cache_filter):
self.last_filter = cache_filter
def by_date(self, cache_filter=None):
self.last_filter = cache_filter
def count(self, cache_filter):
self.last_filter = cache_filter
class MockCacheTokenTx(CacheTokenTx):
def deserialize(self, signed_tx):
h = hashlib.sha1()
try:
h.update(signed_tx + b'\x01')
except TypeError:
h.update(signed_tx.encode('utf-8') + b'\x01')
z = h.digest()
nonce = int.from_bytes(z[:4], 'big')
token_value = int.from_bytes(z[4:8], 'big')
value = int.from_bytes(z[8:12], 'big')
h = hashlib.sha1()
h.update(z)
z = h.digest()
sender = z.hex()
h = hashlib.sha1()
h.update(z)
z = h.digest()
recipient = z.hex()
h = hashlib.sha1()
h.update(z)
z = h.digest()
token = z.hex()
h = hashlib.sha256()
h.update(z)
z = h.digest()
tx_hash = z.hex()
self.init(tx_hash, nonce, sender, recipient, value)
self.set('src_token', token)
self.set('dst_token', token)
self.set('src_value', token_value)
self.set('dst_value', token_value)
self.confirm(42, 13, 1024000)
return self
class MockContentStore:
def __init__(self):
self.store = {}
def put(self, k, v):
self.store[k] = v
def get(self, k):
return self.store.get(k)

95
tests/test_cache.py Normal file
View File

@@ -0,0 +1,95 @@
# standard imports
import os
import logging
import unittest
import math
# external imports
from hexathon import add_0x
from chainlib.chain import ChainSpec
# local imports
from chainqueue import QueueEntry
from chainqueue.cache import (
CacheTokenTx,
CacheFilter,
)
# test imports
from tests.base_shep import TestShepBase
from tests.common import (
MockTokenCache,
MockCacheTokenTx,
)
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class MockNormalizer:
def address(self, v):
return 'address' + v
def value(self, v):
dv = int(math.log10(v) + 1)
return float(v / (10 ** dv))
def hash(self, v):
return 'ashbashhash' + v
class TestCache(TestShepBase):
def setUp(self):
super(TestCache, self).setUp()
self.chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
self.cache = MockTokenCache()
def test_cache_instance(self):
normalizer = MockNormalizer()
a = b'foo'
tx = MockCacheTokenTx(self.chain_spec, normalizer=normalizer)
tx.deserialize(a)
self.assertTrue(isinstance(tx.value, float))
self.assertEqual(tx.sender[:4], 'addr')
self.assertEqual(tx.recipient[:4], 'addr')
self.assertEqual(tx.hash[:11], 'ashbashhash')
def test_cache_putget(self):
a = b'foo'
tx = MockCacheTokenTx(self.chain_spec)
tx.deserialize(a)
self.cache.put(self.chain_spec, tx)
tx_retrieved = self.cache.get(self.chain_spec, tx.hash)
self.assertEqual(tx, tx_retrieved)
def test_cache_filter(self):
normalizer = MockNormalizer()
fltr = CacheFilter(normalizer=normalizer)
sender = os.urandom(20).hex()
fltr.add_senders(sender)
recipient_one = os.urandom(20).hex()
recipient_two = os.urandom(20).hex()
fltr.add_recipients([recipient_one, recipient_two])
self.assertEqual(fltr.senders[0][:4], 'addr')
self.assertEqual(fltr.recipients[1][:4], 'addr')
def test_cache_query(self):
a = os.urandom(20).hex()
fltr = CacheFilter(nonce=42)
self.cache.count(fltr)
self.assertEqual(self.cache.last_filter, fltr)
if __name__ == '__main__':
unittest.main()

85
tests/test_entry.py Normal file
View File

@@ -0,0 +1,85 @@
# standard imports
import os
import logging
import unittest
# external imports
from hexathon import add_0x
from chainlib.tx import Tx
from chainlib.block import Block
# local imports
from chainqueue import QueueEntry
# test imports
from tests.base_shep import TestShepBase
from tests.common import MockCacheTokenTx
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class TestEntry(TestShepBase):
def test_entry_get(self):
signed_tx = add_0x(os.urandom(128).hex())
nonce = 42
entry = QueueEntry(self.store, cache_adapter=MockCacheTokenTx)
tx_hash_one = entry.create(signed_tx)
signed_tx = add_0x(os.urandom(128).hex())
nonce = 42
entry = QueueEntry(self.store, cache_adapter=MockCacheTokenTx)
tx_hash_two = entry.create(signed_tx)
txs = self.store.by_state(include_pending=True)
self.assertEqual(len(txs), 2)
logg.debug('tx hash one {}'.format(tx_hash_one))
entry = QueueEntry(self.store, tx_hash=tx_hash_one, cache_adapter=MockCacheTokenTx)
entry.load()
entry.sent()
txs = self.store.by_state(include_pending=True)
self.assertEqual(len(txs), 1)
txs = self.store.by_state(state=self.store.IN_NETWORK)
self.assertEqual(len(txs), 1)
entry.succeed(None, None)
txs = self.store.by_state(include_pending=True)
self.assertEqual(len(txs), 1)
entry = QueueEntry(self.store, tx_hash_two)
entry.load()
entry.sent()
txs = self.store.by_state(state=self.store.IN_NETWORK)
self.assertEqual(len(txs), 2)
txs = self.store.by_state(state=self.store.IN_NETWORK, strict=True)
self.assertEqual(len(txs), 1)
def test_entry_change(self):
signed_tx = add_0x(os.urandom(128).hex())
nonce = 42
entry = QueueEntry(self.store, cache_adapter=MockCacheTokenTx)
tx_hash = entry.create(signed_tx)
block = Block()
block.number = 13
tx = Tx(None)
tx.index = 666
entry.readysend()
entry.reserve()
entry.sendfail()
entry = QueueEntry(self.store, tx_hash, cache_adapter=MockCacheTokenTx)
entry.load()
self.assertEqual(str(entry.tx_hash), tx_hash)
if __name__ == '__main__':
unittest.main()

119
tests/test_integrate.py Normal file
View File

@@ -0,0 +1,119 @@
# standard imports
import os
import tempfile
import unittest
import logging
import time
# external imports
from shep.store.file import SimpleFileStoreFactory
from chainlib.chain import ChainSpec
# local imports
from chainqueue import (
Store,
Status,
)
# test imports
from tests.common import (
MockCounter,
MockTokenCache,
MockCacheTokenTx,
MockContentStore,
)
from tests.base_shep import TestShepBase
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class TestIntegrateBase(TestShepBase):
def setUp(self):
self.path = tempfile.mkdtemp()
factory = SimpleFileStoreFactory(self.path).add
self.state = Status(factory)
content_store = MockContentStore()
counter = MockCounter()
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
self.cache = MockTokenCache()
self.store = Store(chain_spec, self.state, content_store, counter, cache=self.cache)
def test_integration_valid(self):
self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
def test_state_default(self):
(s, hx) = self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
v = self.store.pending()
self.assertEqual(len(v), 1)
self.assertEqual(v[0], hx)
def test_state_enqueue(self):
(s, hx) = self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
self.store.get(hx)
self.store.enqueue(hx)
v = self.store.upcoming()
self.assertEqual(len(v), 1)
v = self.store.pending()
self.assertEqual(len(v), 0)
def test_state_defer(self):
(s, hx) = self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
self.store.reserve(hx)
self.store.fail(hx)
v = self.store.deferred()
self.assertEqual(len(v), 1)
self.assertEqual(v[0], hx)
def test_state_multiple(self):
(s, hx) = self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
self.store.reserve(hx)
self.store.fail(hx)
(s, hx) = self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
self.store.reserve(hx)
self.store.fail(hx)
v = self.store.deferred()
self.assertEqual(len(v), 2)
def test_state_multiple_sort(self):
(s, hx) = self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
self.store.reserve(hx)
self.store.fail(hx)
(s, hx) = self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
self.store.enqueue(hx)
(s, hx) = self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
self.store.reserve(hx)
self.store.fail(hx)
self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
v = self.store.deferred()
self.assertEqual(len(v), 2)
def test_state_date_threshold(self):
(s, hx) = self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
self.store.reserve(hx)
self.store.fail(hx)
then = self.store.modified(s)
time.sleep(0.1)
(s, hx) = self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
self.store.reserve(hx)
self.store.fail(hx)
v = self.store.deferred(threshold=then)
self.assertEqual(len(v), 1)
if __name__ == '__main__':
unittest.main()

60
tests/test_shep.py Normal file
View File

@@ -0,0 +1,60 @@
# standard imports
import os
import logging
import unittest
# external imports
from hexathon import (
add_0x,
strip_0x,
)
from shep.error import StateTransitionInvalid
# local imports
from chainqueue import QueueEntry
# test imports
from tests.base_shep import TestShepBase
from tests.common import MockCacheTokenTx
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class TestShep(TestShepBase):
def test_shep_setup(self):
pass
def test_shep_tx(self):
signed_tx = add_0x(os.urandom(128).hex())
nonce = 42
tx = QueueEntry(self.store, cache_adapter=MockCacheTokenTx)
tx_hash = tx.create(signed_tx)
tx_retrieved = QueueEntry(self.store, tx_hash=tx_hash)
tx_retrieved.load()
self.assertEqual(tx_retrieved.signed_tx, strip_0x(signed_tx))
def test_shep_valid(self):
self.state.put('foo', 'bar')
self.state.set('foo', self.state.IN_NETWORK)
self.state.set('foo', self.state.FINAL)
def test_shep_invalid(self):
self.state.put('foo', 'bar')
self.state.set('foo', self.state.FINAL)
with self.assertRaises(StateTransitionInvalid):
self.state.move('foo', self.state.INSUFFICIENT_FUNDS)
def test_shep_cache(self):
self.store.put('bar', cache_adapter=MockCacheTokenTx)
if __name__ == '__main__':
unittest.main()

104
tests/test_store.py Normal file
View File

@@ -0,0 +1,104 @@
# standard imports
import os
import tempfile
import unittest
import logging
import shutil
# external imports
from chainlib.chain import ChainSpec
from shep.store.noop import NoopStoreFactory
# local imports
from chainqueue.store.fs import (
IndexStore,
CounterStore,
)
from chainqueue.store.base import Store
from chainqueue.error import DuplicateTxError
from chainqueue.state import Status
# tests imports
from tests.common import (
MockTokenCache,
MockCacheTokenTx,
)
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class TestStoreImplementations(unittest.TestCase):
def setUp(self):
self.path = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.path)
def test_basic_index(self):
store = IndexStore(self.path)
hx = os.urandom(32).hex()
data = 'foo_bar_baz'
store.put(hx, data)
r = store.get(hx)
self.assertEqual(data, r)
def test_basic_counter(self):
store = CounterStore(self.path)
v = store.next()
self.assertEqual(v, 0)
v = store.next()
self.assertEqual(v, 1)
store = CounterStore(self.path)
v = store.next()
self.assertEqual(v, 2)
def test_duplicate(self):
store = IndexStore(self.path)
hx = os.urandom(32).hex()
data = 'foo_bar_baz'
store.put(hx, data)
with self.assertRaises(DuplicateTxError):
store.put(hx, data)
def test_upcoming_limit(self):
index_store = IndexStore(self.path)
counter_store = CounterStore(self.path)
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
factory = NoopStoreFactory().add
state_store = Status(factory)
cache_store = MockTokenCache()
queue_store = Store(chain_spec, state_store, index_store, counter_store, cache=cache_store)
txs = []
for i in range(3):
tx_src = os.urandom(128).hex()
tx = queue_store.put(tx_src, cache_adapter=MockCacheTokenTx)
txs.append(tx)
r = queue_store.upcoming(limit=3)
self.assertEqual(len(r), 0)
for tx in txs:
queue_store.enqueue(tx[1])
r = queue_store.upcoming(limit=3)
self.assertEqual(len(r), 3)
queue_store.send_start(txs[0][1])
r = queue_store.upcoming(limit=3)
self.assertEqual(len(r), 2)
queue_store.send_end(txs[0][1])
r = queue_store.upcoming(limit=3)
self.assertEqual(len(r), 2)
if __name__ == '__main__':
unittest.main()