chaind/chaind/adapters/fs.py

158 lines
5.1 KiB
Python
Raw Permalink Normal View History

2022-03-14 22:17:31 +01:00
# standard imports
import logging
2022-04-28 14:41:03 +02:00
import os
2022-05-01 09:55:51 +02:00
import time
2022-03-14 22:17:31 +01:00
2022-03-13 15:54:11 +01:00
# external imports
2022-03-14 20:53:29 +01:00
from chainlib.error import RPCException
2022-03-14 22:40:20 +01:00
from chainqueue import Status
2022-03-13 15:54:11 +01:00
from chainqueue.cache import Cache
from chainqueue.store.fs import (
IndexStore,
CounterStore,
)
from shep.store.file import SimpleFileStoreFactory
2022-05-02 11:59:13 +02:00
from shep.error import (
StateInvalid,
StateLockedKey,
)
2022-03-13 15:54:11 +01:00
2022-03-14 22:40:20 +01:00
# local imports
from .base import ChaindAdapter
2022-05-02 11:59:13 +02:00
from chaind.lock import StoreLock
2022-03-14 22:40:20 +01:00
2022-03-14 22:17:31 +01:00
logg = logging.getLogger(__name__)
2022-03-13 15:54:11 +01:00
2022-03-14 22:40:20 +01:00
class ChaindFsAdapter(ChaindAdapter):
    """Queue adapter that keeps its state, index and counter stores on the filesystem.

    All three stores are rooted at a single directory. The instance attributes
    ``store``, ``cache_adapter`` and ``dispatcher`` used by the methods below are
    presumably set up by ``ChaindAdapter.__init__`` (not visible in this module).

    :param chain_spec: Chain spec, forwarded unchanged to the base adapter.
    :param path: Root directory for the state store and the counter store; the
        index store lives in the ``tx`` subdirectory of this path.
    :param cache_adapter: Forwarded unchanged to the base adapter.
    :param dispatcher: Forwarded unchanged to the base adapter.
    :param cache: Forwarded unchanged to the base adapter.
    :param pending_retry_threshold: Forwarded unchanged to the base adapter.
    :param error_retry_threshold: Forwarded unchanged to the base adapter.
    :param digest_bytes: Digest size handed to the index store.
    :param event_callback: Callback handed to the state store.
    :param store_sync: Forwarded unchanged to the base adapter.
    """

    def __init__(self, chain_spec, path, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32, event_callback=None, store_sync=True):
        factory = SimpleFileStoreFactory(path, use_lock=True).add
        state_store = Status(factory, allow_invalid=True, event_callback=event_callback)
        index_path = os.path.join(path, 'tx')
        index_store = IndexStore(index_path, digest_bytes=digest_bytes)
        counter_store = CounterStore(path)
        super(ChaindFsAdapter, self).__init__(
            chain_spec,
            state_store,
            index_store,
            counter_store,
            cache_adapter,
            dispatcher,
            cache=cache,
            pending_retry_threshold=pending_retry_threshold,
            error_retry_threshold=error_retry_threshold,
            store_sync=store_sync,
        )

    def _retry(self, action, tx_hash, missing_msg, locked_msg, forward_error=False):
        """Call ``action(tx_hash)`` until it stops raising a retryable store error.

        ``FileNotFoundError`` and ``StateLockedKey`` are treated as retryable: the
        matching message is logged at debug level and ``StoreLock.again`` is called
        before the next attempt. Any other exception, including whatever
        ``StoreLock.again`` itself raises, propagates to the caller.

        :param action: Callable taking the transaction hash as its only argument.
        :param tx_hash: Transaction hash passed to ``action`` and to the log messages.
        :param missing_msg: ``str.format`` template (hash, error) logged on ``FileNotFoundError``.
        :param locked_msg: ``str.format`` template (hash, error) logged on ``StateLockedKey``.
        :param forward_error: If true, the caught exception is passed on to ``StoreLock.again``.
        :return: Whatever ``action`` returns on the first successful attempt.
        """
        store_lock = StoreLock()
        while True:
            try:
                return action(tx_hash)
            except FileNotFoundError as e:
                logg.debug(missing_msg.format(tx_hash, e))
                # again() is invoked inside the handler so that an exception it
                # raises stays chained to the store error that triggered it.
                if forward_error:
                    store_lock.again(e)
                else:
                    store_lock.again()
            except StateLockedKey as e:
                logg.debug(locked_msg.format(tx_hash, e))
                if forward_error:
                    store_lock.again(e)
                else:
                    store_lock.again()

    def _finalize(self, block, tx, error):
        """Mark ``tx`` as final in the store and purge its entry.

        :param block: Block object, passed through to ``store.final``.
        :param tx: Transaction object; only its ``hash`` attribute is read here.
        :param error: Passed to ``store.final`` as the ``error`` flag.
        :raises QueueLockError: If the store reports the transaction as reserved.
        :return: The result of ``store.final``.
        """
        if self.store.is_reserved(tx.hash):
            # NOTE(review): QueueLockError is not imported in the visible import
            # section of this module; presumably it lives in the project's error
            # module - confirm and import it, otherwise this raises NameError.
            raise QueueLockError(tx.hash)
        r = self.store.final(tx.hash, block, tx, error=error)
        # The key of the stored entry is what purge() expects; the value is unused.
        (k, _) = self.store.get(tx.hash)
        self.store.purge(k)
        return r

    def put(self, signed_tx):
        """Add a signed transaction to the store.

        :param signed_tx: Signed transaction, passed through to ``store.put``.
        :return: The transaction hash, the second element of the pair returned by ``store.put``.
        """
        (_, tx_hash,) = self.store.put(signed_tx, cache_adapter=self.cache_adapter)
        return tx_hash

    def get(self, tx_hash):
        """Fetch the stored value for a transaction hash, retrying on transient store errors.

        :param tx_hash: Transaction hash to look up.
        :return: The second element of the pair returned by ``store.get``, or
            ``None`` if the store raises ``StateInvalid`` for the entry.
        """
        try:
            v = self._retry(
                self.store.get,
                tx_hash,
                'queuestore get (file missing) {} failed, possible race condition (will try again): {}',
                'queuestore get (statelock) {} failed, possible race condition (will try again): {}',
            )
        except StateInvalid as e:
            logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e))
            return None
        return v[1]

    def upcoming(self, limit=0):
        """Return upcoming transactions, leaving room for those already in flight.

        When ``limit`` is positive, the number of transactions that are in the
        ``IN_NETWORK`` state but not ``FINAL`` is subtracted from it, and an empty
        list is returned if nothing remains. With a non-positive ``limit`` the
        store is queried with a limit of 0.

        :param limit: Maximum number of transactions in flight plus returned.
        :return: Result of ``store.upcoming``, or an empty list.
        """
        real_limit = 0
        in_flight = []
        if limit > 0:
            in_flight = self.store.by_state(state=self.store.IN_NETWORK, not_state=self.store.FINAL)
            real_limit = limit - len(in_flight)
            if real_limit <= 0:
                return []
        r = self.store.upcoming(limit=real_limit)
        logg.info('upcoming returning {} upcoming from limit {} less {} active in-flight txs'.format(len(r), limit, len(in_flight)))
        return r

    def pending(self):
        """Return the result of ``store.pending()``."""
        return self.store.pending()

    def deferred(self):
        """Return the result of ``store.deferred()``."""
        return self.store.deferred()

    def failed(self):
        """Return the result of ``store.failed()``."""
        return self.store.failed()

    def succeed(self, block, tx):
        """Finalize ``tx`` as successful and purge it from the store.

        :param block: Block object, passed through to ``store.final``.
        :param tx: Transaction object; its ``hash`` attribute identifies the entry.
        :raises QueueLockError: If the store reports the transaction as reserved.
        :return: The result of ``store.final``.
        """
        return self._finalize(block, tx, False)

    def fail(self, block, tx):
        """Finalize ``tx`` as failed and purge it from the store.

        :param block: Block object, passed through to ``store.final``.
        :param tx: Transaction object; its ``hash`` attribute identifies the entry.
        :raises QueueLockError: If the store reports the transaction as reserved.
        :return: The result of ``store.final``.
        """
        return self._finalize(block, tx, True)

    def sendfail(self, tx_hash=None):
        """Mark a transaction as failed to send.

        The previous implementation referenced an undefined ``tx`` name and could
        never succeed; the hash is now taken as an argument. It defaults to
        ``None`` only to keep the old zero-argument call signature valid.

        :param tx_hash: Hash of the transaction to mark as failed.
        :raises ValueError: If no transaction hash is given.
        :return: The result of ``store.fail``.
        """
        if tx_hash is None:
            raise ValueError('sendfail requires a tx_hash')
        return self.store.fail(tx_hash)

    def enqueue(self, tx_hash):
        """Return the result of ``store.enqueue`` for the given transaction hash."""
        return self.store.enqueue(tx_hash)

    def dispatch(self, tx_hash):
        """Send a queued transaction through the dispatcher.

        The store is told that sending starts, the serialized entry is handed to
        the dispatcher, and the store is told that sending ended. Both store calls
        are retried on ``FileNotFoundError`` and ``StateLockedKey``.

        :param tx_hash: Hash of the transaction to send.
        :return: ``False`` if the dispatcher raised ``RPCException`` (the entry is
            then passed to ``store.fail``), ``True`` otherwise.
        """
        retry_msg = 'dispatch failed to find {} in backend, will try again: {}'

        entry = self._retry(self.store.send_start, tx_hash, retry_msg, retry_msg)

        tx_wire = entry.serialize()
        try:
            self.dispatcher.send(tx_wire)
        except RPCException as e:
            logg.error('dispatch send failed for {}: {}'.format(tx_hash, e))
            self.store.fail(tx_hash)
            return False

        # As in the original code, only this second loop forwards the caught
        # error to StoreLock.again.
        self._retry(self.store.send_end, tx_hash, retry_msg, retry_msg, forward_error=True)
        return True