# standard imports
import hashlib
import urllib.parse
import urllib.request
import os
import logging
from socket import getservbyname

# external imports
from usumbufu.client.base import (
        ClientSession,
        BaseTokenStore,
        )
from usumbufu.client.bearer import BearerClientSession
from usumbufu.client.hoba import HobaClientSession
from urlybird.host import url_apply_port_string

logg = logging.getLogger(__name__)


class PGPClientSession(HobaClientSession):
    """HOBA client session that signs auth challenges through a PGP auth object.

    :param auth: Object providing ``fingerprint()``, ``get_passphrase()`` and
        ``sign(plaintext, encoding, passphrase=..., detach=...)``.
    """

    # Algorithm identifier; presumably consumed by HobaClientSession -- TODO confirm.
    alg = '969'

    def __init__(self, auth):
        super().__init__()
        self.auth = auth
        # Origin is not known at construction time; HTTPSession assigns it
        # when this object is passed to it as the auth handler.
        self.origin = None
        self.fingerprint = self.auth.fingerprint()


    def sign_auth_challenge(self, plaintext, hoba, encoding):
        """Sign the challenge and return the serialized hoba object.

        The passphrase is fetched from the auth object, a detached signature
        over ``plaintext`` is produced and stored in ``hoba.signature``.

        :param plaintext: Challenge data to sign.
        :param hoba: Object receiving the signature; its ``str()`` is returned.
        :param encoding: Encoding argument passed through to ``auth.sign``.
        :returns: ``str(hoba)`` after the signature has been set.
        """
        passphrase = self.auth.get_passphrase()
        signature = self.auth.sign(plaintext, encoding, passphrase=passphrase, detach=True)
        hoba.signature = signature
        return str(hoba)


    def __str__(self):
        return 'clicada hoba/pgp auth'


    def __repr__(self):
        return 'clicada hoba/pgp auth'


class HTTPSession:
    """HTTP client bound to a base url, with a token store and auth subhandlers.

    :param url: Base url that endpoints are resolved against.
    :param auth: Optional auth handler; its ``origin`` attribute is set to the
        session origin and it is added as a subhandler of the client session.
    :param origin: Origin string; derived from ``url`` when not given.
    :param ssl_context: Passed through to ``ClientSession``.
    """

    # Base directory for token stores, under the current user's runtime dir.
    token_dir = '/run/user/{}/clicada/usumbufu/.token'.format(os.getuid())

    def __init__(self, url, auth=None, origin=None, ssl_context=None):
        self.base_url = url
        if origin is None:
            origin = url_apply_port_string(url, as_origin=True)
        self.origin = origin

        # Each base url gets its own token store directory, named by the
        # hex sha256 digest of the url.
        url_digest = hashlib.sha256(self.base_url.encode('utf-8')).hexdigest()
        token_store_dir = os.path.join(self.token_dir, url_digest)
        os.makedirs(token_store_dir, exist_ok=True)
        self.token_store = BaseTokenStore(path=token_store_dir)

        self.session = ClientSession(self.origin, token_store=self.token_store, ssl_context=ssl_context)

        bearer_handler = BearerClientSession(self.origin, token_store=self.token_store)
        self.session.add_subhandler(bearer_handler)

        if auth is not None:
            auth.origin = self.origin
            self.session.add_subhandler(auth)

        self.opener = urllib.request.build_opener(self.session)


    def open(self, endpoint):
        """Fetch an endpoint relative to the base url and return its body as text.

        :param endpoint: Path (or url) joined onto the base url.
        :raises FileNotFoundError: If the response code is 404.
        :returns: Response body decoded as utf-8.
        """
        url = urllib.parse.urljoin(self.base_url, endpoint)
        logg.debug('open %s with opener %s', url, self)
        # The context manager closes the response (and its connection) on
        # both the success path and the 404 path.
        with self.opener.open(url) as r:
            logg.debug('response code %s for %s', r.code, endpoint)
            # NOTE(review): a stock urllib opener raises HTTPError for 404
            # before this point; this check only fires if a handler in the
            # session lets the 404 response through -- confirm.
            if r.code == 404:
                raise FileNotFoundError()
            return r.read().decode('utf-8')


    def __str__(self):
        return str(self.session)