cic-internal-integration/apps/cic-eth/cic_eth/server/openapi/UWSGIOpenAPIRequest.py

58 lines
1.8 KiB
Python
Raw Normal View History

2021-11-18 13:38:20 +01:00
"""OpenAPI request factory for uWSGI request environments"""
2021-12-02 13:07:49 +01:00
from urllib.parse import parse_qs, urlparse
2021-11-18 13:38:20 +01:00
2021-12-02 13:07:49 +01:00
from openapi_core.validation.request.datatypes import (OpenAPIRequest,
RequestParameters)
2021-12-08 12:31:49 +01:00
from werkzeug import Request
2021-12-02 13:07:49 +01:00
from werkzeug.datastructures import Headers, ImmutableMultiDict
2021-12-08 12:31:49 +01:00
2021-11-18 13:38:20 +01:00
2021-12-02 13:07:49 +01:00
class OpenAPIRequestFactory:
    """Factory that builds ``OpenAPIRequest`` objects from uwsgi request environments."""

    @classmethod
    def create(cls, env):
        """
        Converts the uwsgi request environment to a request that can be validated against an OpenAPI schema

        :param env: Request environment, passed unchanged to ``werkzeug.Request``.
        :return: An ``OpenAPIRequest`` carrying the URL without its query
            string, the lower-cased HTTP method, the query/header/cookie
            parameters, the raw request body and the mimetype.
        """
        request = Request(env)

        # Method
        method = request.method.lower()

        # Split the query string off the URL: the query parameters are handed
        # over separately, and the URL is passed on without them.
        parsed_url = urlparse(request.url)
        query_params = parse_qs(parsed_url.query)
        url = parsed_url._replace(query="").geturl()

        # Order matters: Content-Type is preferred over Accept, because a
        # client's Accept header (e.g. "*/*") does not necessarily match the
        # content type of the body.
        # Parameters such as "; charset=utf-8" are stripped from Content-Type
        # so that only the bare media type (e.g. "application/json") is
        # reported; otherwise it would not equal the media type keys of a spec.
        # The Accept fallback is passed through unchanged.
        content_type = request.headers.get("Content-Type", "")
        mimetype = content_type.split(";", 1)[0].strip() or request.headers.get(
            "Accept"
        )

        # Headers - hand over a fresh Headers object built from the request headers
        header = Headers(dict(request.headers))

        # Body
        body = request.get_data()

        # No path parameters are supplied here; per the original note the path
        # gets deduced by the path finder against the spec.
        parameters = RequestParameters(
            query=ImmutableMultiDict(query_params),
            header=header,
            cookie=request.cookies,
        )
        return OpenAPIRequest(
            full_url_pattern=url,
            method=method,
            parameters=parameters,
            body=body,
            mimetype=mimetype,
        )
2021-11-22 10:28:09 +01:00
2021-12-02 13:07:49 +01:00
# Module-level alias: calling UWSGIOpenAPIRequest(env) is the same as calling
# OpenAPIRequestFactory.create(env).
UWSGIOpenAPIRequest = OpenAPIRequestFactory.create