From b0eb0c474d39fd35288b41850ee33461a34fe914 Mon Sep 17 00:00:00 2001 From: nolash Date: Wed, 2 Jun 2021 13:09:21 +0200 Subject: [PATCH] WIP server sql backend adapter handle duplicates --- chainqueue/adapters/eth.py | 3 ++- chainqueue/error.py | 1 - chainqueue/runnable/server.py | 3 ++- chainqueue/sql/backend.py | 22 ++++++++++++++++++++-- 4 files changed, 24 insertions(+), 5 deletions(-) diff --git a/chainqueue/adapters/eth.py b/chainqueue/adapters/eth.py index 730e678..716ea78 100644 --- a/chainqueue/adapters/eth.py +++ b/chainqueue/adapters/eth.py @@ -16,5 +16,6 @@ class EthAdapter: def add(self, chain_spec, bytecode): tx = unpack(bytecode, chain_spec) session = self.backend.create_session() - self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session) + r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session) session.close() + return r diff --git a/chainqueue/error.py b/chainqueue/error.py index 39db5db..c333d74 100644 --- a/chainqueue/error.py +++ b/chainqueue/error.py @@ -27,4 +27,3 @@ class BackendIntegrityError(ChainQueueException): """ pass - diff --git a/chainqueue/runnable/server.py b/chainqueue/runnable/server.py index 9b76196..23f2605 100644 --- a/chainqueue/runnable/server.py +++ b/chainqueue/runnable/server.py @@ -124,6 +124,7 @@ if __name__ == '__main__': data_in = srvs.recv(1024) data_in_str = data_in.decode('utf-8') data = bytes.fromhex(strip_0x(data_in_str)) - adapter.add(chain_spec, data) + r = adapter.add(chain_spec, data) + srvs.send(r.to_bytes(4, byteorder='big')) ctrl.shutdown(None, None) diff --git a/chainqueue/sql/backend.py b/chainqueue/sql/backend.py index 4f6419f..80d6864 100644 --- a/chainqueue/sql/backend.py +++ b/chainqueue/sql/backend.py @@ -1,13 +1,31 @@ +# standard imports +import logging + +# external imports +from sqlalchemy.exc import ( + IntegrityError, + ) + # local imports -from chainqueue.sql.tx import create +from chainqueue.sql.tx import create as queue_create from chainqueue.db.models.base import SessionBase +logg = logging.getLogger(__name__) + class SQLBackend: def __init__(self, conn_spec, *args, **kwargs): SessionBase.connect(conn_spec, pool_size=kwargs.get('poolsize', 0), debug=kwargs.get('debug', False)) - self.create = create + + + def create(self, chain_spec, nonce, holder_address, tx_hash, signed_tx, obsolete_predecessors=True, session=None): + try: + queue_create(chain_spec, nonce, holder_address, tx_hash, signed_tx, obsolete_predecessors=True, session=None) + except IntegrityError as e: + logg.warning('skipped possible duplicate insert {}'.format(e)) + return 1 + return 0 def create_session(self):