2022-04-20 13:59:12 +02:00
# standard imports
2022-04-20 16:27:59 +02:00
import os
2022-04-20 13:59:12 +02:00
import logging
# local imports
2022-04-20 14:31:32 +02:00
from shep . persist import PersistedState
2022-04-26 09:56:04 +02:00
from shep import State
2022-04-20 13:59:12 +02:00
from shep . error import StateInvalid
2022-04-20 15:17:38 +02:00
from chainsyncer . filter import FilterState
2022-04-20 13:59:12 +02:00
from chainsyncer . error import (
LockError ,
FilterDone ,
InterruptError ,
IncompleteFilterError ,
SyncDone ,
2022-04-30 09:35:08 +02:00
FilterInitializationError ,
2022-04-20 13:59:12 +02:00
)
# Module-level logger, named after this module.
logg = logging . getLogger ( __name__ )
def sync_state_serialize(block_height, tx_index, block_target):
    """Pack a sync position into its 12-byte stored representation.

    Layout (all big-endian, 4 bytes each): unsigned block height,
    unsigned transaction index, signed block target.

    :param block_height: Block height, must fit in an unsigned 32-bit integer
    :param tx_index: Transaction index, must fit in an unsigned 32-bit integer
    :param block_target: Target block, must fit in a signed 32-bit integer
        (negative values are accepted)
    :raises OverflowError: If any value does not fit in its 4-byte field
    :returns: The 12 serialized bytes
    """
    return b''.join((
        block_height.to_bytes(4, 'big'),
        tx_index.to_bytes(4, 'big'),
        # the target is the only signed field
        block_target.to_bytes(4, 'big', signed=True),
    ))
def sync_state_deserialize(b):
    """Unpack a stored sync position.

    Reads bytes 0-3 as the unsigned block height, bytes 4-7 as the unsigned
    transaction index, and everything from byte 8 onwards as the signed
    block target (all big-endian).

    :param b: Serialized sync state bytes
    :returns: Tuple of (block_height, tx_index, block_target)
    """
    block_height = int.from_bytes(b[:4], 'big')
    tx_index = int.from_bytes(b[4:8], 'big')
    # the target is the only signed field
    block_target = int.from_bytes(b[8:], 'big', signed=True)
    return (block_height, tx_index, block_target,)
# NOT thread safe
class SyncItem:
    """Cursor over a single sync range, backed by a sync state store and a filter state store.

    The item is identified in both stores by the string form of its offset.
    The block cursor, transaction cursor and target are read from the sync
    state store on construction.

    :param offset: Start block of the range; str(offset) is the key used in both stores
    :param target: Requested target; overwritten by the target read from the sync state store
    :param sync_state: Store holding the serialized (cursor, tx cursor, target) record
    :param filter_state: Store holding the filter execution state for the item
    :param started: If False and filters are registered, the filter state is moved to RESET
    :param ignore_lock: If True, a LOCK flag on the filter state does not raise
    :raises LockError: If the filter state is locked and ignore_lock is not set
    """

    def __init__(self, offset, target, sync_state, filter_state, started=False, ignore_lock=False):
        self.offset = offset
        self.target = target
        self.sync_state = sync_state
        self.filter_state = filter_state
        self.state_key = str(offset)

        v = self.sync_state.get(self.state_key)
        # the stored target replaces the one passed to the constructor
        (self.cursor, self.tx_cursor, self.target) = sync_state_deserialize(v)

        if self._is_set('LOCK') and not ignore_lock:
            raise LockError(self.state_key)

        # NOTE(review): the 4 presumably accounts for states that are not
        # registered filters - confirm against FilterState
        self.count = len(self.filter_state.all(pure=True)) - 4
        self.skip_filter = False
        if self.count == 0:
            self.skip_filter = True
        elif not started:
            self.filter_state.move(self.state_key, self.filter_state.from_name('RESET'))


    def _is_set(self, name):
        """Return True if the named flag is set in the current filter state of the item."""
        state = self.filter_state.state(self.state_key)
        return (state & self.filter_state.from_name(name)) > 0


    def __check_done(self):
        """Raise if the filter state is flagged INTERRUPT or DONE.

        :raises InterruptError: If the INTERRUPT flag is set
        :raises FilterDone: If the DONE flag is set
        """
        if self._is_set('INTERRUPT'):
            raise InterruptError(self.state_key)
        if self._is_set('DONE'):
            raise FilterDone(self.state_key)


    def reset(self, check_incomplete=True):
        """Move the filter state of the item to RESET.

        :param check_incomplete: If True, refuse to reset a locked or unfinished filter state
        :raises LockError: If checked and the LOCK flag is set
        :raises IncompleteFilterError: If checked and the DONE flag is not set
        """
        if check_incomplete:
            if self._is_set('LOCK'):
                raise LockError('reset attempt on {} when state locked'.format(self.state_key))
            if not self._is_set('DONE'):
                raise IncompleteFilterError('reset attempt on {} when incomplete'.format(self.state_key))
        self.filter_state.move(self.state_key, self.filter_state.from_name('RESET'))


    def next(self, advance_block=False):
        """Advance the sync cursor by one transaction or one block, and store the new position.

        :param advance_block: If True, step to the next block and zero the
            transaction index; otherwise increment the transaction index
        :raises SyncDone: If the sync state is already DONE, or if advancing
            the block passes a non-negative target
        """
        v = self.sync_state.state(self.state_key)
        if v == self.sync_state.DONE:
            raise SyncDone(self.target)
        elif v == self.sync_state.NEW:
            self.sync_state.next(self.state_key)

        v = self.sync_state.get(self.state_key)
        (block_number, tx_index, target) = sync_state_deserialize(v)
        if advance_block:
            block_number += 1
            tx_index = 0
            # a negative target never completes
            if self.target >= 0 and block_number > self.target:
                self.sync_state.move(self.state_key, self.sync_state.DONE)
                raise SyncDone(self.target)
        else:
            tx_index += 1

        self.cursor = block_number
        self.tx_cursor = tx_index

        b = sync_state_serialize(block_number, tx_index, target)
        self.sync_state.replace(self.state_key, b)


    def advance(self, ignore_lock=False):
        """Move the filter state to the next filter and set the LOCK flag.

        :param ignore_lock: If True, an existing LOCK flag is cleared instead of raising
        :raises FilterDone: If no filters are registered, the DONE flag is
            set, or there is no next filter state
        :raises InterruptError: If the INTERRUPT flag is set
        :raises LockError: If the LOCK flag is set and ignore_lock is not set
        """
        if self.skip_filter:
            raise FilterDone()
        self.__check_done()

        if self._is_set('LOCK'):
            if ignore_lock:
                self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
            else:
                raise LockError('advance attempt on {} when state locked'.format(self.state_key))

        # FilterDone is raised outside the except clause, so it does not
        # carry the StateInvalid as its exception context
        done = False
        try:
            self.filter_state.next(self.state_key)
        except StateInvalid:
            done = True
        if done:
            raise FilterDone()

        self.filter_state.set(self.state_key, self.filter_state.from_name('LOCK'))


    def release(self, interrupt=False):
        """Clear the LOCK flag after a filter has been executed.

        :param interrupt: If set, flag the filter state INTERRUPT and DONE
        :raises LockError: If the LOCK flag is not set (only checked when not interrupting)
        :returns: True if another filter state follows, False otherwise
        """
        if self.skip_filter:
            return False

        if interrupt:
            self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
            self.filter_state.set(self.state_key, self.filter_state.from_name('INTERRUPT'))
            self.filter_state.set(self.state_key, self.filter_state.from_name('DONE'))
            return False

        if not self._is_set('LOCK'):
            raise LockError('release attempt on {} when state unlocked'.format(self.state_key))
        self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))

        try:
            c = self.filter_state.peek(self.state_key)
            logg.debug('peeked {}'.format(c))
        except StateInvalid:
            # nothing follows the current filter state
            self.filter_state.set(self.state_key, self.filter_state.from_name('DONE'))
            return False
        return True


    def __str__(self):
        return 'syncitem offset {} target {} cursor {}'.format(self.offset, self.target, self.cursor)
class SyncStore:
    """Base store keeping track of sync ranges and their filter execution state.

    The filter list persistence methods (save_filter_list, load_filter_list)
    raise NotImplementedError here, and the target accessors (set_target,
    get_target) are no-ops; presumably subclasses provide the real
    implementations.

    :param path: Session path, stored as session_path
    :param session_id: Session identifier, only used in log output by this class
    """

    def __init__(self, path, session_id=None):
        self.session_id = session_id
        self.session_path = path
        self.is_default = False
        self.first = False
        self.target = None
        self.items = {}
        self.item_keys = []
        self.started = False
        self.thresholds = []


    def setup_sync_state(self, factory=None, event_callback=None):
        """Create the sync state store and register the SYNC and DONE states.

        :param factory: If given, a persisted state is created using factory.add;
            otherwise the state is kept in memory only
        :param event_callback: Passed through to the state constructor
        """
        if factory is None:
            self.state = State(2, event_callback=event_callback)
        else:
            self.state = PersistedState(factory.add, 2, event_callback=event_callback)
        self.state.add('SYNC')
        self.state.add('DONE')


    def setup_filter_state(self, factory=None, event_callback=None):
        """Create the filter state store and clear the list of registered filters.

        :param factory: If given, a persisted backend is created using
            factory.add, and factory.ls is passed as the scan callback
        :param event_callback: Passed through to the backend state constructor
        """
        if factory is None:
            filter_state_backend = State(0, check_alias=False, event_callback=event_callback)
            self.filter_state = FilterState(filter_state_backend)
        else:
            filter_state_backend = PersistedState(factory.add, 0, check_alias=False, event_callback=event_callback)
            self.filter_state = FilterState(filter_state_backend, scan=factory.ls)
        self.filters = []


    def set_target(self, v):
        """Store the sync target. No-op in this class."""
        pass


    def get_target(self):
        """Return the stored sync target. Always None in this class."""
        return None


    def register(self, fltr):
        """Add a filter to the filter list and register it with the filter state.

        :param fltr: Filter to register
        """
        self.filters.append(fltr)
        self.filter_state.register(fltr)


    def start(self, offset=0, target=-1, ignore_lock=False):
        """Save the filter list, load existing sync items, and create the initial item on a first run.

        Does nothing if the store has already been started.

        :param offset: Block to start at; ignored with a warning if sync items already exist
        :param target: Block to sync to; -1 is the default
        :param ignore_lock: Passed on to the SyncItem constructor
        """
        if self.started:
            return

        self.save_filter_list()

        self.load(target, ignore_lock=ignore_lock)

        if self.first:
            state_bytes = sync_state_serialize(offset, 0, target)
            block_number_str = str(offset)
            self.state.put(block_number_str, contents=state_bytes)
            self.filter_state.put(block_number_str)
            o = SyncItem(offset, target, self.state, self.filter_state, ignore_lock=ignore_lock)
            k = str(offset)
            self.items[k] = o
            self.item_keys.append(k)
        elif offset > 0:
            logg.warning('block number argument {} for start ignored for already initiated sync {}'.format(offset, self.session_id))
        self.started = True

        # keys are block numbers in string form; a plain string sort would
        # put '1000' before '999'
        self.item_keys.sort(key=int)


    def stop(self, item):
        """Mark a sync item as done and store a new open-ended item at its cursor.

        :param item: The SyncItem to stop
        """
        # NOTE(review): statement nesting under this condition should be
        # confirmed against the project history
        if item.target == -1:
            state_bytes = sync_state_serialize(item.cursor, 0, item.cursor)
            self.state.replace(str(item.offset), state_bytes)
            self.filter_state.put(str(item.cursor))

            # result is discarded; only the constructor's effects on the stores are used
            SyncItem(item.offset, -1, self.state, self.filter_state)
            logg.info('New sync state start at block number {} for next head sync backfill'.format(item.cursor))

        self.state.move(item.state_key, self.state.DONE)

        state_bytes = sync_state_serialize(item.cursor, 0, -1)
        self.state.put(str(item.cursor), contents=state_bytes)


    @staticmethod
    def _threshold_ranges(thresholds, target):
        """Pair every start block with the block its range ends at.

        Each range ends where the next one starts; the last range ends at target.

        :param thresholds: Start blocks, in processing order
        :param target: End block of the last range
        :returns: List of (block_number, item_target) tuples
        """
        ranges = []
        last = len(thresholds) - 1
        for (i, block_number) in enumerate(thresholds):
            item_target = thresholds[i + 1] if i < last else target
            ranges.append((block_number, item_target))
        return ranges


    def load(self, target, ignore_lock=False):
        """Populate the item list from sync states in SYNC and NEW state.

        If no such states exist and no target is set on the store, the store
        is flagged as a first run and the target is passed to set_target.

        :param target: Target for the last range; replaced by get_target() if that returns a value
        :param ignore_lock: Passed on to the SyncItem constructor
        """
        self.state.sync(self.state.NEW)
        self.state.sync(self.state.SYNC)

        thresholds_sync = []
        for v in self.state.list(self.state.SYNC):
            block_number = int(v)
            thresholds_sync.append(block_number)
            logg.debug('queue resume {}'.format(block_number))
        thresholds_new = []
        for v in self.state.list(self.state.NEW):
            block_number = int(v)
            thresholds_new.append(block_number)
            logg.debug('queue new range {}'.format(block_number))

        # items being resumed are queued ahead of items not yet started
        thresholds_sync.sort()
        thresholds_new.sort()
        thresholds = thresholds_sync + thresholds_new

        # each item is keyed by its own threshold, not by whichever block
        # number the loops above happened to end on
        for (block_number, item_target) in self._threshold_ranges(thresholds, target):
            o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True, ignore_lock=ignore_lock)
            k = str(block_number)
            self.items[k] = o
            self.item_keys.append(k)
            logg.info('added existing {}'.format(o))

        v = self.get_target()
        if v is not None:
            target = v

        if not thresholds:
            if self.target is not None:
                logg.warning('sync "{}" is already done, nothing to do'.format(self.session_id))
            else:
                logg.info('syncer first run target {}'.format(target))
                self.first = True
                self.set_target(target)


    def get(self, k):
        """Return the sync item stored under key k.

        :raises KeyError: If no item exists for the key
        """
        return self.items[k]


    def next_item(self):
        """Remove the first key from the item queue and return its sync item.

        :returns: The next SyncItem, or None if the queue is empty
        """
        try:
            k = self.item_keys.pop(0)
        except IndexError:
            return None
        return self.items[k]


    def connect(self):
        """Connect the filter state store."""
        self.filter_state.connect()


    def disconnect(self):
        """Disconnect the filter state store."""
        self.filter_state.disconnect()


    def save_filter_list(self):
        """Persist the list of registered filters. Not implemented in this class."""
        raise NotImplementedError()


    def load_filter_list(self):
        """Return the persisted list of filters. Not implemented in this class."""
        raise NotImplementedError()


    def __get_locked_item(self):
        """Return the key of the single locked filter item.

        :raises FilterInitializationError: If more than one item is locked
        :returns: Key of the locked item, or None if nothing is locked
        """
        locked_item = self.filter_state.list(self.filter_state.state_store.LOCK)
        if len(locked_item) == 0:
            logg.error('Sync filter in store {} is not locked\n'.format(self))
            return None
        elif len(locked_item) > 1:
            raise FilterInitializationError('More than one locked filter item encountered in store {}. That should never happen, so I do not know what to do next.\n'.format(self))
        return locked_item[0]


    def __get_filter_index(self, k):
        """Find the position of a filter state name in the stored filter list.

        Stored filter names are upper-cased before comparison.

        :param k: Filter state name to look for
        :returns: Tuple of (index, filter list); index is -1 if there is no match
        """
        fltrs = self.load_filter_list()
        for (i, fltr) in enumerate(fltrs):
            if k == fltr.upper():
                logg.debug('lock filter match at filter list index {}'.format(i))
                return (i, fltrs,)
        # lets the caller report the missing filter instead of failing on the unpack
        return (-1, fltrs,)


    def unlock_filter(self, revert=False):
        """Release the lock on the currently locked filter item.

        :param revert: If True, go back to the previous filter state instead of on to the next
        :raises FilterInitializationError: If more than one item is locked, or
            the locked state matches no filter in the stored filter list
        :returns: False if no item is locked, True otherwise
        """
        locked_item_key = self.__get_locked_item()
        if locked_item_key is None:
            return False
        locked_item = self.get(locked_item_key)

        state = self.filter_state.state(locked_item_key)
        # the state value with the lock flag taken out
        locked_state = state - self.filter_state.state_store.LOCK
        locked_state_name = self.filter_state.name(locked_state)
        logg.debug('found locked item {} in state {}'.format(locked_item, locked_state))

        (i, fltrs) = self.__get_filter_index(locked_state_name)
        if i == -1:
            raise FilterInitializationError('locked state {} ({}) found for item {}, but matching filter has not been registered'.format(locked_state_name, locked_state, locked_item))

        if revert:
            self.__unlock_previous(locked_item, fltrs, i)
            direction = 'previous'
        else:
            self.__unlock_next(locked_item, fltrs, i)
            direction = 'next'
        new_state = self.filter_state.state(locked_item_key)

        logg.info('chainstate unlock to {} {} ({}) -> {} ({})'.format(direction, self.filter_state.name(state), state, self.filter_state.name(new_state), new_state))
        return True


    def __unlock_next(self, item, lst, index):
        """Reset the item if it is locked on the last filter, otherwise release its lock."""
        if index == len(lst) - 1:
            item.reset(check_incomplete=False)
        else:
            item.release()


    def __unlock_previous(self, item, lst, index):
        """Reset the item if it is locked on the first filter, otherwise move it to the preceding filter state."""
        if index == 0:
            item.reset(check_incomplete=False)
        else:
            new_state_str = lst[index - 1]
            new_state = self.filter_state.state_store.from_name(new_state_str)
            self.filter_state.state_store.move(item.state_key, new_state)


    def peek_current_filter(self):
        """No-op in this class."""
        pass