diff --git a/chainsyncer/session.py b/chainsyncer/session.py index e23d8c0..aa2c08e 100644 --- a/chainsyncer/session.py +++ b/chainsyncer/session.py @@ -4,14 +4,8 @@ import uuid class SyncSession: - def __init__(self, session_store, sync_state, session_id=None, is_default=False): + def __init__(self, session_store): self.session_store = session_store - if session_id == None: - session_id = str(uuid.uuid4()) - is_default = True - self.session_id = session_id - self.is_default = is_default - self.sync_state = sync_state self.filters = [] self.started = False @@ -19,7 +13,7 @@ class SyncSession: def add_filter(self, fltr): if self.started: raise RuntimeError('filters cannot be changed after syncer start') - self.sync_state.register(fltr) + self.session_store.register(fltr) self.filters.append(fltr) diff --git a/chainsyncer/store/fs.py b/chainsyncer/store/fs.py index a270ca3..3083173 100644 --- a/chainsyncer/store/fs.py +++ b/chainsyncer/store/fs.py @@ -7,25 +7,33 @@ import logging from shep.store.file import SimpleFileStoreFactory from shep.persist import PersistedState +# local imports +from chainsyncer.state import SyncState + logg = logging.getLogger(__name__) class SyncFsItem: - def __init__(self, offset, target, state, started=False): #, offset, target, cursor): + def __init__(self, offset, target, sync_state, filter_state, started=False): self.offset = offset self.target = target - self.state = state + self.sync_state = sync_state + self.filter_state = filter_state s = str(offset) - match_state = self.state.NEW + match_state = self.sync_state.NEW if started: - match_state = self.state.SYNC - v = self.state.get(s) + match_state = self.sync_state.SYNC + v = self.sync_state.get(s) self.cursor = int.from_bytes(v, 'big') + def next(self): + pass + + def __str__(self): - return 'syncitem offset {} target {}'.format(offset, target, cursor) + return 'syncitem offset {} target {} cursor {}'.format(self.offset, self.target, self.cursor) @@ -66,6 +74,12 @@ class SyncFsStore: self.state.add('SYNC') self.state.add('DONE') + base_filter_path = os.path.join(self.session_path, 'filter') + factory = SimpleFileStoreFactory(base_filter_path, binary=True) + filter_state_backend = PersistedState(factory, 0) + self.filter_state = SyncState(filter_state_backend) + self.register = self.filter_state.register + def __create_path(self, base_path, default_path, session_id=None): logg.debug('fs store path {} does not exist, creating'.format(self.session_path)) @@ -83,37 +97,32 @@ class SyncFsStore: def __load(self, target): - + self.state.sync(self.state.NEW) self.state.sync(self.state.SYNC) - thresholds = [] + thresholds_sync = [] for v in self.state.list(self.state.SYNC): block_number = int(v) - thresholds.append(block_number) - #s = str(block_number) - #s = os.path.join(self.session_path, str(block_number)) - #self.range_paths.append(s) + thresholds_sync.append(block_number) logg.debug('queue resume {}'.format(block_number)) + thresholds_new = [] for v in self.state.list(self.state.NEW): block_number = int(v) - thresholds.append(block_number) - #s = str(block_number) - #s = os.path.join(self.session_path, str(block_number)) - #o = SyncItem(s, self.state) - #o = SyncFsItem(block_number, target, self.state) - #self.items[block_number] = o - #self.range_paths.append(s) + thresholds_new.append(block_number) logg.debug('queue new range {}'.format(block_number)) - thresholds.sort() + thresholds_sync.sort() + thresholds_new.sort() + thresholds = thresholds_sync + thresholds_new lim = len(thresholds) - 1 for i in range(len(thresholds)): item_target = target if i < lim: item_target = thresholds[i+1] - o = SyncFsItem(block_number, item_target, self.state, started=True) + o = SyncFsItem(block_number, item_target, self.state, self.filter_state, started=True) self.items[block_number] = o + logg.info('added {}'.format(o)) fp = os.path.join(self.session_path, str(target)) if len(thresholds) == 0: diff --git a/tests/test_basic.py b/tests/test_basic.py index 2dc3e3c..ac65c25 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -60,11 +60,7 @@ class TestSync(unittest.TestCase): def test_basic(self): store = MockStore(6) state = SyncState(store) - session = SyncSession(None, state) - self.assertTrue(session.is_default) - - session = SyncSession(None, state, session_id='foo') - self.assertFalse(session.is_default) + session = SyncSession(state) def test_sum(self): @@ -91,7 +87,7 @@ class TestSync(unittest.TestCase): def test_session_start(self): store = MockStore(6) state = SyncState(store) - session = SyncSession(None, state) + session = SyncSession(state) session.start() diff --git a/tests/test_fs.py b/tests/test_fs.py index efc5972..f41d8d7 100644 --- a/tests/test_fs.py +++ b/tests/test_fs.py @@ -65,5 +65,12 @@ class TestFs(unittest.TestCase): self.assertFalse(store.first) + def test_store_resume(self): + store = SyncFsStore(self.path) + store.start(13) + self.assertTrue(store.first) + + + if __name__ == '__main__': unittest.main()