# standard imports
import hashlib
import logging
import os
import ssl
import urllib.parse
import urllib.request
from http.client import HTTPResponse
from socket import getservbyname
from urllib.request import HTTPSHandler

# external imports
from usumbufu.client.base import BaseTokenStore, ClientSession
from usumbufu.client.bearer import BearerClientSession
from usumbufu.client.hoba import HobaClientSession

logg = logging.getLogger(__name__)


class PGPClientSession(HobaClientSession):
    """HOBA client session that signs authentication challenges with PGP.

    The ``auth`` object supplied to the constructor must provide
    ``fingerprint()``, ``get_passphrase()`` and ``sign(...)``; those are the
    only members this class calls on it.
    """

    # Algorithm identifier string; presumably consumed by the HOBA base
    # class when building the auth header -- confirm against usumbufu.
    alg = "969"

    def __init__(self, auth):
        """Store the signer and cache its key fingerprint.

        Args:
            auth: Signing backend (see class docstring for required methods).
        """
        # NOTE(review): the base-class __init__ is not invoked here; its
        # signature is not visible from this file, so this is left as-is.
        self.auth = auth
        # Assigned later by HTTPSession once the effective origin is known.
        self.origin = None
        self.fingerprint = self.auth.fingerprint()

    def sign_auth_challenge(self, plaintext, hoba, encoding):
        """Sign ``plaintext`` and return the serialized HOBA object.

        Args:
            plaintext: Challenge material to sign.
            hoba: Object whose ``signature`` attribute is set to the produced
                signature and which is then rendered with ``str()``.
            encoding: Passed through unchanged to ``auth.sign``.

        Returns:
            ``str(hoba)`` after the signature has been attached.
        """
        passphrase = self.auth.get_passphrase()
        signature = self.auth.sign(
            plaintext, encoding, passphrase=passphrase, detach=True
        )
        hoba.signature = signature
        return str(hoba)

    def __str__(self):
        return "clicada hoba/pgp auth"

    def __repr__(self):
        return "clicada hoba/pgp auth"


class HTTPSession:
    """HTTP(S) client session with bearer-token caching and optional auth.

    Tokens are stored on disk in a directory below ``token_dir`` whose name
    is the hex SHA-256 digest of the base url.
    """

    # Root directory of the on-disk token stores (one subdirectory per url).
    token_dir = f"/run/user/{os.getuid()}/clicada/usumbufu/.token"

    # Directory handed to ``SSLContext.load_verify_locations(capath=...)``.
    # NOTE(review): this default is a developer-machine path kept only for
    # backward compatibility; override it on the class or in a subclass.
    ca_path = "/home/will/grassroots/cic-staff-installer/keys/ge.ca"

    def __init__(self, url, auth=None, origin=None):
        """Build the opener chain for ``url``.

        Args:
            url: Base url of the remote service.
            auth: Optional auth sub-handler. Its ``origin`` attribute is set
                to the effective origin before it is registered.
            origin: Optional explicit origin; when ``None`` the origin is
                derived from ``url`` as ``scheme://host:port``.
        """
        self.base_url = url
        # Derived unconditionally (as before), so an unknown scheme without
        # an explicit port still fails early even if ``origin`` is supplied.
        default_origin = self._default_origin(self.base_url)

        self.origin = origin
        if self.origin is None:
            self.origin = default_origin
        else:
            logg.debug(f"overriding http origin for {url} with {self.origin}")

        # One token store per base url, keyed by the url's SHA-256 digest.
        h = hashlib.sha256()
        h.update(self.base_url.encode("utf-8"))
        token_store_dir = os.path.join(self.token_dir, h.digest().hex())
        os.makedirs(token_store_dir, exist_ok=True)
        self.token_store = BaseTokenStore(path=token_store_dir)
        logg.debug(
            f"token store: \n{self.token_store}\n origin: {self.origin}\n token_store_dir: {token_store_dir}\n"
        )

        self.session = ClientSession(self.origin, token_store=self.token_store)

        bearer_handler = BearerClientSession(
            self.origin, token_store=self.token_store
        )
        self.session.add_subhandler(bearer_handler)

        if auth is not None:
            auth.origin = self.origin
            self.session.add_subhandler(auth)

        ctx = ssl.create_default_context()
        ctx.load_verify_locations(capath=self.ca_path)
        https_handler = HTTPSHandler(context=ctx)
        self.session.add_parent(parent=https_handler)
        self.opener = urllib.request.build_opener(self.session)

    @staticmethod
    def _default_origin(url):
        """Return ``scheme://host:port`` for ``url``, filling in a missing port."""
        url_parts = urllib.parse.urlsplit(url)
        netloc_parts = url_parts[1].split(":")
        host = netloc_parts[0]
        if len(netloc_parts) > 1:
            host = host + ":" + netloc_parts[1]
        else:
            # No explicit port: use the well-known port of the scheme.
            host = host + ":" + str(getservbyname(url_parts[0]))
            logg.info(
                f"changed origin with missing port number from {url_parts[1]} to {host}"
            )
        return urllib.parse.urlunsplit((url_parts[0], host, "", "", ""))

    def open(self, url, method=None, data: bytes = None, headers=None):
        """Perform a request through the session opener and return the body.

        Args:
            url: Url to request.
            method: HTTP method; ``None`` lets ``urllib`` pick its default.
            data: Optional request body.
            headers: Optional mapping of request headers.

        Returns:
            The response body decoded as UTF-8.
        """
        logg.debug(f"headers: {headers}")
        logg.debug(f"token store: \n{self.token_store}\n origin: {self.origin}")
        # ``Request`` iterates ``headers.items()``, so ``None`` must be
        # replaced by an empty mapping before it is passed on.
        if headers is None:
            headers = {}
        req = urllib.request.Request(
            url=url, data=data, headers=headers, method=method
        )
        logg.debug(f"open {url} with opener {self}")
        logg.debug(req.get_full_url())
        logg.debug(f"handlers {self.opener.handlers}")
        response: HTTPResponse = self.opener.open(req)
        try:
            status = response.getcode()
            logg.debug(f"{url} returned {status}")
            return response.read().decode("utf-8")
        finally:
            # Release the underlying connection even if reading/decoding fails.
            response.close()

    def __str__(self):
        return str(self.session)