@@ -28,11 +28,12 @@ from chainqueue.cache import CacheTokenTx
from chainlib . encode import TxHexNormalizer
from chainlib . chain import ChainSpec
from chaind . adapters . fs import ChaindFsAdapter
from chaind . dispatch import DispatchProcessor
# local imports
from chaind . eth . dispatch import EthDispatcher
from chaind . eth . cache import EthCacheTx
from chaind . eth . settings import ChaindEthSettings
from chaind . eth . dispatch import EthDispatcher
logging . basicConfig ( level = logging . WARNING )
logg = logging . getLogger ( )
@@ -66,41 +67,16 @@ settings.process(config)
logg . debug ( ' settings: \n {} ' . format ( settings ) )
def process_outgoing ( chain_spec , adapter , rpc , limit = 50 ) :
adapter = None
process_err = None
adapter = ChaindFsAdapter (
settings . get ( ' CHAIN_SPEC ' ) ,
settings . dir_for ( ' queue ' ) ,
EthCacheTx ,
dispatcher ,
)
upcoming = adapter . upcoming ( limit = limit )
logg . info ( ' processor has {} candidates for {} , processing with limit {} adapter {} rpc {} ' . format ( len ( upcoming ) , chain_spec , limit , adapter , rpc ) )
i = 0
for tx_hash in upcoming :
if adapter . dispatch ( tx_hash ) :
i + = 1
return i
chain_spec = ChainSpec . from_chain_str ( config . get ( ' CHAIN_SPEC ' ) )
rpc = chainlib . eth . cli . Rpc ( )
conn = rpc . connect_by_config ( config )
tx_normalizer = TxHexNormalizer ( ) . tx_hash
token_cache_store = CacheTokenTx ( chain_spec , normalizer = tx_normalizer )
dispatcher = EthDispatcher ( conn )
queue_adapter = ChaindFsAdapter (
settings . get ( ' CHAIN_SPEC ' ) ,
settings . dir_for ( ' queue ' ) ,
EthCacheTx ,
dispatcher ,
)
token_cache_store = CacheTokenTx ( settings . get ( ' CHAIN_SPEC ' ) , normalizer = tx_normalizer )
dispatcher = EthDispatcher ( conn )
processor = DispatchProcessor ( settings . get ( ' CHAIN_SPEC ' ) , settings . dir_for ( ' queue ' ) , dispatcher )
ctrl = SessionController ( settings , processor . process )
ctrl = SessionController ( settings , queue_adapter , process_outgoing )
signal . signal ( signal . SIGINT , ctrl . shutdown )
signal . signal ( signal . SIGTERM , ctrl . shutdown )
@@ -126,6 +102,14 @@ def main():
ctrl . process ( conn )
continue
queue_adapter = ChaindFsAdapter (
settings . get ( ' CHAIN_SPEC ' ) ,
settings . dir_for ( ' queue ' ) ,
EthCacheTx ,
dispatcher ,
store_sync = False ,
)
result_data = None
r = 0 # no error
try :