Skip to content

Basic coverage of migwsgi. #98

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions local-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ autopep8;python_version >= "3"
# NOTE: paramiko-3.0.0 dropped python2 and python3.6 support
paramiko;python_version >= "3.7"
paramiko<3;python_version < "3.7"
werkzeug
13 changes: 13 additions & 0 deletions mig/shared/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@
_TYPE_UNICODE = type(u"")


if PY2:
class SimpleNamespace(dict):
"""Bare minimum SimpleNamespace for Python 2."""

def __getattribute__(self, name):
if name == '__dict__':
return dict(**self)

return self[name]
else:
from types import SimpleNamespace


def _is_unicode(val):
"""Return boolean indicating if the value is a unicode string.

Expand Down
17 changes: 11 additions & 6 deletions mig/wsgi-bin/migwsgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def object_type_info(object_type):


def stub(configuration, client_id, import_path, backend, user_arguments_dict,
environ):
environ, _import_module=importlib.import_module):
"""Run backend on behalf of client_id with supplied user_arguments_dict.
I.e. import main from import_path and execute it with supplied arguments.
"""
Expand Down Expand Up @@ -91,7 +91,7 @@ def stub(configuration, client_id, import_path, backend, user_arguments_dict,

# _logger.debug("import main from %r" % import_path)
# NOTE: dynamic module loading to find corresponding main function
module_handle = importlib.import_module(import_path)
module_handle = _import_module(import_path)
main = module_handle.main
except Exception as err:
_logger.error("%s could not import %r (%s): %s" %
Expand Down Expand Up @@ -187,7 +187,8 @@ def _flush_wsgi_errors():
environ['wsgi.errors'].close()


def application(environ, start_response):
def application(environ, start_response, configuration=None,
_import_module=importlib.import_module, _set_os_environ=True):
"""MiG app called automatically by WSGI.

*environ* is a dictionary populated by the server with CGI-like variables
Expand Down Expand Up @@ -238,7 +239,8 @@ def application(environ, start_response):
os_env_value))

# Assign updated environ to LOCAL os.environ for the rest of this session
os.environ = environ
if _set_os_environ:
os.environ(environ)

# NOTE: enable to debug runtime environment to apache error log
# print("DEBUG: python %s" %
Expand All @@ -250,7 +252,9 @@ def application(environ, start_response):
if sys.version_info[0] < 3:
sys.stdout = sys.stderr

configuration = get_configuration_object()
if configuration is None:
configuration = get_configuration_object()

_logger = configuration.logger

# NOTE: replace default wsgi errors to apache error log with our own logs
Expand Down Expand Up @@ -321,7 +325,8 @@ def application(environ, start_response):
# _logger.debug("wsgi handling script: %s" % script_name)
(output_objs, ret_val) = stub(configuration, client_id,
module_path, backend,
user_arguments_dict, environ)
user_arguments_dict, environ,
_import_module=_import_module)
else:
_logger.warning("wsgi handling refused script:%s" % script_name)
(output_objs, ret_val) = reject_main(client_id,
Expand Down
167 changes: 167 additions & 0 deletions tests/support/wsgisupp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# -*- coding: utf-8 -*-
#
# --- BEGIN_HEADER ---
#
# wsgisupp - test support library for WSGI
# Copyright (C) 2003-2025 The MiG Project by the Science HPC Center at UCPH
#
# This file is part of MiG.
#
# MiG is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# MiG is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# -- END_HEADER ---
#

"""Test support library for WSGI."""

from collections import namedtuple
import codecs
from io import BytesIO
from werkzeug.datastructures import MultiDict

from tests.support._env import PY2

if PY2:
from urllib import urlencode
from urlparse import urlparse
else:
from urllib.parse import urlencode, urlparse


# named type representing the tuple that is passed to WSGI handlers
_PreparedWsgi = namedtuple('_PreparedWsgi', ['environ', 'start_response'])


class FakeWsgiStartResponse:
"""Glue object that conforms to the same interface as the start_response()
in the WSGI specs but records the calls to it such that they can be
inspected and, for our purposes, asserted against."""

def __init__(self):
self.calls = []

def __call__(self, status, headers, exc=None):
self.calls.append((status, headers, exc))


def create_wsgi_environ(configuration, wsgi_url, method='GET', query=None, headers=None, form=None):
"""Populate the necessary variables that will constitute a valid WSGI
environment given a URL to which we will make a requests under test and
various other options that set up the nature of that request."""

parsed_url = urlparse(wsgi_url)

if query:
method = 'GET'

request_query = urlencode(query)
wsgi_input = ()
elif form:
method = 'POST'
request_query = ''

body = urlencode(MultiDict(form)).encode('ascii')

headers = headers or {}
if not 'Content-Type' in headers:
headers['Content-Type'] = 'application/x-www-form-urlencoded'

headers['Content-Length'] = str(len(body))
wsgi_input = BytesIO(body)
else:
request_query = parsed_url.query
wsgi_input = ()

class _errors:
def close():
pass

environ = {}
environ['wsgi.errors'] = _errors()
environ['wsgi.input'] = wsgi_input
environ['wsgi.url_scheme'] = parsed_url.scheme
environ['wsgi.version'] = (1, 0)
environ['MIG_CONF'] = configuration.config_file
environ['HTTP_HOST'] = parsed_url.netloc
environ['PATH_INFO'] = parsed_url.path
environ['QUERY_STRING'] = request_query
environ['REQUEST_METHOD'] = method
environ['SCRIPT_URI'] = ''.join(
('http://', environ['HTTP_HOST'], environ['PATH_INFO']))

if headers:
for k, v in headers.items():
header_key = k.replace('-', '_').upper()
if header_key.startswith('CONTENT'):
# Content-* headers must not be prefixed in WSGI
pass
else:
header_key = "HTTP_%s" % (header_key),
environ[header_key] = v

return environ


def create_wsgi_start_response():
return FakeWsgiStartResponse()


def prepare_wsgi(configuration, url, **kwargs):
return _PreparedWsgi(
create_wsgi_environ(configuration, url, **kwargs),
create_wsgi_start_response()
)


def _trigger_and_unpack_result(wsgi_result):
chunks = list(wsgi_result)
assert len(chunks) > 0, "invocation returned no output"
complete_value = b''.join(chunks)
decoded_value = codecs.decode(complete_value, 'utf8')
return decoded_value


class WsgiAssertMixin:
"""Custom assertions for verifying server code executed under test."""

def assertWsgiResponse(self, wsgi_result, fake_wsgi, expected_status_code):
assert isinstance(fake_wsgi, _PreparedWsgi)

content = _trigger_and_unpack_result(wsgi_result)

def called_once(fake):
assert hasattr(fake, 'calls')
return len(fake.calls) == 1

fake_start_response = fake_wsgi.start_response

try:
self.assertTrue(called_once(fake_start_response))
except AssertionError:
if len(fake.calls) == 0:
raise AssertionError("WSGI handler did not respond")
else:
raise AssertionError("WSGI handler responded more than once")

wsgi_call = fake_start_response.calls[0]

# check for expected HTTP status code
wsgi_status = wsgi_call[0]
actual_status_code = int(wsgi_status[0:3])
self.assertEqual(actual_status_code, expected_status_code)

headers = dict(wsgi_call[1])

return content, headers
Loading