# standard imports import unittest import logging import os import json import sys # external imports from hexathon import strip_0x # local imports from cic import Proof from cic.processor import Processor from cic.attachment import Attachment from cic.meta import Meta from cic.output import KVWriter # test imports from tests.base_cic import ( TestCICBase, test_data_dir, root_merged_hash, root_unmerged_hash, ) logg = logging.getLogger() logg.setLevel(logging.DEBUG) class MockExt: def __init__(self, address): self.address = address def process(self): return self.address class TestCICProcessor(TestCICBase): def test_processor_meta(self): fp = os.path.join(test_data_dir, 'proof') m = Meta(fp) m.load() mock_ext = MockExt(self.token_address) p = Processor(metadata=m, outputs_writer=self.outputs_writer, extensions=[mock_ext]) p.token_address = self.token_address p.process() meta_reference = m.reference(self.token_address) fp = os.path.join(self.outputs_dir, meta_reference) f = open(fp, 'r') o = json.load(f) f.close() self.assertEqual(m.asdict(), o) def test_processor_attachment(self): fp = os.path.join(test_data_dir, 'proof') m = Attachment(fp) m.load() mock_ext = MockExt(self.token_address) p = Processor(attachment=m, outputs_writer=self.outputs_writer, extensions=[mock_ext]) p.process() for k in list(m.contents.keys()): os.stat(fp) def test_processor_proof_noattachment(self): fp = os.path.join(test_data_dir, 'proof') m = Proof(fp) m.load() mock_ext = MockExt(self.token_address) p = Processor(proof=m, outputs_writer=self.outputs_writer, extensions=[mock_ext]) p.process() self.assertEqual(p.outputs[0], root_unmerged_hash) def test_processor_proof_attachment(self): fp = os.path.join(test_data_dir, 'proof') ma = Attachment(fp) ma.load() mp = Proof(fp, attachments=ma) mp.load() mock_ext = MockExt(self.token_address) p = Processor(proof=mp, outputs_writer=self.outputs_writer, extensions=[mock_ext]) p.process() self.assertEqual(p.outputs[0], root_merged_hash) if __name__ == '__main__': unittest.main()