From c509dd8f6c2a12ba5d451ec4933ad61a13b8f65b Mon Sep 17 00:00:00 2001 From: nolash Date: Thu, 18 Feb 2021 23:55:49 +0100 Subject: [PATCH] Fix backend db session leak, pass session to filter --- chainsyncer/backend.py | 4 +++- chainsyncer/driver.py | 2 +- chainsyncer/error.py | 4 ++++ chainsyncer/filter.py | 22 ++++++++++++++++------ 4 files changed, 24 insertions(+), 8 deletions(-) diff --git a/chainsyncer/backend.py b/chainsyncer/backend.py index d7414fb..1926e27 100644 --- a/chainsyncer/backend.py +++ b/chainsyncer/backend.py @@ -32,7 +32,8 @@ class SyncerBackend: def connect(self): """Loads the state of the syncer session with the given id. """ - self.db_session = SessionBase.create_session() + if self.db_session == None: + self.db_session = SessionBase.create_session() q = self.db_session.query(BlockchainSync) q = q.filter(BlockchainSync.id==self.object_id) self.db_object = q.first() @@ -46,6 +47,7 @@ class SyncerBackend: self.db_session.add(self.db_object) self.db_session.commit() self.db_session.close() + self.db_session = None def chain(self): diff --git a/chainsyncer/driver.py b/chainsyncer/driver.py index 8ffb0d8..bba7e4c 100644 --- a/chainsyncer/driver.py +++ b/chainsyncer/driver.py @@ -28,7 +28,7 @@ class Syncer: self.cursor = None self.running = True self.backend = backend - self.filter = SyncFilter() + self.filter = SyncFilter(backend) self.progress_callback = progress_callback diff --git a/chainsyncer/error.py b/chainsyncer/error.py index fe1e00e..9d14d45 100644 --- a/chainsyncer/error.py +++ b/chainsyncer/error.py @@ -6,3 +6,7 @@ class LoopDone(Exception): class RequestError(Exception): pass + + +class BackendError(Exception): + pass diff --git a/chainsyncer/filter.py b/chainsyncer/filter.py index 883bfe8..6b4fdbc 100644 --- a/chainsyncer/filter.py +++ b/chainsyncer/filter.py @@ -1,13 +1,17 @@ # standard imports import logging +# local imports +from .error import BackendError + logg = logging.getLogger(__name__) class SyncFilter: - def __init__(self, safe=True): + def __init__(self, backend, safe=True): self.safe = safe self.filters = [] + self.backend = backend def add(self, fltr): @@ -19,15 +23,21 @@ class SyncFilter: def apply(self, conn, block, tx): + session = None + try: + session = self.backend.connect() + except sqlalchemy.exc.TimeoutError as e: + self.backend.disconnect() + raise BackendError('database connection fail: {}'.format(e)) for f in self.filters: logg.debug('applying filter {}'.format(str(f))) - f.filter(conn, block, tx) + f.filter(conn, block, tx, self.backend.db_session) + self.backend.disconnect() - -class NoopFilter(SyncFilter): +class NoopFilter: - def filter(self, conn, block, tx): - logg.debug('noop filter :received\n{} {}'.format(block, tx)) + def filter(self, conn, block, tx, db_session=None): + logg.debug('noop filter :received\n{} {} {}'.format(block, tx, id(db_session))) def __str__(self):