WIP fix filter handling in file backend
This commit is contained in:
parent
6201420ad2
commit
6a94e28ad8
@ -156,20 +156,23 @@ class SyncerFileBackend:
|
|||||||
|
|
||||||
|
|
||||||
def get(self):
|
def get(self):
|
||||||
return ((self.block_height_cursor, self.tx_index_cursor), self.filter)
|
logg.debug('filter {}'.format(self.filter.hex()))
|
||||||
|
return ((self.block_height_cursor, self.tx_index_cursor), int.from_bytes(self.filter, 'little'))
|
||||||
|
|
||||||
|
|
||||||
def set(self, block_height, tx_index):
|
def set(self, block_height, tx_index):
|
||||||
self.__set(block_height, tx_index, 'cursor')
|
self.__set(block_height, tx_index, 'cursor')
|
||||||
|
|
||||||
cursor_path = os.path.join(self.object_data_dir, 'filter')
|
# cursor_path = os.path.join(self.object_data_dir, 'filter')
|
||||||
f = open(cursor_path, 'r+b')
|
# f = open(cursor_path, 'r+b')
|
||||||
f.seek(8)
|
# f.seek(8)
|
||||||
l = len(self.filter)
|
# l = len(self.filter)
|
||||||
c = 0
|
# c = 0
|
||||||
while c < l:
|
# while c < l:
|
||||||
c += f.write(self.filter)
|
# c += f.write(self.filter[c:])
|
||||||
f.close()
|
# f.close()
|
||||||
|
|
||||||
|
return ((self.block_height_cursor, self.tx_index_cursor), int.from_bytes(self.filter, 'little'))
|
||||||
|
|
||||||
|
|
||||||
def __set(self, block_height, tx_index, category):
|
def __set(self, block_height, tx_index, category):
|
||||||
@ -198,15 +201,16 @@ class SyncerFileBackend:
|
|||||||
o.__set(target_block_height, 0, 'target')
|
o.__set(target_block_height, 0, 'target')
|
||||||
o.__set(start_block_height, 0, 'offset')
|
o.__set(start_block_height, 0, 'offset')
|
||||||
|
|
||||||
return uu
|
#return uu
|
||||||
|
return o
|
||||||
|
|
||||||
|
|
||||||
def target(self):
|
def target(self):
|
||||||
return ((self.block_height_target, self.tx_index_target), None,)
|
return (self.block_height_target, 0,)
|
||||||
|
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
return ((self.block_height_offset, self.tx_index_offset), None,)
|
return ((self.block_height_offset, self.tx_index_offset), 0,)
|
||||||
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@ -289,3 +293,15 @@ class SyncerFileBackend:
|
|||||||
b = self.filter_count.to_bytes(8, byteorder='big')
|
b = self.filter_count.to_bytes(8, byteorder='big')
|
||||||
f.write(b)
|
f.write(b)
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
|
|
||||||
|
def reset_filter(self):
|
||||||
|
self.filter = b'\x00' * len(self.filter)
|
||||||
|
cursor_path = os.path.join(self.object_data_dir, 'filter')
|
||||||
|
f = open(cursor_path, 'r+b')
|
||||||
|
f.seek(8)
|
||||||
|
l = len(self.filter)
|
||||||
|
c = 0
|
||||||
|
while c < l:
|
||||||
|
c += f.write(self.filter[c:])
|
||||||
|
f.close()
|
||||||
|
@ -170,6 +170,7 @@ class HistorySyncer(HeadSyncer):
|
|||||||
if block_number == None:
|
if block_number == None:
|
||||||
raise AttributeError('backend has no future target. Use HeadSyner instead')
|
raise AttributeError('backend has no future target. Use HeadSyner instead')
|
||||||
self.block_target = block_number
|
self.block_target = block_number
|
||||||
|
logg.debug('block target {}'.format(self.block_target))
|
||||||
|
|
||||||
|
|
||||||
def get(self, conn):
|
def get(self, conn):
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
import logging
|
import logging
|
||||||
import unittest
|
import unittest
|
||||||
import os
|
import os
|
||||||
|
import tempfile
|
||||||
|
|
||||||
# external imports
|
# external imports
|
||||||
from chainlib.chain import ChainSpec
|
from chainlib.chain import ChainSpec
|
||||||
@ -9,6 +10,10 @@ from chainlib.chain import ChainSpec
|
|||||||
# local imports
|
# local imports
|
||||||
from chainsyncer.backend.memory import MemBackend
|
from chainsyncer.backend.memory import MemBackend
|
||||||
from chainsyncer.backend.sql import SyncerBackend
|
from chainsyncer.backend.sql import SyncerBackend
|
||||||
|
from chainsyncer.backend.file import (
|
||||||
|
SyncerFileBackend,
|
||||||
|
data_dir_for,
|
||||||
|
)
|
||||||
|
|
||||||
# test imports
|
# test imports
|
||||||
from tests.base import TestBase
|
from tests.base import TestBase
|
||||||
@ -61,21 +66,35 @@ class TestInterrupt(TestBase):
|
|||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestInterrupt, self).setUp()
|
super(TestInterrupt, self).setUp()
|
||||||
self.filters = [
|
|
||||||
|
self.backend = None
|
||||||
|
self.conn = MockConn()
|
||||||
|
self.vectors = [
|
||||||
|
[4, 3, 2],
|
||||||
|
[6, 4, 2],
|
||||||
|
[6, 5, 2],
|
||||||
|
[6, 4, 3],
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def assert_filter_interrupt(self, vector):
|
||||||
|
|
||||||
|
logg.debug('running vector {} {}'.format(str(self.backend), vector))
|
||||||
|
|
||||||
|
z = 0
|
||||||
|
for v in vector:
|
||||||
|
z += v
|
||||||
|
|
||||||
|
syncer = TestSyncer(self.backend, vector)
|
||||||
|
|
||||||
|
filters = [
|
||||||
CountFilter('foo'),
|
CountFilter('foo'),
|
||||||
CountFilter('bar'),
|
CountFilter('bar'),
|
||||||
NaughtyCountExceptionFilter('xyzzy', croak_on=3),
|
NaughtyCountExceptionFilter('xyzzy', croak_on=3),
|
||||||
CountFilter('baz'),
|
CountFilter('baz'),
|
||||||
]
|
]
|
||||||
self.backend = None
|
|
||||||
self.conn = MockConn()
|
|
||||||
|
|
||||||
|
for fltr in filters:
|
||||||
def assert_filter_interrupt(self):
|
|
||||||
|
|
||||||
syncer = TestSyncer(self.backend, [4, 3, 2])
|
|
||||||
|
|
||||||
for fltr in self.filters:
|
|
||||||
syncer.add_filter(fltr)
|
syncer.add_filter(fltr)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -87,19 +106,33 @@ class TestInterrupt(TestBase):
|
|||||||
self.assertGreater(fltr, 0)
|
self.assertGreater(fltr, 0)
|
||||||
syncer.loop(0.1, self.conn)
|
syncer.loop(0.1, self.conn)
|
||||||
|
|
||||||
for fltr in self.filters:
|
for fltr in filters:
|
||||||
logg.debug('{} {}'.format(str(fltr), fltr.c))
|
logg.debug('{} {}'.format(str(fltr), fltr.c))
|
||||||
self.assertEqual(fltr.c, 9)
|
self.assertEqual(fltr.c, z)
|
||||||
|
|
||||||
|
|
||||||
|
@unittest.skip('foo')
|
||||||
def test_filter_interrupt_memory(self):
|
def test_filter_interrupt_memory(self):
|
||||||
self.backend = MemBackend(self.chain_spec, None, target_block=4)
|
for vector in self.vectors:
|
||||||
self.assert_filter_interrupt()
|
self.backend = MemBackend(self.chain_spec, None, target_block=len(vector))
|
||||||
|
self.assert_filter_interrupt(vector)
|
||||||
|
|
||||||
|
|
||||||
|
def test_filter_interrpt_file(self):
|
||||||
|
for vector in self.vectors:
|
||||||
|
d = tempfile.mkdtemp()
|
||||||
|
#os.makedirs(data_dir_for(self.chain_spec, 'foo', d))
|
||||||
|
self.backend = SyncerFileBackend.initial(self.chain_spec, len(vector), base_dir=d) #'foo', base_dir=d)
|
||||||
|
self.assert_filter_interrupt(vector)
|
||||||
|
|
||||||
|
|
||||||
|
@unittest.skip('foo')
|
||||||
def test_filter_interrupt_sql(self):
|
def test_filter_interrupt_sql(self):
|
||||||
self.backend = SyncerBackend.initial(self.chain_spec, 4)
|
for vector in self.vectors:
|
||||||
self.assert_filter_interrupt()
|
self.backend = SyncerBackend.initial(self.chain_spec, len(vector))
|
||||||
|
self.assert_filter_interrupt(vector)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
Loading…
Reference in New Issue
Block a user