cic-auth-helper/cic_auth_helper/runnable/server.py

134 lines
5.3 KiB
Python
Raw Normal View History

2021-10-26 12:36:56 +02:00
# standard imports
import logging
import os
import datetime
import base64
import sys
import argparse
# external imports
from http_hoba_auth import hoba_auth_request_string
from http_token_auth import SessionStore
import confini
# local imports
from usumbufu.challenge import Challenger
#from usumbufu.filter.sha256 import SHA256Filter
from usumbufu.filter.hoba import HobaFilter
from usumbufu.filter.fetcher import FetcherFilter
from usumbufu.filter.pgp import PGPFilter
from usumbufu.filter.session import SessionFilter
from usumbufu.filter import Filter
from usumbufu.retrieve import Retriever
from usumbufu.retrieve.file import FileFetcher
from usumbufu.adapters.uwsgi import UWSGIHTTPAuthorization
from usumbufu.adapters.uwsgi import UWSGIAdapter
# The root logger starts at WARNING; the -v / -vv flags below may lower the threshold.
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

# The default configuration directory is resolved relative to this script: ../data/config
script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(script_dir, '..', 'data')
config_dir = os.path.join(data_dir, 'config')

# The text is passed explicitly as the description. Given as the first
# positional argument it is interpreted as the program name and shows up
# in the usage line instead of the help text.
argparser = argparse.ArgumentParser(description='Authentication helper for ingress routers')
argparser.add_argument('-c', type=str, help='configuration override directory')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args(sys.argv[1:])

# -vv takes precedence over -v; with neither flag the WARNING level set above is kept.
if args.vv:
    logg.setLevel(logging.DEBUG)
elif args.v:
    logg.setLevel(logging.INFO)

# Load the settings from config_dir; the directory given with -c (None if
# the flag is absent) is handed to confini as override directory.
config = confini.Config(config_dir, override_dirs=args.c)
config.process()
logg.debug('config loaded:\n' + str(config))
# The Retriever is at the heart of the setup.
# It will run all the decoding steps from all the filters, ultimately resolving to an auth resource (typically ACL) and an identity
challenger = Challenger()

# parse a hoba auth string to key, signature, nonce etc, to the "to be signed" format
hoba_filter = HobaFilter(config.get('HTTP_AUTH_ORIGIN'), config.get('HTTP_AUTH_REALM'), challenger, alg=config.get('HTTP_HOBA_CIPHER_ID'))

# The trusted fingerprints are configured as a single comma-separated value.
# Whitespace around the entries and empty entries (as in "a, b,") are dropped,
# so that padding in the config file does not produce fingerprints that can
# never match, or an empty string being listed as a trusted key.
trusted_publickeys = [
    fingerprint.strip()
    for fingerprint in config.get('PGP_TRUSTED_PUBLICKEY_FINGERPRINT').split(',')
    if fingerprint.strip()
]
fetcher_pgp_trusted = FileFetcher(config.get('PGP_IMPORT_DIR'))
pgp_filter = PGPFilter(trusted_publickeys, fetcher_pgp_trusted)
pgp_filter.import_keys(config.get('PGP_PUBLICKEY_FILENAME'), config.get('PGP_SIGNATURE_FILENAME'))

# set a session token for the identity, if not yet set
# the session store can be used outside the pipeline to get session info (like the auth token for a Token header response)
# NOTE(review): the unit of auth_expire_delta is not visible from this file - confirm that 1 is the intended lifetime
session_store = SessionStore(auth_expire_delta=1)
session_filter = SessionFilter(session_store)

# the decoders will be run in SEQUENCE. let's wire them up, like so:
# the hoba pipeline goes through the hoba, pgp and session filters ...
hoba_retriever = Retriever()
hoba_retriever.add_decoder(hoba_filter)
hoba_retriever.add_decoder(pgp_filter)
hoba_retriever.add_decoder(session_filter)

# ... while the bearer pipeline only goes through the (same) session filter
bearer_retriever = Retriever()
bearer_retriever.add_decoder(session_filter)
# Below here is the runtime code for the UWSGI application
# Most important to notice here is that a Retriever is being passed to each UWSGIHTTPAuthorization object. This object can identify a HOBA request, and will attempt to validate the HOBA auth string using that Retriever.
# cic_eth.Auth.check() (overloaded) will attempt to FETCH the ACL using the key (if any) resulting from the validation
def do_auth(env):
    """Run the bearer and hoba authenticators against a single request.

    A fresh adapter is created for every call. Both HTTP authorization
    objects are built from the request environment, then registered and
    activated on the adapter, bearer first and hoba second. A TypeError
    raised while registering or activating one of them is only logged at
    debug level, and the next one is still tried.

    :param env: WSGI environment of the request
    :return: whatever the adapter's check() returns
    """
    realm = config.get('HTTP_AUTH_REALM')
    origin = config.get('HTTP_AUTH_ORIGIN')

    adapter = UWSGIAdapter()

    hoba_auth = UWSGIHTTPAuthorization(hoba_retriever, env, realm, origin=origin)
    bearer_auth = UWSGIHTTPAuthorization(bearer_retriever, env, realm, origin=origin)
    hoba_auth.component_id = 'http-hoba'
    bearer_auth.component_id = 'http-bearer'

    # Each candidate is paired with the message to log if it does not apply.
    candidates = (
        (bearer_auth, 'not a http bearer request: {}'),
        (hoba_auth, 'not a http hoba request: {}'),
    )
    for candidate, mismatch_message in candidates:
        try:
            adapter.register(candidate)
            adapter.activate(candidate.component_id)
        except TypeError as e:
            logg.debug(mismatch_message.format(e))

    return adapter.check()
# And to conclude, vanilla UWSGI stuff
def application(env, start_response):
    """WSGI entry point: authenticate the request and return the auth resource.

    When do_auth() yields no result, a new challenge is requested for the
    client address and sent back in a WWW-Authenticate header with a 401
    status and an empty body. Otherwise the response is a 200 whose body is
    the string form of the resolved auth resource. A Token header with the
    base64 encoded session token is added when the request was authenticated
    through the 'http-hoba' component, or when the presented auth string
    differs from the session token.

    :param env: WSGI environment of the request; REMOTE_ADDR is read when a challenge is issued
    :param start_response: WSGI start_response callable
    :return: list holding the response body as a single bytes element
    """
    headers = []

    # The environment includes the Authorization header, so it is only logged
    # at debug level rather than printed to stdout for every request.
    logg.debug('env {}'.format(env))

    result = do_auth(env)
    if result is None:
        # NOTE: a request whose Authorization header failed validation is not
        # told apart from one without credentials; both get a new challenge.
        (challenge, expire) = challenger.request(env['REMOTE_ADDR'])
        headers.append(('WWW-Authenticate', hoba_auth_request_string(challenge, expire.timestamp(), realm=config.get('HTTP_AUTH_REALM'))))
        start_response('401 authenticate or I will SCREAM_SNAKE_CASE at you', headers)
        return [b'']

    # name the successful auth result parts
    auth_method_component_id = result[0]
    auth_string = result[1]
    auth_resource = result[2]
    auth_identity = result[3]
    logg.debug('result {}'.format(result))

    session = session_store.get(auth_identity)
    # NOTE(review): this writes the session token to the debug log - confirm that is acceptable
    logg.debug(f'session token: {session.auth}')

    # Hand out the session token after a hoba authentication, and whenever the
    # token presented by the client is not the one stored for the session.
    if auth_method_component_id == 'http-hoba' or session.auth != auth_string:
        headers.append(('Token', base64.b64encode(session.auth).decode('utf-8')))

    start_response('200 OK', headers)
    return [str(auth_resource).encode('utf-8')]