From 99af484f30c14cd75bd43707bd9d125dbd354800 Mon Sep 17 00:00:00 2001 From: nolash Date: Wed, 2 Jun 2021 14:07:01 +0200 Subject: [PATCH] Add upcoming getter --- chainqueue/adapters/base.py | 13 ++++++++++++ chainqueue/adapters/eth.py | 40 +++++++++++++++++++++++++++++------ chainqueue/runnable/server.py | 8 ++++--- chainqueue/sql/backend.py | 19 ++++++++++++++++- chainqueue/sql/query.py | 7 ++++-- 5 files changed, 75 insertions(+), 12 deletions(-) create mode 100644 chainqueue/adapters/base.py diff --git a/chainqueue/adapters/base.py b/chainqueue/adapters/base.py new file mode 100644 index 0000000..c0d7b45 --- /dev/null +++ b/chainqueue/adapters/base.py @@ -0,0 +1,13 @@ + +class Adapter: + + def __init__(self, backend): + self.backend = backend + + + def process(self, chain_spec): + raise NotImplementedEror() + + + def add(self, chain_spec, bytecode): + raise NotImplementedEror() diff --git a/chainqueue/adapters/eth.py b/chainqueue/adapters/eth.py index 716ea78..a65a1fa 100644 --- a/chainqueue/adapters/eth.py +++ b/chainqueue/adapters/eth.py @@ -1,21 +1,49 @@ # external imports +from chainlib.eth.constant import ZERO_ADDRESS from chainlib.eth.tx import ( unpack, ) -from hexathon import add_0x +from hexathon import ( + add_0x, + strip_0x, + ) # local imports +from chainqueue.adapters.base import Adapter +from chainqueue.enum import StatusBits -class EthAdapter: +class EthAdapter(Adapter): - def __init__(self, backend): - self.backend = backend + + def translate(self, chain_spec, bytecode): + tx = unpack(bytecode, chain_spec) + tx['source_token'] = ZERO_ADDRESS + tx['destination_token'] = ZERO_ADDRESS + tx['from_value'] = tx['value'] + tx['to_value'] = tx['value'] + return tx def add(self, chain_spec, bytecode): - tx = unpack(bytecode, chain_spec) + tx = self.translate(chain_spec, bytecode) + session = self.backend.create_session() + r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session) + if r: + session.rollback() + session.close() + return r + r = self.backend.cache(tx, session=session) + session.commit() + session.close() + return r + + + def cache(self, chain_spec): session = self.backend.create_session() r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session) session.close() - return r + + + def process(self, chain_spec): + return self.backend.get(chain_spec, StatusBits.QUEUED, unpack) diff --git a/chainqueue/runnable/server.py b/chainqueue/runnable/server.py index 23f2605..044b808 100644 --- a/chainqueue/runnable/server.py +++ b/chainqueue/runnable/server.py @@ -71,7 +71,7 @@ class SessionController: self.srv = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) self.srv.bind(config.get('SESSION_SOCKET_PATH')) self.srv.listen(2) - self.srv.settimeout(5.0) + self.srv.settimeout(1.0) def shutdown(self, signo, frame): if self.dead: @@ -99,7 +99,7 @@ signal.signal(signal.SIGINT, ctrl.shutdown) signal.signal(signal.SIGTERM, ctrl.shutdown) dsn = dsn_from_config(config) -backend = SQLBackend(dsn) +backend = SQLBackend(dsn, debug=config.true('DATABASE_DEBUG')) adapter = EthAdapter(backend) chain_spec = ChainSpec.from_chain_str('evm:mainnet:1') @@ -118,7 +118,9 @@ if __name__ == '__main__': logg.error('entity on socket path is not a socket') break if srvs == None: - logg.debug('ping') + txs = adapter.process(chain_spec) + for k in txs.keys(): + logg.debug('txs {} {}'.format(k, txs[k])) continue srvs.setblocking(False) data_in = srvs.recv(1024) diff --git a/chainqueue/sql/backend.py b/chainqueue/sql/backend.py index 80d6864..4f12d40 100644 --- a/chainqueue/sql/backend.py +++ b/chainqueue/sql/backend.py @@ -9,6 +9,9 @@ from sqlalchemy.exc import ( # local imports from chainqueue.sql.tx import create as queue_create from chainqueue.db.models.base import SessionBase +from chainqueue.db.models.tx import TxCache +from chainqueue.sql.query import get_upcoming_tx +from chainqueue.sql.state import set_ready logg = logging.getLogger(__name__) @@ -21,12 +24,26 @@ class SQLBackend: def create(self, chain_spec, nonce, holder_address, tx_hash, signed_tx, obsolete_predecessors=True, session=None): try: - queue_create(chain_spec, nonce, holder_address, tx_hash, signed_tx, obsolete_predecessors=True, session=None) + queue_create(chain_spec, nonce, holder_address, tx_hash, signed_tx, obsolete_predecessors=True, session=session) except IntegrityError as e: logg.warning('skipped possible duplicate insert {}'.format(e)) return 1 + set_ready(chain_spec, tx_hash, session=session) return 0 + def cache(self, tx, session=None): + txc = TxCache(tx['hash'], tx['from'], tx['to'], tx['source_token'], tx['destination_token'], tx['from_value'], tx['to_value'], session=session) + session.add(txc) + session.flush() + logg.debug('cache {}'.format(txc)) + return 0 + + + def get(self, chain_spec, typ, decoder): + txs = get_upcoming_tx(chain_spec, typ, decoder=decoder) + return txs + + def create_session(self): return SessionBase.create_session() diff --git a/chainqueue/sql/query.py b/chainqueue/sql/query.py index f65e5c2..fe2df21 100644 --- a/chainqueue/sql/query.py +++ b/chainqueue/sql/query.py @@ -8,7 +8,10 @@ from sqlalchemy import or_ from sqlalchemy import not_ from sqlalchemy import tuple_ from sqlalchemy import func -from hexathon import add_0x +from hexathon import ( + add_0x, + strip_0x, + ) # local imports from chainqueue.db.models.otx import Otx @@ -274,7 +277,7 @@ def get_upcoming_tx(chain_spec, status=StatusEnum.READYSEND, not_status=None, re if o == None: continue - tx_signed_bytes = bytes.fromhex(o.signed_tx[2:]) + tx_signed_bytes = bytes.fromhex(strip_0x(o.signed_tx)) tx = decoder(tx_signed_bytes, chain_spec) txs[o.tx_hash] = o.signed_tx