Initial commit

This commit is contained in:
nolash 2021-10-26 12:36:56 +02:00
commit 66e15a8a18
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
5 changed files with 151 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
__pycache__
*.egg-info
*.pyc
.config
build/

View File

@ -0,0 +1,4 @@
[http]
auth_origin =
auth_realm = "ge"
hoba_cipher_id = 00

View File

@ -0,0 +1,6 @@
[pgp]
trusted_publickey_fingerprint =
import_dir = /var/lib/cic-auth-helper/pgp
publickey_filename = publickeys.asc
signature_filename = signature.asc
run_dir =

View File

@ -0,0 +1,133 @@
# standard imports
import logging
import os
import datetime
import base64
import sys
import argparse
# external imports
from http_hoba_auth import hoba_auth_request_string
from http_token_auth import SessionStore
import confini
# local imports
from usumbufu.challenge import Challenger
#from usumbufu.filter.sha256 import SHA256Filter
from usumbufu.filter.hoba import HobaFilter
from usumbufu.filter.fetcher import FetcherFilter
from usumbufu.filter.pgp import PGPFilter
from usumbufu.filter.session import SessionFilter
from usumbufu.filter import Filter
from usumbufu.retrieve import Retriever
from usumbufu.retrieve.file import FileFetcher
from usumbufu.adapters.uwsgi import UWSGIHTTPAuthorization
from usumbufu.adapters.uwsgi import UWSGIAdapter
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(script_dir, '..', 'data')
config_dir = os.path.join(data_dir, 'config')
argparser = argparse.ArgumentParser('Authentication helper for ingress routers')
argparser.add_argument('-c', type=str, help='configuration override directory')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args(sys.argv[1:])
if args.vv:
logg.setLevel(logging.DEBUG)
elif args.v:
logg.setLevel(logging.INFO)
config = confini.Config(config_dir, override_dirs=args.c)
config.process()
logg.debug('config loaded:\n' + str(config))
# The Retriever is at the heart of the setup.
# It will run all the decoding steps from all the filters, ultimately resolving to a auth resource (typically ACL) and an identity
challenger = Challenger()
# parse a hoba auth string to key, signature, nonce etc, to the "to be signed" format
hoba_filter = HobaFilter(config.get('HTTP_AUTH_ORIGIN'), config.get('HTTP_AUTH_REALM'), challenger, alg=config.get('HTTP_HOBA_CIPHER_ID'))
trusted_publickeys = config.get('PGP_TRUSTED_PUBLICKEY_FINGERPRINT').split(',')
fetcher_pgp_trusted = FileFetcher(config.get('PGP_IMPORT_DIR'))
pgp_filter = PGPFilter(trusted_publickeys, fetcher_pgp_trusted)
pgp_filter.import_keys(config.get('PGP_PUBLICKEY_FILENAME'), config.get('PGP_SIGNATURE_FILENAME'))
# set a session token for the identity, if not yet set
# the session store can be used outside the pipeline to get session info (like the auth token for a Token header response)
session_store = SessionStore(auth_expire_delta=1)
session_filter = SessionFilter(session_store)
# the decoders will be run in SEQUENCE. let's wire them up, like so:
hoba_retriever = Retriever()
hoba_retriever.add_decoder(hoba_filter)
hoba_retriever.add_decoder(pgp_filter)
hoba_retriever.add_decoder(session_filter)
bearer_retriever = Retriever()
bearer_retriever.add_decoder(session_filter)
# Below here is the runtime code for the UWSGI application
# Most important to notice here is that the ChallengeRetriever is being passed to the UWSGIHTTPAuthorization object. This object can identify a HOBA request, and will attempt to validate the HOBA auth string using ChallengeRetriever.
# cic_eth.Auth.check() (overloaded) will attempt to FETCH the ACL using the key (if any) resulting from the validation
def do_auth(env):
authenticator = UWSGIAdapter()
http_authenticator = UWSGIHTTPAuthorization(hoba_retriever, env, config.get('HTTP_AUTH_REALM'), origin=config.get('HTTP_AUTH_ORIGIN'))
bearer_authenticator = UWSGIHTTPAuthorization(bearer_retriever, env, config.get('HTTP_AUTH_REALM'), origin=config.get('HTTP_AUTH_ORIGIN'))
http_authenticator.component_id = 'http-hoba'
bearer_authenticator.component_id = 'http-bearer'
try:
authenticator.register(bearer_authenticator)
authenticator.activate(bearer_authenticator.component_id)
except TypeError as e:
logg.debug('not a http bearer request: {}'.format(e))
try:
authenticator.register(http_authenticator)
authenticator.activate(http_authenticator.component_id)
except TypeError as e:
logg.debug('not a http hoba request: {}'.format(e))
return authenticator.check()
# And to conclude, vanilla UWSGI stuff
def application(env, start_response):
headers = []
print('env {}'.format(env))
result = do_auth(env)
if result == None:
#if env.get('HTTP_AUTHORIZATION') != None:
# start_response('403 failed miserably', headers)
# return [b'']
(challenge, expire) = challenger.request(env['REMOTE_ADDR'])
headers.append(('WWW-Authenticate', hoba_auth_request_string(challenge, expire.timestamp(), realm=config.get('HTTP_AUTH_REALM'))),)
start_response('401 authenticate or I will SCREAM_SNAKE_CASE at you', headers)
return [b'']
# name the successful auth result parts
auth_method_component_id = result[0]
auth_string = result[1]
auth_resource = result[2]
auth_identity = result[3]
logg.debug('result {}'.format(result))
session = session_store.get(auth_identity)
logg.debug(f'session token: {session.auth}')
if auth_method_component_id == 'http-hoba':
headers.append(('Token', base64.b64encode(session.auth).decode('utf-8'),))
elif session.auth != auth_string:
headers.append(('Token', base64.b64encode(session.auth).decode('utf-8'),))
start_response('200 OK', headers)
return [str(auth_resource).encode('utf-8')]

3
requirements.txt Normal file
View File

@ -0,0 +1,3 @@
usumbufu==0.3.1a5
uWSGI==2.0.19.1
confini==0.4.2