cic-auth-helper/cic_auth_helper/runnable/server.py

184 lines
7.3 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# standard imports
import logging
import os
import datetime
import base64
import sys
import argparse
import urllib.parse
# external imports
from http_hoba_auth import hoba_auth_request_string
from http_token_auth import SessionStore
import confini
from usumbufu.challenge import Challenger
#from usumbufu.filter.sha256 import SHA256Filter
from usumbufu.filter.hoba import HobaFilter
from usumbufu.filter.fetcher import FetcherFilter
from usumbufu.filter.pgp import PGPFilter
from usumbufu.filter.session import SessionFilter
from usumbufu.filter import Filter
from usumbufu.retrieve import Retriever
from usumbufu.retrieve.file import FileFetcher
from usumbufu.adapters.uwsgi import UWSGIHTTPAuthorization
from usumbufu.adapters.uwsgi import UWSGIAdapter
# local imports
from cic_auth_helper.error import (
NotFoundError,
ReverseProxyError,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(script_dir, '..', 'data')
config_dir = os.path.join(data_dir, 'config')
argparser = argparse.ArgumentParser('Authentication helper for ingress routers')
argparser.add_argument('--forward-to', type=str, dest='forward_to', help='server to forward request to')
argparser.add_argument('-c', type=str, help='configuration override directory')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args(sys.argv[1:])
if args.vv:
logg.setLevel(logging.DEBUG)
elif args.v:
logg.setLevel(logging.INFO)
config = confini.Config(config_dir, args.env_prefix, override_dirs=args.c)
config.process()
logg.debug('config loaded:\n' + str(config))
reverse_proxy = None
if args.forward_to:
from cic_auth_helper.proxy import ReverseProxy
forward_to = urllib.parse.urlsplit(args.forward_to)
forward_to = urllib.parse.urlunsplit(forward_to)
reverse_proxy = ReverseProxy(forward_to, ignore_proxy_headers=[
'access-control-allow-origin',
])
logg.info('will forward requests to {}'.format(forward_to))
# The Retriever is at the heart of the setup.
# It will run all the decoding steps from all the filters, ultimately resolving to a auth resource (typically ACL) and an identity
challenger = Challenger()
# parse a hoba auth string to key, signature, nonce etc, to the "to be signed" format
hoba_filter = HobaFilter(config.get('HTTP_AUTH_ORIGIN'), config.get('HTTP_AUTH_REALM'), challenger, alg=config.get('HTTP_HOBA_CIPHER_ID'))
trusted_publickeys = config.get('PGP_TRUSTED_PUBLICKEY_FINGERPRINT').split(',')
fetcher_pgp_trusted = FileFetcher(config.get('PGP_IMPORT_DIR'))
pgp_filter = PGPFilter(trusted_publickeys, fetcher_pgp_trusted)
pgp_filter.import_keys(config.get('PGP_PUBLICKEY_FILENAME'), config.get('PGP_SIGNATURE_FILENAME'))
# set a session token for the identity, if not yet set
# the session store can be used outside the pipeline to get session info (like the auth token for a Token header response)
session_store = SessionStore(auth_expire_delta=1)
session_filter = SessionFilter(session_store)
# the decoders will be run in SEQUENCE. let's wire them up, like so:
hoba_retriever = Retriever()
hoba_retriever.add_decoder(hoba_filter)
hoba_retriever.add_decoder(pgp_filter)
hoba_retriever.add_decoder(session_filter)
bearer_retriever = Retriever()
bearer_retriever.add_decoder(session_filter)
# Below here is the runtime code for the UWSGI application
# Most important to notice here is that the ChallengeRetriever is being passed to the UWSGIHTTPAuthorization object. This object can identify a HOBA request, and will attempt to validate the HOBA auth string using ChallengeRetriever.
# cic_eth.Auth.check() (overloaded) will attempt to FETCH the ACL using the key (if any) resulting from the validation
def do_auth(env):
authenticator = UWSGIAdapter()
http_authenticator = UWSGIHTTPAuthorization(hoba_retriever, env, config.get('HTTP_AUTH_REALM'), origin=config.get('HTTP_AUTH_ORIGIN'))
bearer_authenticator = UWSGIHTTPAuthorization(bearer_retriever, env, config.get('HTTP_AUTH_REALM'), origin=config.get('HTTP_AUTH_ORIGIN'))
http_authenticator.component_id = 'http-hoba'
bearer_authenticator.component_id = 'http-bearer'
try:
logg.debug('registering bearer authenticator with bearer retriever')
authenticator.register(bearer_authenticator)
authenticator.activate(bearer_authenticator.component_id)
except TypeError as e:
logg.debug('not a http bearer request: {}'.format(e))
try:
logg.debug('registering http authenticator with hoba retriever')
authenticator.register(http_authenticator)
authenticator.activate(http_authenticator.component_id)
except TypeError as e:
logg.debug('not a http hoba request: {}'.format(e))
return authenticator.check()
# And to conclude, vanilla UWSGI stuff
def application(env, start_response):
headers = []
# fetch control headers
headers.append(('Access-Control-Allow-Origin', config.get('HTTP_CORS_ORIGIN'),))
headers.append(('Access-Control-Expose-Headers', 'WWW-Authenticate, Token',))
headers.append(('Access-Control-Allow-Headers', '*',))
headers.append(('Access-Control-Allow-Credentials', 'true',))
# to appease fetch pre-flights
if env['REQUEST_METHOD'] == 'OPTIONS':
start_response('200 OK', headers)
return [b'']
print('env {}'.format(env))
result = do_auth(env)
if result == None:
#if env.get('HTTP_AUTHORIZATION') != None:
# start_response('403 failed miserably', headers)
# return [b'']
(challenge, expire) = challenger.request(env['REMOTE_ADDR'])
headers.append(('WWW-Authenticate', hoba_auth_request_string(challenge, expire.timestamp(), realm=config.get('HTTP_AUTH_REALM'))),)
logg.debug('headers {}'.format(headers))
start_response('401 authenticate or I will SCREAM_SNAKE_CASE at you', headers)
return [b'']
# name the successful auth result parts
auth_method_component_id = result[0]
auth_string = result[1]
auth_resource = result[2]
auth_identity = result[3]
logg.debug('result {}'.format(result))
session = session_store.get(auth_identity)
logg.debug(f'session token: {session.auth}')
if auth_method_component_id == 'http-hoba':
headers.append(('Token', base64.b64encode(session.auth).decode('utf-8'),))
elif session.auth != auth_string:
headers.append(('Token', base64.b64encode(session.auth).decode('utf-8'),))
response_status = '200 OK'
content = b''
if reverse_proxy != None:
try:
(response_status, headers, content) = reverse_proxy.proxy_pass(env, headers)
except ReverseProxyError as e:
response_status = str(e.code) + ' ' + e.msg
headers = headers + e.headers
logg.debug('headers more {}'.format(headers))
content = b''
# except BadRequestError as e:
# response_status = '400 ' + e.msg
# headers = e.headers
# content = b''
else:
content = str(auth_resource).encode('utf-8')
start_response(response_status, headers)
return [content]