Fix backend db session leak, pass session to filter
This commit is contained in:
parent
7718c8e3d4
commit
c509dd8f6c
@ -32,7 +32,8 @@ class SyncerBackend:
|
|||||||
def connect(self):
|
def connect(self):
|
||||||
"""Loads the state of the syncer session with the given id.
|
"""Loads the state of the syncer session with the given id.
|
||||||
"""
|
"""
|
||||||
self.db_session = SessionBase.create_session()
|
if self.db_session == None:
|
||||||
|
self.db_session = SessionBase.create_session()
|
||||||
q = self.db_session.query(BlockchainSync)
|
q = self.db_session.query(BlockchainSync)
|
||||||
q = q.filter(BlockchainSync.id==self.object_id)
|
q = q.filter(BlockchainSync.id==self.object_id)
|
||||||
self.db_object = q.first()
|
self.db_object = q.first()
|
||||||
@ -46,6 +47,7 @@ class SyncerBackend:
|
|||||||
self.db_session.add(self.db_object)
|
self.db_session.add(self.db_object)
|
||||||
self.db_session.commit()
|
self.db_session.commit()
|
||||||
self.db_session.close()
|
self.db_session.close()
|
||||||
|
self.db_session = None
|
||||||
|
|
||||||
|
|
||||||
def chain(self):
|
def chain(self):
|
||||||
|
@ -28,7 +28,7 @@ class Syncer:
|
|||||||
self.cursor = None
|
self.cursor = None
|
||||||
self.running = True
|
self.running = True
|
||||||
self.backend = backend
|
self.backend = backend
|
||||||
self.filter = SyncFilter()
|
self.filter = SyncFilter(backend)
|
||||||
self.progress_callback = progress_callback
|
self.progress_callback = progress_callback
|
||||||
|
|
||||||
|
|
||||||
|
@ -6,3 +6,7 @@ class LoopDone(Exception):
|
|||||||
|
|
||||||
class RequestError(Exception):
|
class RequestError(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class BackendError(Exception):
|
||||||
|
pass
|
||||||
|
@ -1,13 +1,17 @@
|
|||||||
# standard imports
|
# standard imports
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
# local imports
|
||||||
|
from .error import BackendError
|
||||||
|
|
||||||
logg = logging.getLogger(__name__)
|
logg = logging.getLogger(__name__)
|
||||||
|
|
||||||
class SyncFilter:
|
class SyncFilter:
|
||||||
|
|
||||||
def __init__(self, safe=True):
|
def __init__(self, backend, safe=True):
|
||||||
self.safe = safe
|
self.safe = safe
|
||||||
self.filters = []
|
self.filters = []
|
||||||
|
self.backend = backend
|
||||||
|
|
||||||
|
|
||||||
def add(self, fltr):
|
def add(self, fltr):
|
||||||
@ -19,15 +23,21 @@ class SyncFilter:
|
|||||||
|
|
||||||
|
|
||||||
def apply(self, conn, block, tx):
|
def apply(self, conn, block, tx):
|
||||||
|
session = None
|
||||||
|
try:
|
||||||
|
session = self.backend.connect()
|
||||||
|
except sqlalchemy.exc.TimeoutError as e:
|
||||||
|
self.backend.disconnect()
|
||||||
|
raise BackendError('database connection fail: {}'.format(e))
|
||||||
for f in self.filters:
|
for f in self.filters:
|
||||||
logg.debug('applying filter {}'.format(str(f)))
|
logg.debug('applying filter {}'.format(str(f)))
|
||||||
f.filter(conn, block, tx)
|
f.filter(conn, block, tx, self.backend.db_session)
|
||||||
|
self.backend.disconnect()
|
||||||
|
|
||||||
|
class NoopFilter:
|
||||||
|
|
||||||
class NoopFilter(SyncFilter):
|
def filter(self, conn, block, tx, db_session=None):
|
||||||
|
logg.debug('noop filter :received\n{} {} {}'.format(block, tx, id(db_session)))
|
||||||
def filter(self, conn, block, tx):
|
|
||||||
logg.debug('noop filter :received\n{} {}'.format(block, tx))
|
|
||||||
|
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
|
Loading…
Reference in New Issue
Block a user