96 lines
2.8 KiB
Python
96 lines
2.8 KiB
Python
# standard imports
|
|
import hashlib
|
|
import urllib.parse
|
|
import os
|
|
import logging
|
|
from socket import getservbyname
|
|
|
|
# external imports
|
|
from usumbufu.client.base import (
|
|
ClientSession,
|
|
BaseTokenStore,
|
|
)
|
|
from usumbufu.client.bearer import BearerClientSession
|
|
from usumbufu.client.hoba import HobaClientSession
|
|
|
|
logg = logging.getLogger(__name__)
|
|
|
|
|
|
class PGPClientSession(HobaClientSession):
|
|
|
|
alg = '969'
|
|
|
|
def __init__(self, auth):
|
|
self.auth = auth
|
|
self.origin = None
|
|
self.fingerprint = self.auth.fingerprint()
|
|
|
|
|
|
def sign_auth_challenge(self, plaintext, hoba, encoding):
|
|
passphrase = self.auth.get_passphrase()
|
|
r = self.auth.sign(plaintext, encoding, passphrase=passphrase, detach=True)
|
|
|
|
hoba.signature = r
|
|
return str(hoba)
|
|
|
|
|
|
def __str__(self):
|
|
return 'clicada hoba/pgp auth'
|
|
|
|
|
|
def __repr__(self):
|
|
return 'clicada hoba/pgp auth'
|
|
|
|
|
|
class HTTPSession:
|
|
|
|
token_dir = '/run/user/{}/clicada/usumbufu/.token'.format(os.getuid())
|
|
|
|
def __init__(self, url, auth=None, origin=None):
|
|
self.base_url = url
|
|
url_parts = urllib.parse.urlsplit(self.base_url)
|
|
url_parts_origin_host = url_parts[1].split(":")
|
|
host = url_parts_origin_host[0]
|
|
try:
|
|
host = host + ':' + url_parts_origin_host[1]
|
|
except IndexError:
|
|
host = host + ':' + str(getservbyname(url_parts[0]))
|
|
logg.info('changed origin with missing port number from {} to {}'.format(url_parts[1], host))
|
|
url_parts_origin = (url_parts[0], host, '', '', '',)
|
|
|
|
self.origin = origin
|
|
if self.origin == None:
|
|
self.origin = urllib.parse.urlunsplit(url_parts_origin)
|
|
else:
|
|
logg.debug('overriding http origin for {} with {}'.format(url, self.origin))
|
|
|
|
h = hashlib.sha256()
|
|
h.update(self.base_url.encode('utf-8'))
|
|
z = h.digest()
|
|
|
|
token_store_dir = os.path.join(self.token_dir, z.hex())
|
|
os.makedirs(token_store_dir, exist_ok=True)
|
|
self.token_store = BaseTokenStore(path=token_store_dir)
|
|
|
|
self.session = ClientSession(self.origin, token_store=self.token_store)
|
|
|
|
bearer_handler = BearerClientSession(self.origin, token_store=self.token_store)
|
|
self.session.add_subhandler(bearer_handler)
|
|
|
|
if auth != None:
|
|
auth.origin = self.origin
|
|
self.session.add_subhandler(auth)
|
|
|
|
self.opener = urllib.request.build_opener(self.session)
|
|
|
|
|
|
def open(self, endpoint):
|
|
url = urllib.parse.urljoin(self.base_url, endpoint)
|
|
logg.debug('open {} with opener {}'.format(url, self))
|
|
r = self.opener.open(url)
|
|
return r.read().decode('utf-8')
|
|
|
|
|
|
def __str__(self):
|
|
return str(self.session)
|