chaind/chaind/adapters/fs.py

78 lines
2.0 KiB
Python
Raw Normal View History

2022-03-14 22:17:31 +01:00
# standard imports
import logging
2022-03-13 15:54:11 +01:00
# external imports
2022-03-14 20:53:29 +01:00
from chainlib.error import RPCException
2022-03-14 22:40:20 +01:00
from chainqueue import Status
2022-03-13 15:54:11 +01:00
from chainqueue.cache import Cache
from chainqueue.store.fs import (
IndexStore,
CounterStore,
)
from shep.store.file import SimpleFileStoreFactory
2022-03-14 22:40:20 +01:00
# local imports
from .base import ChaindAdapter
2022-03-14 22:17:31 +01:00
logg = logging.getLogger(__name__)
2022-03-13 15:54:11 +01:00
2022-03-14 22:40:20 +01:00
class ChaindFsAdapter(ChaindAdapter):
2022-03-13 15:54:11 +01:00
2022-03-14 20:53:29 +01:00
def __init__(self, chain_spec, path, deserializer, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32):
2022-03-13 15:54:11 +01:00
factory = SimpleFileStoreFactory(path).add
state_store = Status(factory)
index_store = IndexStore(path, digest_bytes=digest_bytes)
counter_store = CounterStore(path)
2022-03-14 22:40:20 +01:00
super(ChaindFsAdapter, self).__init__(chain_spec, state_store, index_store, counter_store, deserializer, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0)
2022-03-13 15:54:11 +01:00
2022-03-14 20:53:29 +01:00
def put(self, signed_tx):
2022-03-13 15:54:11 +01:00
cache_tx = self.deserialize(signed_tx)
2022-03-14 22:17:31 +01:00
self.store.put(cache_tx.hash, signed_tx)
return cache_tx.hash
2022-03-13 15:54:11 +01:00
def get(self, tx_hash):
v = self.store.get(tx_hash)
return v[1]
2022-03-14 20:53:29 +01:00
def upcoming(self):
return self.store.upcoming()
def pending(self):
return self.store.pending()
def deferred(self):
return self.store.deferred()
2022-03-14 22:17:31 +01:00
def succeed(self, block, tx):
return self.store.final(tx.hash, block, tx, error=False)
def fail(self, block, tx):
return self.store.final(tx.hash, block, tx, error=True)
2022-03-14 20:53:29 +01:00
def enqueue(self, tx_hash):
return self.store.enqueue(tx_hash)
def dispatch(self, tx_hash):
entry = self.store.send_start(tx_hash)
tx_wire = entry.serialize()
r = None
try:
r = self.dispatcher.send(tx_wire)
except RPCException:
self.store.fail(tx_hash)
return False
self.store.send_end(tx_hash)
return True