From e48b12e9d33359e08cab40291b52ef43afe36fee Mon Sep 17 00:00:00 2001 From: Alex Burke Date: Fri, 26 Jul 2024 16:39:19 +0200 Subject: [PATCH] Basic coverage of migwsgi and addition of WSGI test suppport code. This PR does the principle things required to allow exercising the central component responsible for glueing named MiG "functionality" files to WSGI. Included is a new wsgisupp.py file implementing the support bits to allow exercising WSGI related code in a readable way. --- local-requirements.txt | 1 + mig/shared/compat.py | 13 +++ mig/wsgi-bin/migwsgi.py | 17 ++-- tests/support/wsgisupp.py | 167 ++++++++++++++++++++++++++++++++++ tests/test_mig_wsgi-bin.py | 177 +++++++++++++++++++++++++++++++++++++ 5 files changed, 369 insertions(+), 6 deletions(-) create mode 100644 tests/support/wsgisupp.py create mode 100644 tests/test_mig_wsgi-bin.py diff --git a/local-requirements.txt b/local-requirements.txt index da83dd4b7..7faf3b026 100644 --- a/local-requirements.txt +++ b/local-requirements.txt @@ -7,3 +7,4 @@ autopep8;python_version >= "3" # NOTE: paramiko-3.0.0 dropped python2 and python3.6 support paramiko;python_version >= "3.7" paramiko<3;python_version < "3.7" +werkzeug diff --git a/mig/shared/compat.py b/mig/shared/compat.py index ac5ab0f75..01069ce43 100644 --- a/mig/shared/compat.py +++ b/mig/shared/compat.py @@ -46,6 +46,19 @@ _TYPE_UNICODE = type(u"") +if PY2: + class SimpleNamespace(dict): + """Bare minimum SimpleNamespace for Python 2.""" + + def __getattribute__(self, name): + if name == '__dict__': + return dict(**self) + + return self[name] +else: + from types import SimpleNamespace + + def _is_unicode(val): """Return boolean indicating if the value is a unicode string. diff --git a/mig/wsgi-bin/migwsgi.py b/mig/wsgi-bin/migwsgi.py index f236553ae..68772ef2b 100755 --- a/mig/wsgi-bin/migwsgi.py +++ b/mig/wsgi-bin/migwsgi.py @@ -53,7 +53,7 @@ def object_type_info(object_type): def stub(configuration, client_id, import_path, backend, user_arguments_dict, - environ): + environ, _import_module=importlib.import_module): """Run backend on behalf of client_id with supplied user_arguments_dict. I.e. import main from import_path and execute it with supplied arguments. """ @@ -91,7 +91,7 @@ def stub(configuration, client_id, import_path, backend, user_arguments_dict, # _logger.debug("import main from %r" % import_path) # NOTE: dynamic module loading to find corresponding main function - module_handle = importlib.import_module(import_path) + module_handle = _import_module(import_path) main = module_handle.main except Exception as err: _logger.error("%s could not import %r (%s): %s" % @@ -187,7 +187,8 @@ def _flush_wsgi_errors(): environ['wsgi.errors'].close() -def application(environ, start_response): +def application(environ, start_response, configuration=None, + _import_module=importlib.import_module, _set_os_environ=True): """MiG app called automatically by WSGI. *environ* is a dictionary populated by the server with CGI-like variables @@ -238,7 +239,8 @@ def application(environ, start_response): os_env_value)) # Assign updated environ to LOCAL os.environ for the rest of this session - os.environ = environ + if _set_os_environ: + os.environ(environ) # NOTE: enable to debug runtime environment to apache error log # print("DEBUG: python %s" % @@ -250,7 +252,9 @@ def application(environ, start_response): if sys.version_info[0] < 3: sys.stdout = sys.stderr - configuration = get_configuration_object() + if configuration is None: + configuration = get_configuration_object() + _logger = configuration.logger # NOTE: replace default wsgi errors to apache error log with our own logs @@ -321,7 +325,8 @@ def application(environ, start_response): # _logger.debug("wsgi handling script: %s" % script_name) (output_objs, ret_val) = stub(configuration, client_id, module_path, backend, - user_arguments_dict, environ) + user_arguments_dict, environ, + _import_module=_import_module) else: _logger.warning("wsgi handling refused script:%s" % script_name) (output_objs, ret_val) = reject_main(client_id, diff --git a/tests/support/wsgisupp.py b/tests/support/wsgisupp.py new file mode 100644 index 000000000..0c2a1a974 --- /dev/null +++ b/tests/support/wsgisupp.py @@ -0,0 +1,167 @@ +# -*- coding: utf-8 -*- +# +# --- BEGIN_HEADER --- +# +# wsgisupp - test support library for WSGI +# Copyright (C) 2003-2025 The MiG Project by the Science HPC Center at UCPH +# +# This file is part of MiG. +# +# MiG is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# MiG is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# -- END_HEADER --- +# + +"""Test support library for WSGI.""" + +from collections import namedtuple +import codecs +from io import BytesIO +from werkzeug.datastructures import MultiDict + +from tests.support._env import PY2 + +if PY2: + from urllib import urlencode + from urlparse import urlparse +else: + from urllib.parse import urlencode, urlparse + + +# named type representing the tuple that is passed to WSGI handlers +_PreparedWsgi = namedtuple('_PreparedWsgi', ['environ', 'start_response']) + + +class FakeWsgiStartResponse: + """Glue object that conforms to the same interface as the start_response() + in the WSGI specs but records the calls to it such that they can be + inspected and, for our purposes, asserted against.""" + + def __init__(self): + self.calls = [] + + def __call__(self, status, headers, exc=None): + self.calls.append((status, headers, exc)) + + +def create_wsgi_environ(configuration, wsgi_url, method='GET', query=None, headers=None, form=None): + """Populate the necessary variables that will constitute a valid WSGI + environment given a URL to which we will make a requests under test and + various other options that set up the nature of that request.""" + + parsed_url = urlparse(wsgi_url) + + if query: + method = 'GET' + + request_query = urlencode(query) + wsgi_input = () + elif form: + method = 'POST' + request_query = '' + + body = urlencode(MultiDict(form)).encode('ascii') + + headers = headers or {} + if not 'Content-Type' in headers: + headers['Content-Type'] = 'application/x-www-form-urlencoded' + + headers['Content-Length'] = str(len(body)) + wsgi_input = BytesIO(body) + else: + request_query = parsed_url.query + wsgi_input = () + + class _errors: + def close(): + pass + + environ = {} + environ['wsgi.errors'] = _errors() + environ['wsgi.input'] = wsgi_input + environ['wsgi.url_scheme'] = parsed_url.scheme + environ['wsgi.version'] = (1, 0) + environ['MIG_CONF'] = configuration.config_file + environ['HTTP_HOST'] = parsed_url.netloc + environ['PATH_INFO'] = parsed_url.path + environ['QUERY_STRING'] = request_query + environ['REQUEST_METHOD'] = method + environ['SCRIPT_URI'] = ''.join( + ('http://', environ['HTTP_HOST'], environ['PATH_INFO'])) + + if headers: + for k, v in headers.items(): + header_key = k.replace('-', '_').upper() + if header_key.startswith('CONTENT'): + # Content-* headers must not be prefixed in WSGI + pass + else: + header_key = "HTTP_%s" % (header_key), + environ[header_key] = v + + return environ + + +def create_wsgi_start_response(): + return FakeWsgiStartResponse() + + +def prepare_wsgi(configuration, url, **kwargs): + return _PreparedWsgi( + create_wsgi_environ(configuration, url, **kwargs), + create_wsgi_start_response() + ) + + +def _trigger_and_unpack_result(wsgi_result): + chunks = list(wsgi_result) + assert len(chunks) > 0, "invocation returned no output" + complete_value = b''.join(chunks) + decoded_value = codecs.decode(complete_value, 'utf8') + return decoded_value + + +class WsgiAssertMixin: + """Custom assertions for verifying server code executed under test.""" + + def assertWsgiResponse(self, wsgi_result, fake_wsgi, expected_status_code): + assert isinstance(fake_wsgi, _PreparedWsgi) + + content = _trigger_and_unpack_result(wsgi_result) + + def called_once(fake): + assert hasattr(fake, 'calls') + return len(fake.calls) == 1 + + fake_start_response = fake_wsgi.start_response + + try: + self.assertTrue(called_once(fake_start_response)) + except AssertionError: + if len(fake.calls) == 0: + raise AssertionError("WSGI handler did not respond") + else: + raise AssertionError("WSGI handler responded more than once") + + wsgi_call = fake_start_response.calls[0] + + # check for expected HTTP status code + wsgi_status = wsgi_call[0] + actual_status_code = int(wsgi_status[0:3]) + self.assertEqual(actual_status_code, expected_status_code) + + headers = dict(wsgi_call[1]) + + return content, headers diff --git a/tests/test_mig_wsgi-bin.py b/tests/test_mig_wsgi-bin.py new file mode 100644 index 000000000..7447ac838 --- /dev/null +++ b/tests/test_mig_wsgi-bin.py @@ -0,0 +1,177 @@ +# -*- coding: utf-8 -*- +# +# --- BEGIN_HEADER --- +# +# test_mig_wsgi-bin - unit tests of the WSGI glue +# Copyright (C) 2003-2025 The MiG Project by the Science HPC Center at UCPH +# +# This file is part of MiG. +# +# MiG is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# MiG is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. +# +# --- END_HEADER --- +# + +"""Unit tests for the MiG WSGI glue""" + +import codecs +from configparser import ConfigParser +import importlib +import os +import stat +import sys + +from tests.support import PY2, MIG_BASE, MigTestCase, testmain, is_path_within +from tests.support.wsgisupp import prepare_wsgi, WsgiAssertMixin + +from mig.shared.base import client_id_dir, client_dir_id, get_short_id, \ + invisible_path, allow_script, brief_list +from mig.shared.compat import SimpleNamespace +import mig.shared.returnvalues as returnvalues + +if PY2: + from HTMLParser import HTMLParser +else: + from html.parser import HTMLParser + + +class TitleExtractingHtmlParser(HTMLParser): + """An HTML parser using builtin machinery which will extract the title.""" + + def __init__(self): + HTMLParser.__init__(self) + self._title = None + self._within_title = None + + def handle_data(self, *args, **kwargs): + if self._within_title: + self._title = args[0] + + def handle_starttag(self, tag, attrs): + if tag == 'title': + self._within_title = True + + def handle_endtag(self, tag): + if tag == 'title': + self._within_title = False + + def title(self, trim_newlines=False): + if self._title and not self._within_title: + if trim_newlines: + return self._title.strip() + else: + return self._title + elif self._within_title: + raise AssertionError(None, "title end tag missing") + else: + raise AssertionError(None, "title was not encountered") + + +def _import_forcibly(module_name, relative_module_dir=None): + """Custom import function to allow an import of a file for testing + that resides within a non-module directory.""" + + module_path = os.path.join(MIG_BASE, 'mig') + if relative_module_dir is not None: + module_path = os.path.join(module_path, relative_module_dir) + sys.path.append(module_path) + mod = importlib.import_module(module_name) + sys.path.pop(-1) # do not leave the forced module path + return mod + + +migwsgi = _import_forcibly('migwsgi', relative_module_dir='wsgi-bin') + + +class FakeBackend: + """Object with programmable behaviour that behave like a backend and + captures details about the calls made to it. It allows the tests to + assert against known outcomes as well as selectively trigger a wider + range of codepaths.""" + + def __init__(self): + self.output_objects = [ + {'object_type': 'start'}, + {'object_type': 'title', 'text': 'ERROR'}, + ] + self.return_value = returnvalues.ERROR + + def main(self, client_id, user_arguments_dict): + return self.output_objects, self.return_value + + def set_response(self, output_objects, returnvalue): + self.output_objects = output_objects + self.return_value = returnvalue + + def to_import_module(self): + def _import_module(module_path): + return self + return _import_module + + +class MigWsgibin(MigTestCase, WsgiAssertMixin): + """WSGI glue test cases""" + + def _provide_configuration(self): + return 'testconfig' + + def before_each(self): + self.fake_backend = FakeBackend() + self.fake_wsgi = prepare_wsgi(self.configuration, 'http://localhost/') + + self.application_args = ( + self.fake_wsgi.environ, + self.fake_wsgi.start_response, + ) + self.application_kwargs = dict( + configuration=self.configuration, + _import_module=self.fake_backend.to_import_module(), + _set_os_environ=False, + ) + + def assertHtmlTitle(self, value, title_text=None, trim_newlines=False): + assert title_text is not None + + parser = TitleExtractingHtmlParser() + parser.feed(value) + actual_title = parser.title(trim_newlines=trim_newlines) + self.assertEqual(actual_title, title_text) + + def test_return_value_ok_returns_status_200(self): + wsgi_result = migwsgi.application( + *self.application_args, + **self.application_kwargs + ) + + self.assertWsgiResponse(wsgi_result, self.fake_wsgi, 200) + + def test_return_value_ok_returns_expected_title(self): + output_objects = [ + {'object_type': 'title', 'text': 'TEST'} + ] + self.fake_backend.set_response(output_objects, returnvalues.OK) + + wsgi_result = migwsgi.application( + *self.application_args, + **self.application_kwargs + ) + + output, _ = self.assertWsgiResponse(wsgi_result, self.fake_wsgi, 200) + self.assertHtmlTitle(output, title_text='TEST', trim_newlines=True) + + +if __name__ == '__main__': + testmain()