Factor out target state
This commit is contained in:
parent
97c2d41df3
commit
9aded5c561
@ -154,6 +154,7 @@ class SyncStore:
|
|||||||
self.items = {}
|
self.items = {}
|
||||||
self.item_keys = []
|
self.item_keys = []
|
||||||
self.started = False
|
self.started = False
|
||||||
|
self.thresholds = []
|
||||||
|
|
||||||
|
|
||||||
def setup_sync_state(self, factory, event_callback):
|
def setup_sync_state(self, factory, event_callback):
|
||||||
@ -168,6 +169,14 @@ class SyncStore:
|
|||||||
self.filters = []
|
self.filters = []
|
||||||
|
|
||||||
|
|
||||||
|
def set_target(self, v):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def get_target(self):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def register(self, fltr):
|
def register(self, fltr):
|
||||||
self.filters.append(fltr)
|
self.filters.append(fltr)
|
||||||
self.filter_state.register(fltr)
|
self.filter_state.register(fltr)
|
||||||
@ -211,6 +220,46 @@ class SyncStore:
|
|||||||
logg.debug('item {}'.format(self.state.state(item.state_key)))
|
logg.debug('item {}'.format(self.state.state(item.state_key)))
|
||||||
|
|
||||||
|
|
||||||
|
def load(self, target):
|
||||||
|
self.state.sync(self.state.NEW)
|
||||||
|
self.state.sync(self.state.SYNC)
|
||||||
|
|
||||||
|
thresholds_sync = []
|
||||||
|
for v in self.state.list(self.state.SYNC):
|
||||||
|
block_number = int(v)
|
||||||
|
thresholds_sync.append(block_number)
|
||||||
|
logg.debug('queue resume {}'.format(block_number))
|
||||||
|
thresholds_new = []
|
||||||
|
for v in self.state.list(self.state.NEW):
|
||||||
|
block_number = int(v)
|
||||||
|
thresholds_new.append(block_number)
|
||||||
|
logg.debug('queue new range {}'.format(block_number))
|
||||||
|
|
||||||
|
thresholds_sync.sort()
|
||||||
|
thresholds_new.sort()
|
||||||
|
thresholds = thresholds_sync + thresholds_new
|
||||||
|
lim = len(thresholds) - 1
|
||||||
|
|
||||||
|
for i in range(len(thresholds)):
|
||||||
|
item_target = target
|
||||||
|
if i < lim:
|
||||||
|
item_target = thresholds[i+1]
|
||||||
|
o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True)
|
||||||
|
self.items[block_number] = o
|
||||||
|
self.item_keys.append(block_number)
|
||||||
|
logg.info('added existing {}'.format(o))
|
||||||
|
|
||||||
|
self.get_target()
|
||||||
|
|
||||||
|
if len(thresholds) == 0:
|
||||||
|
if self.target != None:
|
||||||
|
logg.warning('sync "{}" is already done, nothing to do'.format(self.session_id))
|
||||||
|
else:
|
||||||
|
logg.info('syncer first run target {}'.format(target))
|
||||||
|
self.first = True
|
||||||
|
self.set_target(target)
|
||||||
|
|
||||||
|
|
||||||
def get(self, k):
|
def get(self, k):
|
||||||
return self.items[k]
|
return self.items[k]
|
||||||
|
|
||||||
|
@ -66,54 +66,21 @@ class SyncFsStore(SyncStore):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def load(self, target):
|
def get_target(self):
|
||||||
self.state.sync(self.state.NEW)
|
|
||||||
self.state.sync(self.state.SYNC)
|
|
||||||
|
|
||||||
thresholds_sync = []
|
|
||||||
for v in self.state.list(self.state.SYNC):
|
|
||||||
block_number = int(v)
|
|
||||||
thresholds_sync.append(block_number)
|
|
||||||
logg.debug('queue resume {}'.format(block_number))
|
|
||||||
thresholds_new = []
|
|
||||||
for v in self.state.list(self.state.NEW):
|
|
||||||
block_number = int(v)
|
|
||||||
thresholds_new.append(block_number)
|
|
||||||
logg.debug('queue new range {}'.format(block_number))
|
|
||||||
|
|
||||||
thresholds_sync.sort()
|
|
||||||
thresholds_new.sort()
|
|
||||||
thresholds = thresholds_sync + thresholds_new
|
|
||||||
lim = len(thresholds) - 1
|
|
||||||
|
|
||||||
logg.debug('thresholds {}'.format(thresholds))
|
|
||||||
for i in range(len(thresholds)):
|
|
||||||
item_target = target
|
|
||||||
if i < lim:
|
|
||||||
item_target = thresholds[i+1]
|
|
||||||
o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True)
|
|
||||||
self.items[block_number] = o
|
|
||||||
self.item_keys.append(block_number)
|
|
||||||
logg.info('added existing {}'.format(o))
|
|
||||||
|
|
||||||
fp = os.path.join(self.session_path, 'target')
|
fp = os.path.join(self.session_path, 'target')
|
||||||
have_target = False
|
|
||||||
try:
|
try:
|
||||||
f = open(fp, 'r')
|
f = open(fp, 'r')
|
||||||
v = f.read()
|
v = f.read()
|
||||||
f.close()
|
f.close()
|
||||||
self.target = int(v)
|
self.target = int(v)
|
||||||
have_target = True
|
|
||||||
except FileNotFoundError as e:
|
except FileNotFoundError as e:
|
||||||
|
logg.debug('cant find target {} {}'.format(fp, e))
|
||||||
pass
|
pass
|
||||||
|
|
||||||
if len(thresholds) == 0:
|
|
||||||
if have_target:
|
def set_target(self, v):
|
||||||
logg.warning('sync "{}" is already done, nothing to do'.format(self.session_id))
|
fp = os.path.join(self.session_path, 'target')
|
||||||
else:
|
|
||||||
logg.info('syncer first run target {}'.format(target))
|
|
||||||
self.first = True
|
|
||||||
f = open(fp, 'w')
|
f = open(fp, 'w')
|
||||||
f.write(str(target))
|
f.write(str(v))
|
||||||
f.close()
|
f.close()
|
||||||
self.target = target
|
self.target = v
|
||||||
|
Loading…
Reference in New Issue
Block a user