WIP adapt to chainqueue shep

This commit is contained in:
lash 2022-03-13 14:54:11 +00:00
parent 7169142594
commit 9c641892ab
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
7 changed files with 92 additions and 55 deletions

View File

@ -1 +1 @@
from .setup import Environment #from .setup import Environment

33
chaind/adapters/new.py Normal file
View File

@ -0,0 +1,33 @@
# external imports
from chainqueue import (
Status,
Store as QueueStore,
)
from chainqueue.cache import Cache
from chainqueue.store.fs import (
IndexStore,
CounterStore,
)
from shep.store.file import SimpleFileStoreFactory
class ChaindFsAdapter:
def __init__(self, chain_spec, path, deserializer, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32):
factory = SimpleFileStoreFactory(path).add
state_store = Status(factory)
index_store = IndexStore(path, digest_bytes=digest_bytes)
counter_store = CounterStore(path)
self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache)
self.deserialize = deserializer
def add(self, signed_tx):
cache_tx = self.deserialize(signed_tx)
self.store.put(cache_tx.tx_hash, signed_tx)
return cache_tx.tx_hash
def get(self, tx_hash):
v = self.store.get(tx_hash)
return v[1]

View File

@ -1,47 +0,0 @@
# standard imports
import datetime
# external imports
from hexathon import (
add_0x,
strip_0x,
)
from chainqueue.adapters.base import Adapter
from chainqueue.enum import StatusBits
class SessionIndexAdapter(Adapter):
def __init__(self, backend, session_index_backend=None, pending_retry_threshold=0, error_retry_threshold=0):
super(SessionIndexAdapter, self).__init__(backend, pending_retry_threshold=pending_retry_threshold, error_retry_threshold=error_retry_threshold)
self.session_index_backend = session_index_backend
def add(self, tx_raw_signed_hex, chain_spec, session=None):
bytecode = bytes.fromhex(strip_0x(tx_raw_signed_hex))
tx = self.translate(bytecode, chain_spec)
r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session)
if r:
session.rollback()
session.close()
return None
r = self.backend.cache(tx, session=session)
if self.session_index_backend != None:
session.flush()
self.session_index_backend.add(chain_spec, tx['hash'], session=session)
session.commit()
return tx['hash']
def upcoming(self, chain_spec, session=None):
txs = self.backend.get(chain_spec, self.translate, session=session, status=StatusBits.QUEUED, not_status=StatusBits.IN_NETWORK)
before = datetime.datetime.utcnow() - self.error_retry_threshold
errored_txs = self.backend.get(chain_spec, self.translate, session=session, status=StatusBits.LOCAL_ERROR, not_status=StatusBits.FINAL, before=before, requeue=True)
for tx_hash in errored_txs.keys():
txs[tx_hash] = errored_txs[tx_hash]
return txs
def get(self, tx_hash, chain_spec, session=None):
tx_summary = self.backend.get_otx(chain_spec, tx_hash, session=session)
return tx_summary['signed_tx']

View File

@ -1,6 +1,6 @@
chainlib>=0.0.9a7,<=0.1.0 chainlib>=0.1.0b1,<=0.1.0
chainqueue>=0.0.5a3,<=0.0.5 #chainqueue~=0.2.0
chainsyncer>=0.0.6a3,<=0.0.6 #chainsyncer~=0.0.7
confini>=0.4.1a1,<0.5.0 confini~=0.6.0
crypto-dev-signer>=0.4.15a3,<0.5.0 funga~=0.5.2
pyxdg~=0.26 pyxdg~=0.26

View File

@ -26,7 +26,7 @@ env = Environment(env=os.environ)
argparser = argparse.ArgumentParser() argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=env.config_dir, help='config directory') argparser.add_argument('-c', type=str, default=env.config_dir, help='config directory')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--data-dir', dest='data_dir', type=str, default=env.data_dir, help='data directory') argparser.add_argument('--data-dir', dest='data_dir', type=str, help='data directory')
argparser.add_argument('--migrations-dir', dest='migrations_dir', default=default_migrations_dir, type=str, help='path to alembic migrations directory') argparser.add_argument('--migrations-dir', dest='migrations_dir', default=default_migrations_dir, type=str, help='path to alembic migrations directory')
argparser.add_argument('--reset', action='store_true', help='reset exsting database') argparser.add_argument('--reset', action='store_true', help='reset exsting database')
argparser.add_argument('-v', action='store_true', help='be verbose') argparser.add_argument('-v', action='store_true', help='be verbose')

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = chaind name = chaind
version = 0.0.3a5 version = 0.1.0
description = Base package for chain queue servicek description = Base package for chain queue servicek
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no

51
tests/test_fs.py Normal file
View File

@ -0,0 +1,51 @@
# standard imports
import os
import tempfile
import unittest
import shutil
import logging
import hashlib
# external imports
from chainlib.chain import ChainSpec
from chainqueue.cache import CacheTokenTx
# local imports
from chaind.adapters.new import ChaindFsAdapter
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class MockCacheAdapter(CacheTokenTx):
def deserialize(self, v):
tx = CacheTokenTx()
h = hashlib.sha256()
h.update(v.encode('utf-8'))
z = h.digest()
tx.tx_hash = z.hex()
return tx
class TestChaindFs(unittest.TestCase):
def setUp(self):
self.chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
self.path = tempfile.mkdtemp()
self.adapter = ChaindFsAdapter(self.chain_spec, self.path, MockCacheAdapter().deserialize)
def tearDown(self):
shutil.rmtree(self.path)
def test_fs_setup(self):
data = os.urandom(128).hex()
hsh = self.adapter.add(data)
v = self.adapter.get(hsh)
self.assertEqual(data, v)
if __name__ == '__main__':
unittest.main()