Factor out dispatch processor from chain implementation
This commit is contained in:
		
							parent
							
								
									4f96be2024
								
							
						
					
					
						commit
						9cffdc5867
					
				@ -13,13 +13,13 @@ logg = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
class ChaindAdapter:
 | 
			
		||||
 | 
			
		||||
    def __init__(self, chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0):
 | 
			
		||||
    def __init__(self, chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, store_sync=True):
 | 
			
		||||
        self.cache_adapter = cache_adapter
 | 
			
		||||
        self.dispatcher = dispatcher
 | 
			
		||||
        store_lock = StoreLock()
 | 
			
		||||
        while True:
 | 
			
		||||
            try:
 | 
			
		||||
                self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache)
 | 
			
		||||
                self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache, sync=store_sync)
 | 
			
		||||
                break
 | 
			
		||||
            except FileNotFoundError as e:
 | 
			
		||||
                logg.debug('queuestore instantiation failed, possible race condition (will try again): {}'.format(e))
 | 
			
		||||
 | 
			
		||||
@ -26,13 +26,13 @@ logg = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
class ChaindFsAdapter(ChaindAdapter):
 | 
			
		||||
 | 
			
		||||
    def __init__(self, chain_spec, path, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32, event_callback=None):
 | 
			
		||||
    def __init__(self, chain_spec, path, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32, event_callback=None, store_sync=True):
 | 
			
		||||
        factory = SimpleFileStoreFactory(path, use_lock=True).add
 | 
			
		||||
        state_store = Status(factory, allow_invalid=True, event_callback=event_callback)
 | 
			
		||||
        index_path = os.path.join(path, 'tx')
 | 
			
		||||
        index_store = IndexStore(index_path, digest_bytes=digest_bytes)
 | 
			
		||||
        counter_store = CounterStore(path)
 | 
			
		||||
        super(ChaindFsAdapter, self).__init__(chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=cache, pending_retry_threshold=pending_retry_threshold, error_retry_threshold=error_retry_threshold)
 | 
			
		||||
        super(ChaindFsAdapter, self).__init__(chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=cache, pending_retry_threshold=pending_retry_threshold, error_retry_threshold=error_retry_threshold, store_sync=store_sync)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def put(self, signed_tx):
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										33
									
								
								chaind/dispatch.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										33
									
								
								chaind/dispatch.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,33 @@
 | 
			
		||||
# standard imports
 | 
			
		||||
import logging
 | 
			
		||||
 | 
			
		||||
# local ipmorts
 | 
			
		||||
from chaind.adapters.fs import ChaindFsAdapter
 | 
			
		||||
from chaind.eth.cache import EthCacheTx
 | 
			
		||||
 | 
			
		||||
logg = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class DispatchProcessor:
 | 
			
		||||
 | 
			
		||||
    def __init__(self, chain_spec, queue_dir, dispatcher):
 | 
			
		||||
        self.dispatcher = dispatcher
 | 
			
		||||
        self.chain_spec = chain_spec,
 | 
			
		||||
        self.queue_dir = queue_dir
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def process(self, rpc, limit=50):
 | 
			
		||||
        adapter = ChaindFsAdapter(
 | 
			
		||||
            self.chain_spec,
 | 
			
		||||
            self.queue_dir, 
 | 
			
		||||
            EthCacheTx,
 | 
			
		||||
            self.dispatcher,
 | 
			
		||||
            )
 | 
			
		||||
        
 | 
			
		||||
        upcoming = adapter.upcoming(limit=limit)
 | 
			
		||||
        logg.info('processor has {} candidates for {}, processing with limit {}'.format(len(upcoming), self.chain_spec, limit))
 | 
			
		||||
        i = 0
 | 
			
		||||
        for tx_hash in upcoming:
 | 
			
		||||
            if adapter.dispatch(tx_hash):
 | 
			
		||||
                i += 1
 | 
			
		||||
        return i
 | 
			
		||||
@ -22,7 +22,7 @@ logg = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
class SessionController:
 | 
			
		||||
 | 
			
		||||
    def __init__(self, config, adapter, processor):
 | 
			
		||||
    def __init__(self, config, processor):
 | 
			
		||||
        self.dead = False
 | 
			
		||||
        os.makedirs(os.path.dirname(config.get('SESSION_SOCKET_PATH')), exist_ok=True)
 | 
			
		||||
        try:
 | 
			
		||||
@ -37,7 +37,6 @@ class SessionController:
 | 
			
		||||
        self.srv.settimeout(float(config.get('SESSION_DISPATCH_DELAY')))
 | 
			
		||||
        self.processor = processor
 | 
			
		||||
        self.chain_spec = config.get('CHAIN_SPEC')
 | 
			
		||||
        self.adapter = adapter
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def shutdown(self, signo, frame):
 | 
			
		||||
@ -65,7 +64,7 @@ class SessionController:
 | 
			
		||||
        r = None
 | 
			
		||||
        while True:
 | 
			
		||||
            try:
 | 
			
		||||
                r = self.processor(self.chain_spec, self.adapter, conn)
 | 
			
		||||
                r = self.processor(conn)
 | 
			
		||||
                break
 | 
			
		||||
            except BackendError as e:
 | 
			
		||||
                state_lock.again(e)
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user