# clicada/clicada/cli/http.py

# 84 lines
# 2.2 KiB
# Python

# standard imports
import hashlib
import logging
import os
import urllib.parse
import urllib.request

# external imports
from usumbufu.client.base import (
    ClientSession,
    BaseTokenStore,
)
from usumbufu.client.bearer import BearerClientSession
from usumbufu.client.hoba import HobaClientSession
# module-level logger, named after this module
logg = logging.getLogger(__name__)
class PGPClientSession(HobaClientSession):
    """HOBA client session whose challenge signatures come from a PGP backend.

    The backend is the ``auth`` object given to the constructor. This class
    only relies on its ``fingerprint()``, ``get_passphrase()`` and ``sign()``
    methods.
    """

    # presumably the algorithm identifier read by the parent class -- confirm
    # against usumbufu
    alg = '969'

    def __init__(self, auth):
        """Store the signing backend and cache its key fingerprint.

        :param auth: object providing ``fingerprint()``, ``get_passphrase()``
            and ``sign()``
        """
        # NOTE(review): the parent initializer is not invoked here -- confirm
        # this is intentional.
        self.auth = auth
        # origin starts unset; HTTPSession assigns it when the handler is
        # registered there
        self.origin = None
        self.fingerprint = auth.fingerprint()

    def sign_auth_challenge(self, plaintext, hoba, encoding):
        """Sign a challenge and return the serialized hoba object.

        :param plaintext: challenge data handed to the backend's ``sign()``
        :param hoba: object that receives the result in its ``signature``
            attribute and is then converted with ``str()``
        :param encoding: passed through unchanged to the backend's ``sign()``
        :return: ``str(hoba)`` after the signature has been attached
        """
        secret = self.auth.get_passphrase()
        signature = self.auth.sign(
            plaintext,
            encoding,
            passphrase=secret,
            detach=True,
        )
        hoba.signature = signature
        return str(hoba)

    def __str__(self):
        """Return the fixed human-readable label of this handler."""
        return 'clicada hoba/pgp auth'

    # repr and str deliberately yield the same label
    __repr__ = __str__
class HTTPSession:
    """HTTP client bound to a base url, with a token store and optional auth.

    On construction the session:

    * derives the origin (scheme and network location only) from the base url,
    * creates a token store directory under ``token_dir`` named after the hex
      sha256 digest of the base url,
    * builds a ``ClientSession`` with a bearer sub-handler, plus the supplied
      ``auth`` handler if any,
    * wraps the session in a urllib opener used by :meth:`open`.
    """

    # root directory for token stores; evaluated once, when the class is
    # defined, using the uid of the running process
    token_dir = '/run/user/{}/clicada/usumbufu/.token'.format(os.getuid())

    def __init__(self, url, auth=None):
        """Set up the token store, client session and opener for ``url``.

        :param url: base url that endpoints passed to :meth:`open` are joined
            onto
        :param auth: optional handler added as a sub-handler of the client
            session; its ``origin`` attribute is overwritten with this
            session's origin
        """
        logg.debug('auth auth %s', auth)
        self.base_url = url

        # keep scheme and netloc only; path, query and fragment are dropped
        parts = urllib.parse.urlsplit(self.base_url)
        self.origin = urllib.parse.urlunsplit((parts[0], parts[1], '', '', ''))

        # one token store directory per distinct base url
        digest = hashlib.sha256(self.base_url.encode('utf-8')).hexdigest()
        token_store_dir = os.path.join(self.token_dir, digest)
        os.makedirs(token_store_dir, exist_ok=True)
        self.token_store = BaseTokenStore(path=token_store_dir)

        self.session = ClientSession(self.origin, token_store=self.token_store)
        bearer_handler = BearerClientSession(self.origin, token_store=self.token_store)
        self.session.add_subhandler(bearer_handler)

        # identity test: an auth object with custom comparison operators must
        # not be able to influence this check
        if auth is not None:
            auth.origin = self.origin
            self.session.add_subhandler(auth)

        self.opener = urllib.request.build_opener(self.session)

    def open(self, endpoint):
        """Fetch ``endpoint`` relative to the base url and return the body.

        :param endpoint: url or path joined onto the base url with
            ``urllib.parse.urljoin``
        :return: response body decoded as utf-8
        :raises UnicodeDecodeError: if the body is not valid utf-8

        Errors raised by the opener propagate unchanged.
        """
        url = urllib.parse.urljoin(self.base_url, endpoint)
        logg.debug('open %s with opener %s', url, self)
        # close the response as soon as the body is read so the underlying
        # connection is not left open
        with self.opener.open(url) as response:
            return response.read().decode('utf-8')

    def __str__(self):
        """Return the string form of the underlying client session."""
        return str(self.session)