Skip to content

Commit 299e789

Browse files
committed
wsgisupp: implement support code for the testing of WSGI handlers/servers
1 parent a17c11a commit 299e789

File tree

2 files changed

+162
-0
lines changed

2 files changed

+162
-0
lines changed

local-requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ autopep8;python_version >= "3"
77
# NOTE: paramiko-3.0.0 dropped python2 and python3.6 support
88
paramiko;python_version >= "3.7"
99
paramiko<3;python_version < "3.7"
10+
werkzeug

tests/support/wsgisupp.py

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# --- BEGIN_HEADER ---
4+
#
5+
# htmlsupp - test support library for WSGI
6+
# Copyright (C) 2003-2024 The MiG Project by the Science HPC Center at UCPH
7+
#
8+
# This file is part of MiG.
9+
#
10+
# MiG is free software: you can redistribute it and/or modify
11+
# it under the terms of the GNU General Public License as published by
12+
# the Free Software Foundation; either version 2 of the License, or
13+
# (at your option) any later version.
14+
#
15+
# MiG is distributed in the hope that it will be useful,
16+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
17+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18+
# GNU General Public License for more details.
19+
#
20+
# You should have received a copy of the GNU General Public License
21+
# along with this program; if not, write to the Free Software
22+
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
23+
#
24+
# -- END_HEADER ---
25+
#
26+
27+
from collections import namedtuple
28+
import codecs
29+
from io import BytesIO
30+
from werkzeug.datastructures import MultiDict
31+
32+
from tests.support._env import PY2
33+
34+
if PY2:
35+
from urllib import urlencode
36+
from urlparse import urlparse
37+
else:
38+
from urllib.parse import urlencode, urlparse
39+
40+
"""Test support library for WSGI."""
41+
42+
43+
_PreparedWsgi = namedtuple('_PreparedWsgi', ['environ', 'start_response'])
44+
45+
46+
class FakeWsgiStartResponse:
47+
"""Glue object that conforms to the same interface as the start_response()
48+
in the WSGI specs but records the calls to it such that they can be
49+
inspected and, for our purposes, asserted against."""
50+
51+
def __init__(self):
52+
self.calls = []
53+
54+
def __call__(self, status, headers, exc=None):
55+
self.calls.append((status, headers, exc))
56+
57+
58+
def create_wsgi_environ(configuration, wsgi_url, method='GET', query=None, headers=None, form=None):
59+
parsed_url = urlparse(wsgi_url)
60+
61+
if query:
62+
method = 'GET'
63+
64+
request_query = urlencode(query)
65+
wsgi_input = ()
66+
elif form:
67+
method = 'POST'
68+
request_query = ''
69+
70+
body = urlencode(MultiDict(form)).encode('ascii')
71+
72+
headers = headers or {}
73+
if not 'Content-Type' in headers:
74+
headers['Content-Type'] = 'application/x-www-form-urlencoded'
75+
76+
headers['Content-Length'] = str(len(body))
77+
wsgi_input = BytesIO(body)
78+
else:
79+
request_query = parsed_url.query
80+
wsgi_input = ()
81+
82+
class _errors:
83+
def close():
84+
pass
85+
86+
environ = {}
87+
environ['wsgi.errors'] = _errors()
88+
environ['wsgi.input'] = wsgi_input
89+
environ['wsgi.url_scheme'] = parsed_url.scheme
90+
environ['wsgi.version'] = (1, 0)
91+
environ['MIG_CONF'] = configuration.config_file
92+
environ['HTTP_HOST'] = parsed_url.netloc
93+
environ['PATH_INFO'] = parsed_url.path
94+
environ['QUERY_STRING'] = request_query
95+
environ['REQUEST_METHOD'] = method
96+
environ['SCRIPT_URI'] = ''.join(('http://', environ['HTTP_HOST'], environ['PATH_INFO']))
97+
98+
if headers:
99+
for k, v in headers.items():
100+
header_key = k.replace('-', '_').upper()
101+
if header_key.startswith('CONTENT'):
102+
# Content-* headers must not be prefixed in WSGI
103+
pass
104+
else:
105+
header_key = "HTTP_%s" % (header_key),
106+
environ[header_key] = v
107+
108+
return environ
109+
110+
111+
def create_wsgi_start_response():
112+
return FakeWsgiStartResponse()
113+
114+
115+
def prepare_wsgi(configuration, url, **kwargs):
116+
return _PreparedWsgi(
117+
create_wsgi_environ(configuration, url, **kwargs),
118+
create_wsgi_start_response()
119+
)
120+
121+
122+
def _trigger_and_unpack_result(wsgi_result):
123+
chunks = list(wsgi_result)
124+
assert len(chunks) > 0, "invocation returned no output"
125+
complete_value = b''.join(chunks)
126+
decoded_value = codecs.decode(complete_value, 'utf8')
127+
return decoded_value
128+
129+
130+
class WsgiAssertMixin:
131+
"""Custom assertions for verifying server code executed under test."""
132+
133+
def assertWsgiResponse(self, wsgi_result, fake_wsgi, expected_status_code):
134+
assert isinstance(fake_wsgi, _PreparedWsgi)
135+
136+
content = _trigger_and_unpack_result(wsgi_result)
137+
138+
def called_once(fake):
139+
assert hasattr(fake, 'calls')
140+
return len(fake.calls) == 1
141+
142+
fake_start_response = fake_wsgi.start_response
143+
144+
try:
145+
self.assertTrue(called_once(fake_start_response))
146+
except AssertionError:
147+
if len(fake.calls) == 0:
148+
raise AssertionError("WSGI handler did not respond")
149+
else:
150+
raise AssertionError("WSGI handler responded more than once")
151+
152+
wsgi_call = fake_start_response.calls[0]
153+
154+
# check for expected HTTP status code
155+
wsgi_status = wsgi_call[0]
156+
actual_status_code = int(wsgi_status[0:3])
157+
self.assertEqual(actual_status_code, expected_status_code)
158+
159+
headers = dict(wsgi_call[1])
160+
161+
return content, headers

0 commit comments

Comments
 (0)