Skip to content

Commit 1638670

Browse files
committed
wsgisupp: implement support code for the testing of WSGI handlers/servers
1 parent ad7fbdf commit 1638670

File tree

2 files changed

+157
-0
lines changed

2 files changed

+157
-0
lines changed

local-requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ autopep8;python_version >= "3"
77
# NOTE: paramiko-3.0.0 dropped python2 and python3.6 support
88
paramiko;python_version >= "3.7"
99
paramiko<3;python_version < "3.7"
10+
werkzeug

tests/support/wsgisupp.py

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# --- BEGIN_HEADER ---
4+
#
5+
# htmlsupp - test support library for WSGI
6+
# Copyright (C) 2003-2024 The MiG Project by the Science HPC Center at UCPH
7+
#
8+
# This file is part of MiG.
9+
#
10+
# MiG is free software: you can redistribute it and/or modify
11+
# it under the terms of the GNU General Public License as published by
12+
# the Free Software Foundation; either version 2 of the License, or
13+
# (at your option) any later version.
14+
#
15+
# MiG is distributed in the hope that it will be useful,
16+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
17+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18+
# GNU General Public License for more details.
19+
#
20+
# You should have received a copy of the GNU General Public License
21+
# along with this program; if not, write to the Free Software
22+
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
23+
#
24+
# -- END_HEADER ---
25+
#
26+
27+
from collections import namedtuple
28+
import codecs
29+
from io import BytesIO
30+
from werkzeug.datastructures import MultiDict
31+
32+
from tests.support._env import PY2
33+
34+
if PY2:
35+
from urllib import urlencode
36+
from urlparse import urlparse
37+
else:
38+
from urllib.parse import urlencode, urlparse
39+
40+
"""Test support library for WSGI."""
41+
42+
43+
_PreparedWsgi = namedtuple('_PreparedWsgi', ['environ', 'start_response'])
44+
45+
46+
class FakeWsgiStartResponse:
47+
"""Glue object that conforms to the same interface as the start_response()
48+
in the WSGI specs but records the calls to it such that they can be
49+
inspected and, for our purposes, asserted against."""
50+
51+
def __init__(self):
52+
self.calls = []
53+
54+
def __call__(self, status, headers, exc=None):
55+
self.calls.append((status, headers, exc))
56+
57+
58+
def create_wsgi_environ(configuration, wsgi_url, method='GET', query=None, headers=None, form=None):
59+
parsed_url = urlparse(wsgi_url)
60+
61+
if query:
62+
method = 'GET'
63+
64+
request_query = urlencode(query)
65+
wsgi_input = ()
66+
elif form:
67+
method = 'POST'
68+
request_query = ''
69+
70+
body = urlencode(MultiDict(form)).encode('ascii')
71+
72+
headers = headers or {}
73+
if not 'Content-Type' in headers:
74+
headers['Content-Type'] = 'application/x-www-form-urlencoded'
75+
76+
headers['Content-Length'] = str(len(body))
77+
wsgi_input = BytesIO(body)
78+
else:
79+
request_query = parsed_url.query
80+
wsgi_input = ()
81+
82+
environ = {}
83+
environ['wsgi.input'] = wsgi_input
84+
environ['wsgi.url_scheme'] = parsed_url.scheme
85+
environ['wsgi.version'] = (1, 0)
86+
environ['MIG_CONF'] = configuration.config_file
87+
environ['HTTP_HOST'] = parsed_url.netloc
88+
environ['PATH_INFO'] = parsed_url.path
89+
environ['QUERY_STRING'] = request_query
90+
environ['REQUEST_METHOD'] = method
91+
environ['SCRIPT_URI'] = ''.join(('http://', environ['HTTP_HOST'], environ['PATH_INFO']))
92+
93+
if headers:
94+
for k, v in headers.items():
95+
header_key = k.replace('-', '_').upper()
96+
if header_key.startswith('CONTENT'):
97+
# Content-* headers must not be prefixed in WSGI
98+
pass
99+
else:
100+
header_key = "HTTP_%s" % (header_key),
101+
environ[header_key] = v
102+
103+
return environ
104+
105+
106+
def create_wsgi_start_response():
107+
return FakeWsgiStartResponse()
108+
109+
110+
def prepare_wsgi(configuration, url, **kwargs):
111+
return _PreparedWsgi(
112+
create_wsgi_environ(configuration, url, **kwargs),
113+
create_wsgi_start_response()
114+
)
115+
116+
117+
def _trigger_and_unpack_result(wsgi_result):
118+
chunks = list(wsgi_result)
119+
assert len(chunks) > 0, "invocation returned no output"
120+
complete_value = b''.join(chunks)
121+
decoded_value = codecs.decode(complete_value, 'utf8')
122+
return decoded_value
123+
124+
125+
class WsgiAssertMixin:
126+
"""Custom assertions for verifying server code executed under test."""
127+
128+
def assertWsgiResponse(self, wsgi_result, fake_wsgi, expected_status_code):
129+
assert isinstance(fake_wsgi, _PreparedWsgi)
130+
131+
content = _trigger_and_unpack_result(wsgi_result)
132+
133+
def called_once(fake):
134+
assert hasattr(fake, 'calls')
135+
return len(fake.calls) == 1
136+
137+
fake_start_response = fake_wsgi.start_response
138+
139+
try:
140+
self.assertTrue(called_once(fake_start_response))
141+
except AssertionError:
142+
if len(fake.calls) == 0:
143+
raise AssertionError("WSGI handler did not respond")
144+
else:
145+
raise AssertionError("WSGI handler responded more than once")
146+
147+
wsgi_call = fake_start_response.calls[0]
148+
149+
# check for expected HTTP status code
150+
wsgi_status = wsgi_call[0]
151+
actual_status_code = int(wsgi_status[0:3])
152+
self.assertEqual(actual_status_code, expected_status_code)
153+
154+
headers = dict(wsgi_call[1])
155+
156+
return content, headers

0 commit comments

Comments
 (0)