Remove state module, move filterstate to filter module

This commit is contained in:
lash 2022-04-20 13:17:38 +00:00
parent b7957b8a0b
commit f385b26e1e
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
6 changed files with 108 additions and 112 deletions

1
.gitignore vendored
View File

@ -6,4 +6,3 @@ gmon.out
build/ build/
dist/ dist/
*.sqlite *.sqlite
old/

View File

@ -1,5 +1,12 @@
# standard imports # standard imports
import hashlib import hashlib
import logging
import re
import os
logg = logging.getLogger(__name__)
re_processedname = r'^_?[A-Z,\.]*$'
class SyncFilter: class SyncFilter:
@ -18,3 +25,102 @@ class SyncFilter:
def common_name(self): def common_name(self):
s = self.__module__ + '.' + self.__class__.__name__ s = self.__module__ + '.' + self.__class__.__name__
return s.replace('.', '_') return s.replace('.', '_')
# TODO: properly clarify interface shared with syncfsstore, move to filter module?
class FilterState:
def __init__(self, state_store, scan=None):
self.state_store = state_store
self.digest = b'\x00' * 32
self.summed = False
self.__syncs = {}
self.synced = False
self.connected = False
self.state_store.add('DONE')
self.state_store.add('LOCK')
self.state_store.add('INTERRUPT')
self.state_store.add('RESET')
self.state = self.state_store.state
self.put = self.state_store.put
self.set = self.state_store.set
self.next = self.state_store.next
self.move = self.state_store.move
self.unset = self.state_store.unset
self.peek = self.state_store.peek
self.from_name = self.state_store.from_name
self.state_store.sync()
self.all = self.state_store.all
self.started = False
self.scan = scan
def __verify_sum(self, v):
if not isinstance(v, bytes) and not isinstance(v, bytearray):
raise ValueError('argument must be instance of bytes')
if len(v) != 32:
raise ValueError('argument must be 32 bytes')
def register(self, fltr):
if self.summed:
raise RuntimeError('filter already applied')
z = fltr.sum()
self.__verify_sum(z)
self.digest += z
s = fltr.common_name()
self.state_store.add(s)
n = self.state_store.from_name(s)
logg.debug('add filter {} {} {}'.format(s, n, self))
def sum(self):
h = hashlib.sha256()
h.update(self.digest)
self.digest = h.digest()
self.summed = True
return self.digest
def connect(self):
if not self.synced:
for v in self.state_store.all():
k = self.state_store.from_name(v)
self.state_store.sync(k)
self.__syncs[v] = True
if self.scan != None:
ks = self.scan()
for v in ks: #os.listdir(self.scan_path):
k = None
try:
k = self.state_store.from_elements(v)
self.state_store.alias(v, k)
except ValueError:
k = self.state_store.from_name(v)
self.state_store.sync(k)
self.__syncs[v] = True
self.synced = True
self.connected = True
def disconnect(self):
self.connected = False
def start(self, offset=0, target=-1):
self.state_store.start(offset=offset, target=target)
self.started = True
def get(self, k):
return None
def next_item(self):
return None
def filters(self):
return []

View File

@ -1 +0,0 @@
from .base import FilterState

View File

@ -1,108 +0,0 @@
# standard imports
import hashlib
import logging
import re
import os
logg = logging.getLogger(__name__)
re_processedname = r'^_?[A-Z,\.]*$'
# TODO: properly clarify interface shared with syncfsstore, move to filter module?
class FilterState:
def __init__(self, state_store, scan=None):
self.state_store = state_store
self.digest = b'\x00' * 32
self.summed = False
self.__syncs = {}
self.synced = False
self.connected = False
self.state_store.add('DONE')
self.state_store.add('LOCK')
self.state_store.add('INTERRUPT')
self.state_store.add('RESET')
self.state = self.state_store.state
self.put = self.state_store.put
self.set = self.state_store.set
self.next = self.state_store.next
self.move = self.state_store.move
self.unset = self.state_store.unset
self.peek = self.state_store.peek
self.from_name = self.state_store.from_name
self.state_store.sync()
self.all = self.state_store.all
self.started = False
self.scan = scan
def __verify_sum(self, v):
if not isinstance(v, bytes) and not isinstance(v, bytearray):
raise ValueError('argument must be instance of bytes')
if len(v) != 32:
raise ValueError('argument must be 32 bytes')
def register(self, fltr):
if self.summed:
raise RuntimeError('filter already applied')
z = fltr.sum()
self.__verify_sum(z)
self.digest += z
s = fltr.common_name()
self.state_store.add(s)
n = self.state_store.from_name(s)
logg.debug('add filter {} {} {}'.format(s, n, self))
def sum(self):
h = hashlib.sha256()
h.update(self.digest)
self.digest = h.digest()
self.summed = True
return self.digest
def connect(self):
if not self.synced:
for v in self.state_store.all():
k = self.state_store.from_name(v)
self.state_store.sync(k)
self.__syncs[v] = True
if self.scan != None:
ks = self.scan()
for v in ks: #os.listdir(self.scan_path):
k = None
try:
k = self.state_store.from_elements(v)
self.state_store.alias(v, k)
except ValueError:
k = self.state_store.from_name(v)
self.state_store.sync(k)
self.__syncs[v] = True
self.synced = True
self.connected = True
def disconnect(self):
self.connected = False
def start(self, offset=0, target=-1):
self.state_store.start(offset=offset, target=target)
self.started = True
def get(self, k):
return None
def next_item(self):
return None
def filters(self):
return []

View File

@ -4,7 +4,7 @@ import logging
# local imports # local imports
from shep.persist import PersistedState from shep.persist import PersistedState
from shep.error import StateInvalid from shep.error import StateInvalid
from chainsyncer.state import FilterState from chainsyncer.filter import FilterState
from chainsyncer.error import ( from chainsyncer.error import (
LockError, LockError,
FilterDone, FilterDone,

View File

@ -6,7 +6,7 @@ import logging
# local imports # local imports
from chainsyncer.session import SyncSession from chainsyncer.session import SyncSession
from chainsyncer.state import FilterState from chainsyncer.filter import FilterState
from chainsyncer.store.fs import SyncFsStore from chainsyncer.store.fs import SyncFsStore
from chainsyncer.unittest import ( from chainsyncer.unittest import (
MockStore, MockStore,