From 9386b9e7f9b043dd388964f02678fae5eed91972 Mon Sep 17 00:00:00 2001
From: lash
Date: Thu, 17 Mar 2022 14:54:34 +0000
Subject: [PATCH] Fs syncer

---
 chainsyncer/state/__init__.py |   1 +
 chainsyncer/store/fs.py       | 169 ++++++++++++++++++++++++++++++++++
 tests/test_basic.py           |  15 ++++
 tests/test_fs.py              |  69 ++++++++++++++++
 4 files changed, 254 insertions(+)
 create mode 100644 chainsyncer/state/__init__.py
 create mode 100644 chainsyncer/store/fs.py
 create mode 100644 tests/test_fs.py

diff --git a/chainsyncer/state/__init__.py b/chainsyncer/state/__init__.py
new file mode 100644
index 0000000..b47c759
--- /dev/null
+++ b/chainsyncer/state/__init__.py
@@ -0,0 +1 @@
+from .base import SyncState
diff --git a/chainsyncer/store/fs.py b/chainsyncer/store/fs.py
new file mode 100644
index 0000000..a270ca3
--- /dev/null
+++ b/chainsyncer/store/fs.py
@@ -0,0 +1,169 @@
+# standard imports
+import uuid
+import os
+import logging
+
+# external imports
+from shep.store.file import SimpleFileStoreFactory
+from shep.persist import PersistedState
+
+logg = logging.getLogger(__name__)
+
+
+class SyncFsItem:
+    """A single sync range, resumed from the cursor persisted for its offset.
+
+    :param offset: Block number the range starts at; its string form is the state key.
+    :param target: Block number the range ends at.
+    :param state: State store; get() must return the cursor bytes stored for the offset.
+    :param started: Accepted for interface compatibility; currently not consulted.
+    """
+
+    def __init__(self, offset, target, state, started=False):
+        self.offset = offset
+        self.target = target
+        self.state = state
+        # The cursor is persisted as a big-endian integer under the offset key.
+        v = self.state.get(str(offset))
+        self.cursor = int.from_bytes(v, 'big')
+
+
+    def __str__(self):
+        return 'syncitem offset {} target {} cursor {}'.format(self.offset, self.target, self.cursor)
+
+
+class SyncFsStore:
+    """Filesystem-backed store for the state of a sync session.
+
+    Every session has its own directory below base_path. Without a session id,
+    the path "default" below base_path is resolved; if it does not exist, a new
+    uuid-named session is created and "default" is symlinked to it.
+
+    :param base_path: Directory containing the session directories.
+    :param session_id: Name of the session directory, or None for the default session.
+    """
+
+    def __init__(self, base_path, session_id=None):
+        self.session_id = None
+        self.session_path = None
+        self.is_default = False
+        self.first = False
+        self.target = None
+        self.items = {}
+
+        default_path = os.path.join(base_path, 'default')
+
+        if session_id is None:
+            self.session_path = os.path.realpath(default_path)
+            self.is_default = True
+        else:
+            if session_id == 'default':
+                self.is_default = True
+            given_path = os.path.join(base_path, session_id)
+            self.session_path = os.path.realpath(given_path)
+
+        create_path = False
+        try:
+            os.stat(self.session_path)
+        except FileNotFoundError:
+            create_path = True
+
+        if create_path:
+            self.__create_path(base_path, default_path, session_id=session_id)
+
+        # Resolve the id for sessions that already existed as well as new ones.
+        self.session_id = os.path.basename(self.session_path)
+
+        logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path))
+
+        factory = SimpleFileStoreFactory(self.session_path, binary=True)
+        self.state = PersistedState(factory.add, 2)
+        self.state.add('SYNC')
+        self.state.add('DONE')
+
+
+    def __create_path(self, base_path, default_path, session_id=None):
+        """Create the session directory, generating a uuid session id if none is given.
+
+        For the default session, default_path is symlinked to the new directory.
+        """
+        logg.debug('fs store path {} does not exist, creating'.format(self.session_path))
+        if session_id is None:
+            session_id = str(uuid.uuid4())
+        self.session_path = os.path.join(base_path, session_id)
+        os.makedirs(self.session_path)
+        self.session_id = os.path.basename(self.session_path)
+
+        if self.is_default:
+            try:
+                os.symlink(self.session_path, default_path)
+            except FileExistsError:
+                # Best effort: an existing default entry is left untouched.
+                pass
+
+
+    def __load(self, target):
+        """Rebuild the sync items from persisted state and load the session target.
+
+        :param target: Target block number; written to disk only on the first run.
+        """
+        self.state.sync(self.state.NEW)
+        self.state.sync(self.state.SYNC)
+
+        thresholds = []
+        for v in self.state.list(self.state.SYNC):
+            block_number = int(v)
+            thresholds.append(block_number)
+            logg.debug('queue resume {}'.format(block_number))
+        for v in self.state.list(self.state.NEW):
+            block_number = int(v)
+            thresholds.append(block_number)
+            logg.debug('queue new range {}'.format(block_number))
+
+        thresholds.sort()
+        lim = len(thresholds) - 1
+        for i, item_offset in enumerate(thresholds):
+            # A range ends where the next one starts; the last one runs to target.
+            item_target = target
+            if i < lim:
+                item_target = thresholds[i+1]
+            # Key each item by the offset of its own range.
+            self.items[item_offset] = SyncFsItem(item_offset, item_target, self.state, started=True)
+
+        # Fixed file name, so a later run finds it whatever target it is given.
+        fp = os.path.join(self.session_path, 'target')
+        if len(thresholds) == 0:
+            logg.info('syncer first run')
+            self.first = True
+            with open(fp, 'w') as f:
+                f.write(str(target))
+
+        with open(fp, 'r') as f:
+            v = f.read()
+        self.target = int(v)
+
+
+    def start(self, offset=0, target=0):
+        """Load the session and, on its first run, store offset as the initial range.
+
+        :param offset: Block number to start at; ignored if the session already has state.
+        :param target: Block number to sync to; with 0, stop() queues a follow-up range.
+        """
+        self.__load(target)
+
+        if self.first:
+            block_number = offset
+            block_number_bytes = block_number.to_bytes(4, 'big')
+            self.state.put(str(block_number), block_number_bytes)
+        elif offset > 0:
+            logg.warning('block number argument {} for start ignored for already initiated sync {}'.format(offset, self.session_id))
+
+
+    def stop(self):
+        """Queue a range starting one block after self.height when the target is 0."""
+        if self.target == 0:
+            # NOTE(review): self.height is never assigned in this class; unless the
+            # caller sets it before stop(), this raises AttributeError.
+            block_number = self.height + 1
+            block_number_bytes = block_number.to_bytes(4, 'big')
+            self.state.put(str(block_number), block_number_bytes)
diff --git a/tests/test_basic.py b/tests/test_basic.py
index ef2793d..2dc3e3c 100644
--- a/tests/test_basic.py
+++ b/tests/test_basic.py
@@ -1,6 +1,9 @@
 # standard imports
 import unittest
 import hashlib
+import tempfile
+import shutil
+import logging
 
 # external imports
 from shep.state import State
@@ -8,7 +11,10 @@ from shep.state import State
 # local imports
 from chainsyncer.session import SyncSession
 from chainsyncer.state import SyncState
+from chainsyncer.store.fs import SyncFsStore
 
+logging.basicConfig(level=logging.DEBUG)
+logg = logging.getLogger()
 
 class MockStore(State):
 
@@ -42,6 +48,15 @@ class MockFilter:
 
 class TestSync(unittest.TestCase):
 
+    def setUp(self):
+        self.path = tempfile.mkdtemp()
+        self.store = SyncFsStore(self.path)
+
+
+    def tearDown(self):
+        shutil.rmtree(self.path)
+
+
     def test_basic(self):
         store = MockStore(6)
         state = SyncState(store)
diff --git a/tests/test_fs.py b/tests/test_fs.py
new file mode 100644
index 0000000..efc5972
--- /dev/null
+++ b/tests/test_fs.py
@@ -0,0 +1,69 @@
+# standard imports
+import unittest
+import tempfile
+import shutil
+import logging
+import stat
+import os
+
+# local imports
+from chainsyncer.store.fs import SyncFsStore
+
+logging.basicConfig(level=logging.DEBUG)
+logg = logging.getLogger()
+
+
+class TestFs(unittest.TestCase):
+
+    def setUp(self):
+        self.path = tempfile.mkdtemp()
+
+    def tearDown(self):
+        shutil.rmtree(self.path)
+
+
+    def test_default(self):
+        store = SyncFsStore(self.path)
+        fp = os.path.join(self.path, store.session_id)
+        session_id = store.session_id
+        st = os.stat(fp)
+        self.assertTrue(stat.S_ISDIR(st.st_mode))
+        self.assertTrue(store.is_default)
+
+        fpd = os.path.join(self.path, 'default')
+        st = os.stat(fpd)
+        self.assertTrue(stat.S_ISDIR(st.st_mode))
+        self.assertTrue(store.is_default)
+
+        fpd = os.path.realpath(fpd)
+        self.assertEqual(fpd, fp)
+
+        store = SyncFsStore(self.path)
+        fpr = os.path.join(self.path, session_id)
+        self.assertEqual(fp, fpr)
+        self.assertTrue(store.is_default)
+
+        store = SyncFsStore(self.path, 'default')
+        fpr = os.path.join(self.path, session_id)
+        self.assertEqual(fp, fpr)
+        self.assertTrue(store.is_default)
+
+        store = SyncFsStore(self.path, 'foo')
+        fpf = os.path.join(self.path, 'foo')
+        st = os.stat(fpf)
+        self.assertTrue(stat.S_ISDIR(st.st_mode))
+        self.assertFalse(store.is_default)
+
+
+    def test_store_start(self):
+        store = SyncFsStore(self.path)
+        store.start(42)
+        self.assertTrue(store.first)
+
+        store = SyncFsStore(self.path)
+        store.start()
+        self.assertFalse(store.first)
+
+
+if __name__ == '__main__':
+    unittest.main()