Skip to content

Tighten redirect handling in the OpenID service #100

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 69 additions & 32 deletions mig/server/grid_openid.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
valid_path, valid_ascii, valid_job_id, valid_base_url, valid_url, \
valid_complex_url, InputException
from mig.shared.tlsserver import hardened_ssl_context
from mig.shared.url import urlparse, urlencode
from mig.shared.url import urlparse, urlencode, check_local_site_url
from mig.shared.useradm import get_openid_user_dn, check_password_scramble, \
check_hash
from mig.shared.userdb import default_db_path
Expand Down Expand Up @@ -189,9 +189,16 @@ def filter_why_pw(configuration, why):
if isinstance(why, server.EncodingError):
text = why.response.encodeToKVForm()
return strip_password(configuration, text)
elif isinstance(why, server.ProtocolError):
text = why.message
return strip_password(configuration, text)
else:
_logger.warning("can't filter password in unknown 'why': %s" % why)
return why
# IMPORTANT: do NOT log or show raw why as it may contain credentials
# NOTE: we should not get here, but return generic safe msg if we do.
why_type = type(why)
_logger.warning("can't filter unknown 'why' (%s) - show generic err" %
why_type)
return 'Unexpected %s to filter for safe log and output.' % why_type


def lookup_full_user(username):
Expand Down Expand Up @@ -468,7 +475,7 @@ def do_GET(self):
# Do not disclose internal details
filtered_exc = cgitb.text(sys.exc_info(), context=10)
# IMPORTANT: do NOT ever print or log raw password
pw = self.password
pw = self.password or ''
filtered_exc = filtered_exc.replace(pw, '*' * len(pw))
logger.debug("Traceback %s: %s" % (error_ref, filtered_exc))
err_msg = """<p class='leftpad'>
Expand Down Expand Up @@ -581,9 +588,20 @@ def handleAllow(self, query):
except server.ProtocolError as why:
# IMPORTANT: NEVER log or show raw why or query with password!
safe_query = strip_password(configuration, query)
logger.error("handleAllow got broken request: %s" % safe_query)
# NOTE: let displayResponse filter pw
self.displayResponse(why)
safe_why = filter_why_pw(configuration, why)
logger.error("handleAllow got broken request %s: %s" %
(safe_query, safe_why))
# IMPORTANT: do NOT use displayResponse here! It would a.o.
# uncritically forward user any unverified return_to
# value in query.
self.showErrorPage('''<h2>Error in Communication</h2>
You may have discovered a bug in the %s OpenID 2.0 service. Please report it
to the site admins if you keep getting here. If you arrived here using the
browser "back" button, however, that is expected since it results in
inconsistent session state.
<h3>Error details:</h3>
<pre>%s</pre>
''' % (configuration.short_title, cgi.escape(safe_why)))
return

logger.debug("handleAllow with last request %s from user %s" %
Expand Down Expand Up @@ -743,9 +761,20 @@ def serverEndPoint(self, query):
except server.ProtocolError as why:
# IMPORTANT: NEVER log or show raw why or query with password!
safe_query = strip_password(configuration, query)
logger.error("serverEndPoint got broken request: %s" % safe_query)
# NOTE: let displayResponse filter pw
self.displayResponse(why)
safe_why = filter_why_pw(configuration, why)
logger.error("serverEndPoint got broken request %s: %s" %
(safe_query, safe_why))
# IMPORTANT: do NOT use displayResponse here! It would a.o.
# uncritically forward user any unverified return_to
# value in query.
self.showErrorPage('''<h2>Error in Communication</h2>
You may have discovered a bug in the %s OpenID 2.0 service. Please report it
to the site admins if you keep getting here. If you arrived here using the
browser "back" button, however, that is expected since it results in
inconsistent session state.
<h3>Error details:</h3>
<pre>%s</pre>
''' % (configuration.short_title, cgi.escape(safe_why)))
return

if request is None:
Expand Down Expand Up @@ -825,18 +854,17 @@ def displayResponse(self, response):
try:
webresponse = self.server.openid.encodeResponse(response)
except server.EncodingError as why:
# IMPORTANT: always mask passwords in output for security
text = filter_why_pw(configuration, why)
# IMPORTANT: NEVER log or show raw why or query with password!
safe_why = filter_why_pw(configuration, why)
logger.error("displayResponse failed encode: %s" % safe_why)
self.showErrorPage('''<h2>Error in Communication</h2>
<p>
You may have discovered a bug in the OpenID service. Please report it to the
site admins if you keep getting here. If you arrived here using the browser
"back" button, however, that is expected since it results in inconsistent
session state.
</p>
You may have discovered a bug in the %s OpenID 2.0 service. Please report it
to the site admins if you keep getting here. If you arrived here using the
browser "back" button, however, that is expected since it results in
inconsistent session state.
<h3>Error details:</h3>
<pre>%s</pre>
''' % cgi.escape(text))
''' % (configuration.short_title, cgi.escape(safe_why)))
return

self.send_response(webresponse.code)
Expand Down Expand Up @@ -938,7 +966,8 @@ def doLogin(self):
self.user = self.query['user']
else:
self.clearUser()
self.redirect(self.query['success_to'])
success_to_url = self.query.get('success_to', None)
self.redirect(success_to_url)
return

if hit_rate_limit(configuration, "openid",
Expand Down Expand Up @@ -992,7 +1021,8 @@ def doLogin(self):
)

if authorized:
self.redirect(self.query['success_to'])
success_to_url = self.query.get('success_to', None)
self.redirect(success_to_url)
else:
logger.warning("login failed for %s" % self.user)
logger.debug("full query: %s" % self.query)
Expand All @@ -1012,23 +1042,30 @@ def doLogin(self):
retry_url = self.server.base_url
self.redirect(retry_url)
elif 'cancel' in self.query:
self.redirect(self.query['fail_to'])
fail_to_url = self.query.get('fail_to', None)
self.redirect(fail_to_url)
else:
assert 0, 'strange login %r' % (self.query,)

def doLogout(self):
"""Logout handler"""
logger.debug("logout clearing user %s" % self.user)
self.clearUser()
if 'return_to' in self.query:
# print "logout redirecting to %(return_to)s" % self.query
self.redirect(self.query['return_to'])
return_to_url = self.query.get('return_to', None)
if return_to_url:
self.redirect(return_to_url)

def redirect(self, url):
"""Redirect helper"""
self.send_response(302)
self.send_header('Location', url)
self.writeUserHeader()
"""Redirect helper with built-in check for safe destination URL"""

if url and not check_local_site_url(configuration, url):
logger.error("reject redirect to external URL %r" % url)
self.send_response(400)
else:
logger.debug("redirect to local site URL %r" % url)
self.send_response(302)
self.send_header('Location', url)
self.writeUserHeader()

self.end_headers()

Expand Down Expand Up @@ -1152,7 +1189,7 @@ def showErrorPage(self, error_message, error_code=400):

def showDecidePage(self, request):
"""Decide page provider"""
id_url_base = self.server.base_url+'id/'
id_url_base = self.server.base_url + 'id/'
# XXX: This may break if there are any synonyms for id_url_base,
# such as referring to it by IP address or a CNAME.
assert (request.identity.startswith(id_url_base) or
Expand Down Expand Up @@ -1297,7 +1334,7 @@ def showIdPage(self, path):
link_tag = '<link rel="openid.server" href="%sopenidserver">' % \
self.server.base_url
yadis_loc_tag = '<meta http-equiv="x-xrds-location" content="%s"/>' % \
(self.server.base_url+'yadis/'+path[4:])
(self.server.base_url + 'yadis/' + path[4:])
disco_tags = link_tag + yadis_loc_tag
ident = self.server.base_url + path[1:]

Expand Down Expand Up @@ -1689,7 +1726,7 @@ def start_service(configuration):
'root_dir': os.path.abspath(configuration.user_home),
'db_path': os.path.abspath(default_db_path(configuration)),
'session_store': os.path.abspath(configuration.openid_store),
'session_ttl': 24*3600,
'session_ttl': 24 * 3600,
'allow_password': 'password' in configuration.user_openid_auth,
'allow_digest': 'digest' in configuration.user_openid_auth,
'allow_publickey': 'publickey' in configuration.user_openid_auth,
Expand Down
37 changes: 37 additions & 0 deletions mig/shared/url.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,40 @@ def openid_basic_logout_url(
'logout?return_to=%s' % return_url)
_logger.debug("basic openid logout url: %s" % oid_logout_url)
return oid_logout_url


def _get_site_urls(configuration):
"""Helper to extract actually enabled site URLs from configuration. Namely,
the ones that are non-empty.
"""
site_url_list = (configuration.migserver_http_url,
configuration.migserver_https_url,
configuration.migserver_public_url,
configuration.migserver_public_alias_url,
configuration.migserver_https_mig_cert_url,
configuration.migserver_https_ext_cert_url,
configuration.migserver_https_mig_oid_url,
configuration.migserver_https_ext_oid_url,
configuration.migserver_https_mig_oidc_url,
configuration.migserver_https_ext_oidc_url,
configuration.migserver_https_sid_url)
return [url for url in site_url_list if url.strip()]


def check_local_site_url(configuration, url):
"""Check if provided url can possibly belongs to this site based on the
configuration. Only inspects the PROTOCOL and FQDN part of the url and
makes sure it fits one of the configured migserver_X_url names - not
whether the remaining part makes sense. The URL is expected to already be
basically validated for invalid character contents e.g. with something
like the mig.shared.safeinput.valid_complex_url helper.
"""
_logger = configuration.logger
proto, fqdn = urlsplit(url)[:2]
url_base = '%s://%s' % (proto, fqdn)
if proto and fqdn and not url_base in _get_site_urls(configuration):
_logger.error("Not a valid URL %r in local site URL check for %r" %
(url, url_base))
return False
_logger.debug("Verified URL %r to be on local site" % url)
return True
99 changes: 99 additions & 0 deletions tests/test_mig_shared_url.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# -*- coding: utf-8 -*-
#
# --- BEGIN_HEADER ---
#
# test_mig_shared_url - unit test of the corresponding mig shared module
# Copyright (C) 2003-2024 The MiG Project by the Science HPC Center at UCPH
#
# This file is part of MiG.
#
# MiG is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# MiG is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
#
# --- END_HEADER ---
#

"""Unit tests for the migrid module pointed to in the filename"""

import os
import sys

sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__))))

from tests.support import MigTestCase, FakeConfiguration, testmain
from mig.shared.url import _get_site_urls, check_local_site_url


def _generate_dynamic_site_urls(url_base_list):
"""Simple helper to construct a list of similar URLs for each base url in
url_base_list.
"""
site_urls = []
for url_base in url_base_list:
site_urls += ['%s' % url_base, '%s/' % url_base,
'%s/wsgi-bin/home.py' % url_base,
'%s/wsgi-bin/logout.py' % url_base,
'%s/wsgi-bin/logout.py?return_url=' % url_base,
'%s/wsgi-bin/logout.py?return_url=%s' % (url_base,
ENC_URL)
]
return site_urls


DUMMY_CONF = FakeConfiguration(migserver_http_url='http://myfqdn.org',
migserver_https_url='https://myfqdn.org',
migserver_https_mig_cert_url='',
migserver_https_ext_cert_url='',
migserver_https_mig_oid_url='',
migserver_https_ext_oid_url='',
migserver_https_mig_oidc_url='',
migserver_https_ext_oidc_url='',
migserver_https_sid_url='',
migserver_public_url='',
migserver_public_alias_url='')
ENC_URL = 'https%3A%2F%2Fsomewhere.org%2Fsub%0A'
# Static site-anchored and dynamic full URLs for local site URL check success
LOCAL_SITE_URLS = ['', 'abc', 'abc.txt', '/', '/bla', '/bla#anchor',
'/bla/', '/bla/#anchor', '/bla/bla', '/bla/bla/bla',
'//bla//', './bla', './bla/', './bla/bla',
'./bla/bla/bla', 'logout.py', 'logout.py?bla=',
'/cgi-sid/logout.py', '/cgi-sid/logout.py?bla=bla',
'/cgi-sid/logout.py?return_url=%s' % ENC_URL,
]
LOCAL_BASE_URLS = _get_site_urls(DUMMY_CONF)
LOCAL_SITE_URLS += _generate_dynamic_site_urls(LOCAL_SITE_URLS)
# Dynamic full URLs for local site URL check failure
REMOTE_BASE_URLS = ['https://someevilsite.com', 'ftp://someevilsite.com']
REMOTE_SITE_URLS = _generate_dynamic_site_urls(REMOTE_BASE_URLS)


class BasicUrl(MigTestCase):
"""Wrap unit tests for the corresponding module"""

def test_valid_local_site_urls(self):
"""Check known valid static and dynamic URLs"""
for url in LOCAL_SITE_URLS:
self.assertTrue(check_local_site_url(DUMMY_CONF, url),
"Local site url should succeed for %s" % url)

def test_invalid_local_site_urls(self):
"""Check known invalid URLs"""
for url in REMOTE_SITE_URLS:
self.assertFalse(check_local_site_url(DUMMY_CONF, url),
"Local site url should fail for %s" % url)


if __name__ == '__main__':
testmain()
Loading