Skip to content

Commit 5adc9a9

Browse files
committed
wsgisupp: implement support code for the testing of WSGI handlers/servers
1 parent a146e1b commit 5adc9a9

File tree

1 file changed

+155
-0
lines changed

1 file changed

+155
-0
lines changed

tests/support/wsgisupp.py

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# --- BEGIN_HEADER ---
4+
#
5+
# htmlsupp - test support library for WSGI
6+
# Copyright (C) 2003-2024 The MiG Project by the Science HPC Center at UCPH
7+
#
8+
# This file is part of MiG.
9+
#
10+
# MiG is free software: you can redistribute it and/or modify
11+
# it under the terms of the GNU General Public License as published by
12+
# the Free Software Foundation; either version 2 of the License, or
13+
# (at your option) any later version.
14+
#
15+
# MiG is distributed in the hope that it will be useful,
16+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
17+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18+
# GNU General Public License for more details.
19+
#
20+
# You should have received a copy of the GNU General Public License
21+
# along with this program; if not, write to the Free Software
22+
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
23+
#
24+
# -- END_HEADER ---
25+
#
26+
27+
from collections import namedtuple
28+
import codecs
29+
from io import BytesIO
30+
from werkzeug.datastructures import MultiDict
31+
32+
from tests.support._env import PY2
33+
34+
if PY2:
35+
from urlparse import urlencode, urlparse
36+
else:
37+
from urllib.parse import urlencode, urlparse
38+
39+
"""Test support library for WSGI."""
40+
41+
42+
_PreparedWsgi = namedtuple('_PreparedWsgi', ['environ', 'start_response'])
43+
44+
45+
class FakeWsgiStartResponse:
46+
"""Glue object that conforms to the same interface as the start_response()
47+
in the WSGI specs but records the calls to it such that they can be
48+
inspected and, for our purposes, asserted against."""
49+
50+
def __init__(self):
51+
self.calls = []
52+
53+
def __call__(self, status, headers, exc=None):
54+
self.calls.append((status, headers, exc))
55+
56+
57+
def create_wsgi_environ(configuration, wsgi_url, method='GET', query=None, headers=None, form=None):
58+
parsed_url = urlparse(wsgi_url)
59+
60+
if query:
61+
method = 'GET'
62+
63+
request_query = urlencode(query)
64+
wsgi_input = ()
65+
elif form:
66+
method = 'POST'
67+
request_query = ''
68+
69+
body = urlencode(MultiDict(form)).encode('ascii')
70+
71+
headers = headers or {}
72+
if not 'Content-Type' in headers:
73+
headers['Content-Type'] = 'application/x-www-form-urlencoded'
74+
75+
headers['Content-Length'] = str(len(body))
76+
wsgi_input = BytesIO(body)
77+
else:
78+
request_query = parsed_url.query
79+
wsgi_input = ()
80+
81+
environ = {}
82+
environ['wsgi.input'] = wsgi_input
83+
environ['wsgi.url_scheme'] = parsed_url.scheme
84+
environ['wsgi.version'] = (1, 0)
85+
environ['MIG_CONF'] = configuration.config_file
86+
environ['HTTP_HOST'] = parsed_url.netloc
87+
environ['PATH_INFO'] = parsed_url.path
88+
environ['QUERY_STRING'] = request_query
89+
environ['REQUEST_METHOD'] = method
90+
environ['SCRIPT_URI'] = ''.join(('http://', environ['HTTP_HOST'], environ['PATH_INFO']))
91+
92+
if headers:
93+
for k, v in headers.items():
94+
header_key = k.replace('-', '_').upper()
95+
if header_key.startswith('CONTENT'):
96+
# Content-* headers must not be prefixed in WSGI
97+
pass
98+
else:
99+
header_key = "HTTP_%s" % (header_key),
100+
environ[header_key] = v
101+
102+
return environ
103+
104+
105+
def create_wsgi_start_response():
106+
return FakeWsgiStartResponse()
107+
108+
109+
def prepare_wsgi(configuration, url, **kwargs):
110+
return _PreparedWsgi(
111+
create_wsgi_environ(configuration, url, **kwargs),
112+
create_wsgi_start_response()
113+
)
114+
115+
116+
def _trigger_and_unpack_result(wsgi_result):
117+
chunks = list(wsgi_result)
118+
assert len(chunks) > 0, "invocation returned no output"
119+
complete_value = b''.join(chunks)
120+
decoded_value = codecs.decode(complete_value, 'utf8')
121+
return decoded_value
122+
123+
124+
class WsgiAssertMixin:
125+
"""Custom assertions for verifying server code executed under test."""
126+
127+
def assertWsgiResponse(self, wsgi_result, fake_wsgi, expected_status_code):
128+
assert isinstance(fake_wsgi, _PreparedWsgi)
129+
130+
content = _trigger_and_unpack_result(wsgi_result)
131+
132+
def called_once(fake):
133+
assert hasattr(fake, 'calls')
134+
return len(fake.calls) == 1
135+
136+
fake_start_response = fake_wsgi.start_response
137+
138+
try:
139+
self.assertTrue(called_once(fake_start_response))
140+
except AssertionError:
141+
if len(fake.calls) == 0:
142+
raise AssertionError("WSGI handler did not respond")
143+
else:
144+
raise AssertionError("WSGI handler responded more than once")
145+
146+
wsgi_call = fake_start_response.calls[0]
147+
148+
# check for expected HTTP status code
149+
wsgi_status = wsgi_call[0]
150+
actual_status_code = int(wsgi_status[0:3])
151+
self.assertEqual(actual_status_code, expected_status_code)
152+
153+
headers = dict(wsgi_call[1])
154+
155+
return content, headers

0 commit comments

Comments
 (0)