diff --git a/local-requirements.txt b/local-requirements.txt index da83dd4b7..7faf3b026 100644 --- a/local-requirements.txt +++ b/local-requirements.txt @@ -7,3 +7,4 @@ autopep8;python_version >= "3" # NOTE: paramiko-3.0.0 dropped python2 and python3.6 support paramiko;python_version >= "3.7" paramiko<3;python_version < "3.7" +werkzeug diff --git a/mig/shared/compat.py b/mig/shared/compat.py index ac5ab0f75..01069ce43 100644 --- a/mig/shared/compat.py +++ b/mig/shared/compat.py @@ -46,6 +46,19 @@ _TYPE_UNICODE = type(u"") +if PY2: + class SimpleNamespace(dict): + """Bare minimum SimpleNamespace for Python 2.""" + + def __getattribute__(self, name): + if name == '__dict__': + return dict(**self) + + return self[name] +else: + from types import SimpleNamespace + + def _is_unicode(val): """Return boolean indicating if the value is a unicode string. diff --git a/mig/wsgi-bin/migwsgi.py b/mig/wsgi-bin/migwsgi.py index f236553ae..68772ef2b 100755 --- a/mig/wsgi-bin/migwsgi.py +++ b/mig/wsgi-bin/migwsgi.py @@ -53,7 +53,7 @@ def object_type_info(object_type): def stub(configuration, client_id, import_path, backend, user_arguments_dict, - environ): + environ, _import_module=importlib.import_module): """Run backend on behalf of client_id with supplied user_arguments_dict. I.e. import main from import_path and execute it with supplied arguments. """ @@ -91,7 +91,7 @@ def stub(configuration, client_id, import_path, backend, user_arguments_dict, # _logger.debug("import main from %r" % import_path) # NOTE: dynamic module loading to find corresponding main function - module_handle = importlib.import_module(import_path) + module_handle = _import_module(import_path) main = module_handle.main except Exception as err: _logger.error("%s could not import %r (%s): %s" % @@ -187,7 +187,8 @@ def _flush_wsgi_errors(): environ['wsgi.errors'].close() -def application(environ, start_response): +def application(environ, start_response, configuration=None, + _import_module=importlib.import_module, _set_os_environ=True): """MiG app called automatically by WSGI. *environ* is a dictionary populated by the server with CGI-like variables @@ -238,7 +239,8 @@ def application(environ, start_response): os_env_value)) # Assign updated environ to LOCAL os.environ for the rest of this session - os.environ = environ + if _set_os_environ: + os.environ(environ) # NOTE: enable to debug runtime environment to apache error log # print("DEBUG: python %s" % @@ -250,7 +252,9 @@ def application(environ, start_response): if sys.version_info[0] < 3: sys.stdout = sys.stderr - configuration = get_configuration_object() + if configuration is None: + configuration = get_configuration_object() + _logger = configuration.logger # NOTE: replace default wsgi errors to apache error log with our own logs @@ -321,7 +325,8 @@ def application(environ, start_response): # _logger.debug("wsgi handling script: %s" % script_name) (output_objs, ret_val) = stub(configuration, client_id, module_path, backend, - user_arguments_dict, environ) + user_arguments_dict, environ, + _import_module=_import_module) else: _logger.warning("wsgi handling refused script:%s" % script_name) (output_objs, ret_val) = reject_main(client_id, diff --git a/tests/support/wsgisupp.py b/tests/support/wsgisupp.py new file mode 100644 index 000000000..0c2a1a974 --- /dev/null +++ b/tests/support/wsgisupp.py @@ -0,0 +1,167 @@ +# -*- coding: utf-8 -*- +# +# --- BEGIN_HEADER --- +# +# wsgisupp - test support library for WSGI +# Copyright (C) 2003-2025 The MiG Project by the Science HPC Center at UCPH +# +# This file is part of MiG. +# +# MiG is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# MiG is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# -- END_HEADER --- +# + +"""Test support library for WSGI.""" + +from collections import namedtuple +import codecs +from io import BytesIO +from werkzeug.datastructures import MultiDict + +from tests.support._env import PY2 + +if PY2: + from urllib import urlencode + from urlparse import urlparse +else: + from urllib.parse import urlencode, urlparse + + +# named type representing the tuple that is passed to WSGI handlers +_PreparedWsgi = namedtuple('_PreparedWsgi', ['environ', 'start_response']) + + +class FakeWsgiStartResponse: + """Glue object that conforms to the same interface as the start_response() + in the WSGI specs but records the calls to it such that they can be + inspected and, for our purposes, asserted against.""" + + def __init__(self): + self.calls = [] + + def __call__(self, status, headers, exc=None): + self.calls.append((status, headers, exc)) + + +def create_wsgi_environ(configuration, wsgi_url, method='GET', query=None, headers=None, form=None): + """Populate the necessary variables that will constitute a valid WSGI + environment given a URL to which we will make a requests under test and + various other options that set up the nature of that request.""" + + parsed_url = urlparse(wsgi_url) + + if query: + method = 'GET' + + request_query = urlencode(query) + wsgi_input = () + elif form: + method = 'POST' + request_query = '' + + body = urlencode(MultiDict(form)).encode('ascii') + + headers = headers or {} + if not 'Content-Type' in headers: + headers['Content-Type'] = 'application/x-www-form-urlencoded' + + headers['Content-Length'] = str(len(body)) + wsgi_input = BytesIO(body) + else: + request_query = parsed_url.query + wsgi_input = () + + class _errors: + def close(): + pass + + environ = {} + environ['wsgi.errors'] = _errors() + environ['wsgi.input'] = wsgi_input + environ['wsgi.url_scheme'] = parsed_url.scheme + environ['wsgi.version'] = (1, 0) + environ['MIG_CONF'] = configuration.config_file + environ['HTTP_HOST'] = parsed_url.netloc + environ['PATH_INFO'] = parsed_url.path + environ['QUERY_STRING'] = request_query + environ['REQUEST_METHOD'] = method + environ['SCRIPT_URI'] = ''.join( + ('http://', environ['HTTP_HOST'], environ['PATH_INFO'])) + + if headers: + for k, v in headers.items(): + header_key = k.replace('-', '_').upper() + if header_key.startswith('CONTENT'): + # Content-* headers must not be prefixed in WSGI + pass + else: + header_key = "HTTP_%s" % (header_key), + environ[header_key] = v + + return environ + + +def create_wsgi_start_response(): + return FakeWsgiStartResponse() + + +def prepare_wsgi(configuration, url, **kwargs): + return _PreparedWsgi( + create_wsgi_environ(configuration, url, **kwargs), + create_wsgi_start_response() + ) + + +def _trigger_and_unpack_result(wsgi_result): + chunks = list(wsgi_result) + assert len(chunks) > 0, "invocation returned no output" + complete_value = b''.join(chunks) + decoded_value = codecs.decode(complete_value, 'utf8') + return decoded_value + + +class WsgiAssertMixin: + """Custom assertions for verifying server code executed under test.""" + + def assertWsgiResponse(self, wsgi_result, fake_wsgi, expected_status_code): + assert isinstance(fake_wsgi, _PreparedWsgi) + + content = _trigger_and_unpack_result(wsgi_result) + + def called_once(fake): + assert hasattr(fake, 'calls') + return len(fake.calls) == 1 + + fake_start_response = fake_wsgi.start_response + + try: + self.assertTrue(called_once(fake_start_response)) + except AssertionError: + if len(fake.calls) == 0: + raise AssertionError("WSGI handler did not respond") + else: + raise AssertionError("WSGI handler responded more than once") + + wsgi_call = fake_start_response.calls[0] + + # check for expected HTTP status code + wsgi_status = wsgi_call[0] + actual_status_code = int(wsgi_status[0:3]) + self.assertEqual(actual_status_code, expected_status_code) + + headers = dict(wsgi_call[1]) + + return content, headers diff --git a/tests/test_mig_wsgi-bin.py b/tests/test_mig_wsgi-bin.py new file mode 100644 index 000000000..7447ac838 --- /dev/null +++ b/tests/test_mig_wsgi-bin.py @@ -0,0 +1,177 @@ +# -*- coding: utf-8 -*- +# +# --- BEGIN_HEADER --- +# +# test_mig_wsgi-bin - unit tests of the WSGI glue +# Copyright (C) 2003-2025 The MiG Project by the Science HPC Center at UCPH +# +# This file is part of MiG. +# +# MiG is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# MiG is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. +# +# --- END_HEADER --- +# + +"""Unit tests for the MiG WSGI glue""" + +import codecs +from configparser import ConfigParser +import importlib +import os +import stat +import sys + +from tests.support import PY2, MIG_BASE, MigTestCase, testmain, is_path_within +from tests.support.wsgisupp import prepare_wsgi, WsgiAssertMixin + +from mig.shared.base import client_id_dir, client_dir_id, get_short_id, \ + invisible_path, allow_script, brief_list +from mig.shared.compat import SimpleNamespace +import mig.shared.returnvalues as returnvalues + +if PY2: + from HTMLParser import HTMLParser +else: + from html.parser import HTMLParser + + +class TitleExtractingHtmlParser(HTMLParser): + """An HTML parser using builtin machinery which will extract the title.""" + + def __init__(self): + HTMLParser.__init__(self) + self._title = None + self._within_title = None + + def handle_data(self, *args, **kwargs): + if self._within_title: + self._title = args[0] + + def handle_starttag(self, tag, attrs): + if tag == 'title': + self._within_title = True + + def handle_endtag(self, tag): + if tag == 'title': + self._within_title = False + + def title(self, trim_newlines=False): + if self._title and not self._within_title: + if trim_newlines: + return self._title.strip() + else: + return self._title + elif self._within_title: + raise AssertionError(None, "title end tag missing") + else: + raise AssertionError(None, "title was not encountered") + + +def _import_forcibly(module_name, relative_module_dir=None): + """Custom import function to allow an import of a file for testing + that resides within a non-module directory.""" + + module_path = os.path.join(MIG_BASE, 'mig') + if relative_module_dir is not None: + module_path = os.path.join(module_path, relative_module_dir) + sys.path.append(module_path) + mod = importlib.import_module(module_name) + sys.path.pop(-1) # do not leave the forced module path + return mod + + +migwsgi = _import_forcibly('migwsgi', relative_module_dir='wsgi-bin') + + +class FakeBackend: + """Object with programmable behaviour that behave like a backend and + captures details about the calls made to it. It allows the tests to + assert against known outcomes as well as selectively trigger a wider + range of codepaths.""" + + def __init__(self): + self.output_objects = [ + {'object_type': 'start'}, + {'object_type': 'title', 'text': 'ERROR'}, + ] + self.return_value = returnvalues.ERROR + + def main(self, client_id, user_arguments_dict): + return self.output_objects, self.return_value + + def set_response(self, output_objects, returnvalue): + self.output_objects = output_objects + self.return_value = returnvalue + + def to_import_module(self): + def _import_module(module_path): + return self + return _import_module + + +class MigWsgibin(MigTestCase, WsgiAssertMixin): + """WSGI glue test cases""" + + def _provide_configuration(self): + return 'testconfig' + + def before_each(self): + self.fake_backend = FakeBackend() + self.fake_wsgi = prepare_wsgi(self.configuration, 'http://localhost/') + + self.application_args = ( + self.fake_wsgi.environ, + self.fake_wsgi.start_response, + ) + self.application_kwargs = dict( + configuration=self.configuration, + _import_module=self.fake_backend.to_import_module(), + _set_os_environ=False, + ) + + def assertHtmlTitle(self, value, title_text=None, trim_newlines=False): + assert title_text is not None + + parser = TitleExtractingHtmlParser() + parser.feed(value) + actual_title = parser.title(trim_newlines=trim_newlines) + self.assertEqual(actual_title, title_text) + + def test_return_value_ok_returns_status_200(self): + wsgi_result = migwsgi.application( + *self.application_args, + **self.application_kwargs + ) + + self.assertWsgiResponse(wsgi_result, self.fake_wsgi, 200) + + def test_return_value_ok_returns_expected_title(self): + output_objects = [ + {'object_type': 'title', 'text': 'TEST'} + ] + self.fake_backend.set_response(output_objects, returnvalues.OK) + + wsgi_result = migwsgi.application( + *self.application_args, + **self.application_kwargs + ) + + output, _ = self.assertWsgiResponse(wsgi_result, self.fake_wsgi, 200) + self.assertHtmlTitle(output, title_text='TEST', trim_newlines=True) + + +if __name__ == '__main__': + testmain()