102 lines
3.0 KiB
Python
102 lines
3.0 KiB
Python
from __future__ import annotations
|
|
|
|
# standard imports
|
|
import json
|
|
import logging
|
|
import os
|
|
from typing import TYPE_CHECKING, Dict, Union
|
|
|
|
from cic_types.condiments import MetadataPointer
|
|
from cic_types.ext.metadata.signer import Signer
|
|
from cic_types.processor import generate_metadata_pointer
|
|
|
|
if TYPE_CHECKING:
|
|
from cic.cmd.arg import CmdCtrl
|
|
from cic.http import HTTPSession
|
|
|
|
# local imports
|
|
|
|
# external imports
|
|
|
|
|
|
# Module-level logger shared by the classes in this file.
# NOTE(review): the logger is named after the file path (__file__) rather than
# the dotted module name (__name__) — confirm this is intended before changing.
logg = logging.getLogger(__file__)
|
|
|
|
|
|
class Metadata:
    """Class-level configuration shared by metadata request handlers.

    Both attributes default to ``None`` on the class and are read through
    ``self`` by subclasses, so they are presumably assigned on the class by
    the application before a handler is instantiated.

    :cvar base_url: The base url of the metadata server.
    :type base_url: str
    :cvar ctrl: Command controller; the handler in this file looks up its
        HTTP opener via ``ctrl.remote_openers``.
    :type ctrl: CmdCtrl
    """

    # Unset until configured; a falsy value means no target url can be built.
    base_url = None

    # Annotated only; CmdCtrl is imported under TYPE_CHECKING.
    ctrl: CmdCtrl = None
|
|
|
|
class MetadataRequestsHandler(Metadata):
    """Create, sign/update and query a single metadata resource over HTTP.

    Each instance targets one resource: its url is the inherited ``base_url``
    joined with a metadata pointer derived from ``identifier`` and
    ``cic_type``. All requests go through the ``"meta"`` opener taken from
    the inherited ``ctrl``.
    """

    def __init__(
        self,
        cic_type: MetadataPointer,
        identifier: bytes,
        engine: str = "pgp",
    ):
        """Resolve the opener, metadata pointer and target url.

        :param cic_type: Pointer type. With ``MetadataPointer.NONE`` the hex
            form of ``identifier`` is used as the pointer unchanged; any other
            value is passed to ``generate_metadata_pointer``.
        :type cic_type: MetadataPointer
        :param identifier: Raw identifier of the resource.
        :type identifier: bytes
        :param engine: Engine name placed in the signature envelope sent by
            :meth:`edit`.
        :type engine: str
        :raises AttributeError: If ``ctrl`` is still ``None`` on the class.
        """
        logg.debug(f"ctrl: {self.ctrl}")
        self.opener: HTTPSession = self.ctrl.remote_openers["meta"]
        self.cic_type = cic_type
        self.engine = engine
        self.headers = {"X-CIC-AUTOMERGE": "server", "Content-Type": "application/json"}
        self.identifier = identifier
        if cic_type == MetadataPointer.NONE:
            self.metadata_pointer = identifier.hex()
        else:
            self.metadata_pointer = generate_metadata_pointer(
                identifier=self.identifier, cic_type=self.cic_type
            )
        # ``url`` is only defined when a base url is configured; without it
        # create/edit/query fail with AttributeError on ``self.url``.
        if self.base_url:
            # Join with an explicit "/" instead of os.path.join, which uses
            # the OS path separator (a backslash on Windows) and is not meant
            # for urls.
            self.url = f"{self.base_url.rstrip('/')}/{self.metadata_pointer}"

    def create(self, data: Union[Dict, str]):
        """POST ``data`` as JSON, then sign and submit the server's reply.

        :param data: JSON-serializable payload to create the resource from.
        :type data: dict | str
        :raises json.JSONDecodeError: If the POST response is not valid JSON.
        :return: The result of :meth:`edit` called with the decoded response.
        """
        # Keep the encoded body in its own name rather than rebinding ``data``.
        payload = json.dumps(data).encode("utf-8")

        result = self.opener.open(
            method="POST", url=self.url, data=payload, headers=self.headers
        )
        logg.debug(
            f"url: {self.url}, data: {payload}, headers: {self.headers}, result: {result}"
        )
        metadata = json.loads(result)
        return self.edit(data=metadata)

    def edit(self, data: Union[Dict, str]):
        """Sign ``data`` and PUT the signed envelope to the resource url.

        The request body is a JSON object with ``"m"`` (``data`` serialized as
        a JSON string) and ``"s"`` (engine, algorithm, signature and the
        ``"digest"`` entry of ``data``).

        :param data: Metadata to sign. ``data.get("digest")`` is called on it,
            so in practice it must be a mapping despite the wider annotation.
        :type data: dict | str
        :return: Whatever the opener returns for the PUT request.
        """
        cic_meta_signer = Signer()
        signature = cic_meta_signer.sign_digest(data=data)
        algorithm = cic_meta_signer.get_operational_key().get("algo")
        envelope = {
            "m": json.dumps(data),
            "s": {
                "engine": self.engine,
                "algo": algorithm,
                "data": signature,
                "digest": data.get("digest"),
            },
        }
        body = json.dumps(envelope).encode("utf-8")
        result = self.opener.open(
            method="PUT", url=self.url, data=body, headers=self.headers
        )
        logg.info(f"signed metadata submission returned: {result}.")
        return result

    def query(self):
        """GET the resource and check that it decodes to a JSON object.

        :raises json.JSONDecodeError: If the response is not valid JSON.
        :raises ValueError: If the decoded response is not a ``dict``.
        :return: The raw response as returned by the opener (not the decoded
            dict, which is only used for validation).
        """
        result = self.opener.open(method="GET", url=self.url)
        result_data = json.loads(result)
        if not isinstance(result_data, dict):
            raise ValueError(f"invalid result data object: {result_data}.")
        return result
|