From f385b26e1ee9a85ba59bb0da7da89c190793c46a Mon Sep 17 00:00:00 2001 From: lash Date: Wed, 20 Apr 2022 13:17:38 +0000 Subject: [PATCH] Remove state module, move filterstate to filter module --- .gitignore | 1 - chainsyncer/filter.py | 106 +++++++++++++++++++++++++++++++++ chainsyncer/state/__init__.py | 1 - chainsyncer/state/base.py | 108 ---------------------------------- chainsyncer/store/base.py | 2 +- tests/test_basic.py | 2 +- 6 files changed, 108 insertions(+), 112 deletions(-) delete mode 100644 chainsyncer/state/__init__.py delete mode 100644 chainsyncer/state/base.py diff --git a/.gitignore b/.gitignore index 7ebd175..558f498 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,3 @@ gmon.out build/ dist/ *.sqlite -old/ diff --git a/chainsyncer/filter.py b/chainsyncer/filter.py index 84d654f..72ad014 100644 --- a/chainsyncer/filter.py +++ b/chainsyncer/filter.py @@ -1,5 +1,12 @@ # standard imports import hashlib +import logging +import re +import os + +logg = logging.getLogger(__name__) + +re_processedname = r'^_?[A-Z,\.]*$' class SyncFilter: @@ -18,3 +25,102 @@ class SyncFilter: def common_name(self): s = self.__module__ + '.' + self.__class__.__name__ return s.replace('.', '_') + + +# TODO: properly clarify interface shared with syncfsstore, move to filter module? +class FilterState: + + def __init__(self, state_store, scan=None): + self.state_store = state_store + self.digest = b'\x00' * 32 + self.summed = False + self.__syncs = {} + self.synced = False + self.connected = False + self.state_store.add('DONE') + self.state_store.add('LOCK') + self.state_store.add('INTERRUPT') + self.state_store.add('RESET') + + self.state = self.state_store.state + self.put = self.state_store.put + self.set = self.state_store.set + self.next = self.state_store.next + self.move = self.state_store.move + self.unset = self.state_store.unset + self.peek = self.state_store.peek + self.from_name = self.state_store.from_name + self.state_store.sync() + self.all = self.state_store.all + self.started = False + + self.scan = scan + + + def __verify_sum(self, v): + if not isinstance(v, bytes) and not isinstance(v, bytearray): + raise ValueError('argument must be instance of bytes') + if len(v) != 32: + raise ValueError('argument must be 32 bytes') + + + def register(self, fltr): + if self.summed: + raise RuntimeError('filter already applied') + z = fltr.sum() + self.__verify_sum(z) + self.digest += z + s = fltr.common_name() + self.state_store.add(s) + n = self.state_store.from_name(s) + logg.debug('add filter {} {} {}'.format(s, n, self)) + + + def sum(self): + h = hashlib.sha256() + h.update(self.digest) + self.digest = h.digest() + self.summed = True + return self.digest + + + def connect(self): + if not self.synced: + for v in self.state_store.all(): + k = self.state_store.from_name(v) + self.state_store.sync(k) + self.__syncs[v] = True + if self.scan != None: + ks = self.scan() + for v in ks: #os.listdir(self.scan_path): + k = None + try: + k = self.state_store.from_elements(v) + self.state_store.alias(v, k) + except ValueError: + k = self.state_store.from_name(v) + self.state_store.sync(k) + self.__syncs[v] = True + self.synced = True + self.connected = True + + + def disconnect(self): + self.connected = False + + + def start(self, offset=0, target=-1): + self.state_store.start(offset=offset, target=target) + self.started = True + + + def get(self, k): + return None + + + def next_item(self): + return None + + + def filters(self): + return [] diff --git a/chainsyncer/state/__init__.py b/chainsyncer/state/__init__.py deleted file mode 100644 index 7f0e5e1..0000000 --- a/chainsyncer/state/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .base import FilterState diff --git a/chainsyncer/state/base.py b/chainsyncer/state/base.py deleted file mode 100644 index 46bee22..0000000 --- a/chainsyncer/state/base.py +++ /dev/null @@ -1,108 +0,0 @@ -# standard imports -import hashlib -import logging -import re -import os - -logg = logging.getLogger(__name__) - - -re_processedname = r'^_?[A-Z,\.]*$' - -# TODO: properly clarify interface shared with syncfsstore, move to filter module? -class FilterState: - - def __init__(self, state_store, scan=None): - self.state_store = state_store - self.digest = b'\x00' * 32 - self.summed = False - self.__syncs = {} - self.synced = False - self.connected = False - self.state_store.add('DONE') - self.state_store.add('LOCK') - self.state_store.add('INTERRUPT') - self.state_store.add('RESET') - - self.state = self.state_store.state - self.put = self.state_store.put - self.set = self.state_store.set - self.next = self.state_store.next - self.move = self.state_store.move - self.unset = self.state_store.unset - self.peek = self.state_store.peek - self.from_name = self.state_store.from_name - self.state_store.sync() - self.all = self.state_store.all - self.started = False - - self.scan = scan - - - def __verify_sum(self, v): - if not isinstance(v, bytes) and not isinstance(v, bytearray): - raise ValueError('argument must be instance of bytes') - if len(v) != 32: - raise ValueError('argument must be 32 bytes') - - - def register(self, fltr): - if self.summed: - raise RuntimeError('filter already applied') - z = fltr.sum() - self.__verify_sum(z) - self.digest += z - s = fltr.common_name() - self.state_store.add(s) - n = self.state_store.from_name(s) - logg.debug('add filter {} {} {}'.format(s, n, self)) - - - def sum(self): - h = hashlib.sha256() - h.update(self.digest) - self.digest = h.digest() - self.summed = True - return self.digest - - - def connect(self): - if not self.synced: - for v in self.state_store.all(): - k = self.state_store.from_name(v) - self.state_store.sync(k) - self.__syncs[v] = True - if self.scan != None: - ks = self.scan() - for v in ks: #os.listdir(self.scan_path): - k = None - try: - k = self.state_store.from_elements(v) - self.state_store.alias(v, k) - except ValueError: - k = self.state_store.from_name(v) - self.state_store.sync(k) - self.__syncs[v] = True - self.synced = True - self.connected = True - - - def disconnect(self): - self.connected = False - - - def start(self, offset=0, target=-1): - self.state_store.start(offset=offset, target=target) - self.started = True - - - def get(self, k): - return None - - - def next_item(self): - return None - - - def filters(self): - return [] diff --git a/chainsyncer/store/base.py b/chainsyncer/store/base.py index 20d7c2d..9fc36c6 100644 --- a/chainsyncer/store/base.py +++ b/chainsyncer/store/base.py @@ -4,7 +4,7 @@ import logging # local imports from shep.persist import PersistedState from shep.error import StateInvalid -from chainsyncer.state import FilterState +from chainsyncer.filter import FilterState from chainsyncer.error import ( LockError, FilterDone, diff --git a/tests/test_basic.py b/tests/test_basic.py index 6182316..922c09c 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -6,7 +6,7 @@ import logging # local imports from chainsyncer.session import SyncSession -from chainsyncer.state import FilterState +from chainsyncer.filter import FilterState from chainsyncer.store.fs import SyncFsStore from chainsyncer.unittest import ( MockStore,