From a17c11a275768e0e3f4b082da84bf385f38fdb26 Mon Sep 17 00:00:00 2001 From: Alex Burke Date: Fri, 18 Oct 2024 14:39:28 +0200 Subject: [PATCH 1/2] Use the correct force_utf8 function based on Python version. Import the experimental branch version of force_utf8 wholesale adding a -py(2|3) suffix and expose the correct implementation dependent on PY2. Include forcing InputException messages to a native string as is done in experimental (also taken directly from that branch) which ensures the exception message, which may be unicode, becomes a string everywhere. --- mig/shared/base.py | 28 +++++++++++++++- mig/shared/safeinput.py | 4 +-- tests/test_mig_shared_base.py | 60 +++++++++++++++++++++++++++++++++++ 3 files changed, 89 insertions(+), 3 deletions(-) create mode 100644 tests/test_mig_shared_base.py diff --git a/mig/shared/base.py b/mig/shared/base.py index 64f12b370..b21d4ae6f 100644 --- a/mig/shared/base.py +++ b/mig/shared/base.py @@ -36,6 +36,7 @@ import re # IMPORTANT: do not import any other MiG modules here - to avoid import loops +from mig.shared.compat import PY2 from mig.shared.defaults import default_str_coding, default_fs_coding, \ keyword_all, keyword_auto, sandbox_names, _user_invisible_files, \ _user_invisible_dirs, _vgrid_xgi_scripts, cert_field_order, csrf_field, \ @@ -496,7 +497,7 @@ def is_unicode(val): return (type(u"") == type(val)) -def force_utf8(val, highlight=''): +def _force_utf8_py2(val, highlight=''): """Internal helper to encode unicode strings to utf8 version. Actual changes are marked out with the highlight string if given. """ @@ -507,6 +508,31 @@ def force_utf8(val, highlight=''): return val return "%s%s%s" % (highlight, val.encode("utf8"), highlight) +def _force_utf8_py3(val, highlight='', stringify=True): + """Internal helper to encode unicode strings to utf8 version. Actual + changes are marked out with the highlight string if given. + The optional stringify turns ALL values including numbers into string. + """ + # We run into all kind of nasty encoding problems if we mix + if not isinstance(val, basestring): + if stringify: + val = "%s" % val + else: + return val + if not is_unicode(val): + return val + if is_unicode(highlight): + hl_utf = highlight.encode("utf8") + else: + hl_utf = highlight + return (b"%s%s%s" % (hl_utf, val.encode("utf8"), hl_utf)) + + +if PY2: + force_utf8 = _force_utf8_py2 +else: + force_utf8 = _force_utf8_py3 + def force_utf8_rec(input_obj, highlight=''): """Recursive object conversion from unicode to utf8: useful to convert e.g. diff --git a/mig/shared/safeinput.py b/mig/shared/safeinput.py index 592250755..e91937d8c 100644 --- a/mig/shared/safeinput.py +++ b/mig/shared/safeinput.py @@ -58,7 +58,7 @@ from html import escape as escape_html assert escape_html is not None -from mig.shared.base import force_unicode, force_utf8 +from mig.shared.base import force_unicode, force_native_str from mig.shared.defaults import src_dst_sep, username_charset, \ username_max_length, session_id_charset, session_id_length, \ subject_id_charset, subject_id_min_length, subject_id_max_length, \ @@ -2294,7 +2294,7 @@ def __init__(self, value): def __str__(self): """Return string representation""" - return force_utf8(force_unicode(self.value)) + return force_native_str(self.value) def main(_exit=sys.exit, _print=print): diff --git a/tests/test_mig_shared_base.py b/tests/test_mig_shared_base.py new file mode 100644 index 000000000..82145cb20 --- /dev/null +++ b/tests/test_mig_shared_base.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +# +# --- BEGIN_HEADER --- +# +# test_mig_shared_base - unit test of the corresponding mig shared module +# Copyright (C) 2003-2024 The MiG Project by the Science HPC Center at UCPH +# +# This file is part of MiG. +# +# MiG is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# MiG is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. +# +# --- END_HEADER --- +# + +"""Unit test base functions""" + +import binascii +import codecs +import os +import sys + +from tests.support import PY2, MigTestCase, testmain + +from mig.shared.base import force_utf8 + +DUMMY_STRING = "foo bÆr baz" +DUMMY_UNICODE = u'UniCode123½¾µßðþđŋħĸþł@ª€£$¥©®' + + +class MigSharedBase(MigTestCase): + """Unit tests of fucntions within the mig.shared.base module.""" + + def test_force_utf8_on_string(self): + actual = force_utf8(DUMMY_STRING) + + self.assertIsInstance(actual, bytes) + self.assertEqual(binascii.hexlify(actual), b'666f6f2062c386722062617a') + + def test_force_utf8_on_unicode(self): + actual = force_utf8(DUMMY_UNICODE) + + self.assertIsInstance(actual, bytes) + self.assertEqual(actual, codecs.encode(DUMMY_UNICODE, 'utf8')) + + +if __name__ == '__main__': + testmain() From 34ea01957eda794d37238e978d070db51f03d5c8 Mon Sep 17 00:00:00 2001 From: Alex Burke Date: Wed, 11 Sep 2024 19:16:10 +0200 Subject: [PATCH 2/2] Make createuser work for new installations and add basic test. Allow user creation to provision the directries necessary for user creation to succeed in addition to the database file itself given the state that exists after config generation is run to completion for a new installation. Cover the very basic operation of createuser ensuring that a user db and lock file are correctly create upon a call to create a test user. --- mig/server/createuser.py | 129 ++++++++++++++++------- mig/shared/_env.py | 4 + mig/shared/accountstate.py | 1 + mig/shared/base.py | 4 +- mig/shared/compat.py | 7 ++ mig/shared/conf.py | 13 ++- mig/shared/configuration.py | 2 +- mig/shared/defaults.py | 3 +- mig/shared/pwcrypto.py | 12 ++- mig/shared/useradm.py | 52 +++++++++- tests/support/__init__.py | 29 +----- tests/support/_env.py | 30 ++++++ tests/support/_path.py | 34 ++++++ tests/support/picklesupp.py | 12 +++ tests/test_mig_server_createuser.py | 154 ++++++++++++++++++++++++++++ 15 files changed, 409 insertions(+), 77 deletions(-) create mode 100644 mig/shared/_env.py create mode 100644 tests/support/_path.py create mode 100644 tests/support/picklesupp.py create mode 100644 tests/test_mig_server_createuser.py diff --git a/mig/server/createuser.py b/mig/server/createuser.py index 8087e4f76..b55c8b88f 100755 --- a/mig/server/createuser.py +++ b/mig/server/createuser.py @@ -33,6 +33,7 @@ from builtins import input from getpass import getpass import datetime +import errno import getopt import os import sys @@ -91,8 +92,7 @@ def usage(name='createuser.py'): """ % {'name': name, 'cert_warn': cert_warn}) -if '__main__' == __name__: - (args, app_dir, db_path) = init_user_adm() +def main(_main, args, cwd, db_path=keyword_auto): conf_path = None auth_type = 'custom' expire = None @@ -111,6 +111,7 @@ def usage(name='createuser.py'): user_dict = {} override_fields = {} opt_args = 'a:c:d:e:fhi:o:p:rR:s:u:v' + try: (opts, args) = getopt.getopt(args, opt_args) except getopt.GetoptError as err: @@ -138,13 +139,8 @@ def usage(name='createuser.py'): parsed = True break except ValueError: - pass - if parsed: - override_fields['expire'] = expire - override_fields['status'] = 'temporal' - else: - print('Failed to parse expire value: %s' % val) - sys.exit(1) + print('Failed to parse expire value: %s' % val) + sys.exit(1) elif opt == '-f': force = True elif opt == '-h': @@ -154,17 +150,13 @@ def usage(name='createuser.py'): user_id = val elif opt == '-o': short_id = val - override_fields['short_id'] = short_id elif opt == '-p': peer_pattern = val - override_fields['peer_pattern'] = peer_pattern - override_fields['status'] = 'temporal' elif opt == '-r': default_renew = True ask_renew = False elif opt == '-R': role = val - override_fields['role'] = role elif opt == '-s': # Translate slack days into seconds as slack_secs = int(float(val)*24*3600) @@ -178,7 +170,12 @@ def usage(name='createuser.py'): print('Error: %s not supported!' % opt) sys.exit(1) - if conf_path and not os.path.isfile(conf_path): + if not conf_path: + # explicitly set the default value of keyword_auto if no option was + # provided since it is unconditionally passed inward as a keyword arg + # and thus the fallback would accidentally be ignored + conf_path = keyword_auto + elif not os.path.isfile(conf_path): print('Failed to read configuration file: %s' % conf_path) sys.exit(1) @@ -190,30 +187,76 @@ def usage(name='createuser.py'): if verbose: print('using configuration from MIG_CONF (or default)') - configuration = get_configuration_object(config_file=conf_path) + ret = _main(None, args, + conf_path=conf_path, + db_path=db_path, + expire=expire, + force=force, + verbose=verbose, + ask_renew=ask_renew, + default_renew=default_renew, + ask_change_pw=ask_change_pw, + user_file=user_file, + user_id=user_id, + short_id=short_id, + role=role, + peer_pattern=peer_pattern, + slack_secs=slack_secs, + hash_password=hash_password + ) + + if ret == errno.ENOTSUP: + usage() + sys.exit(1) + + sys.exit(ret) + + +def _main(configuration, args, + conf_path=keyword_auto, + db_path=keyword_auto, + auth_type='custom', + expire=None, + force=False, + verbose=False, + ask_renew=True, + default_renew=False, + ask_change_pw=True, + user_file=None, + user_id=None, + short_id=None, + role=None, + peer_pattern=None, + slack_secs=0, + hash_password=True, + _generate_salt=None + ): + if configuration is None: + if conf_path == keyword_auto: + config_file = None + else: + config_file = conf_path + configuration = get_configuration_object(config_file=config_file) + logger = configuration.logger + # NOTE: we need explicit db_path lookup here for load_user_dict call if db_path == keyword_auto: db_path = default_db_path(configuration) if user_file and args: print('Error: Only one kind of user specification allowed at a time') - usage() - sys.exit(1) + return errno.ENOTSUP if auth_type not in valid_auth_types: print('Error: invalid account auth type %r requested (allowed: %s)' % (auth_type, ', '.join(valid_auth_types))) - usage() - sys.exit(1) + return errno.ENOTSUP # NOTE: renew requires original password if auth_type == 'cert': hash_password = False - if expire is None: - expire = default_account_expire(configuration, auth_type) - raw_user = {} if args: try: @@ -229,8 +272,7 @@ def usage(name='createuser.py'): except IndexError: print('Error: too few arguments given (expected 7 got %d)' % len(args)) - usage() - sys.exit(1) + return errno.ENOTSUP # Force user ID fields to canonical form for consistency # Title name, lowercase email, uppercase country and state, etc. user_dict = canonical_user(configuration, raw_user, raw_user.keys()) @@ -239,14 +281,12 @@ def usage(name='createuser.py'): user_dict = load(user_file) except Exception as err: print('Error in user name extraction: %s' % err) - usage() - sys.exit(1) + return errno.ENOTSUP elif default_renew and user_id: saved = load_user_dict(logger, user_id, db_path, verbose) if not saved: print('Error: no such user in user db: %s' % user_id) - usage() - sys.exit(1) + return errno.ENOTSUP user_dict.update(saved) del user_dict['expire'] elif not configuration.site_enable_gdp: @@ -268,13 +308,13 @@ def usage(name='createuser.py'): print("Error: Missing one or more of the arguments: " + "[FULL_NAME] [ORGANIZATION] [STATE] [COUNTRY] " + "[EMAIL] [COMMENT] [PASSWORD]") - sys.exit(1) + return 1 # Encode password if set but not already encoded if user_dict['password']: if hash_password: - user_dict['password_hash'] = make_hash(user_dict['password']) + user_dict['password_hash'] = make_hash(user_dict['password'], _generate_salt=_generate_salt) user_dict['password'] = '' else: salt = configuration.site_password_salt @@ -291,9 +331,19 @@ def usage(name='createuser.py'): fill_user(user_dict) - # Make sure account expire is set with local certificate or OpenID login - + # assemble the fields to be explicitly overriden + override_fields = {} + if peer_pattern: + override_fields['peer_pattern'] = peer_pattern + override_fields['status'] = 'temporal' + if role: + override_fields['role'] = role + if short_id: + override_fields['short_id'] = short_id if 'expire' not in user_dict: + # Make sure account expire is set with local certificate or OpenID login + if not expire: + expire = default_account_expire(configuration, auth_type) override_fields['expire'] = expire # NOTE: let non-ID command line values override loaded values @@ -305,8 +355,10 @@ def usage(name='createuser.py'): if verbose: print('using user dict: %s' % user_dict) try: - create_user(user_dict, conf_path, db_path, force, verbose, ask_renew, - default_renew, verify_peer=peer_pattern, + conf_path = configuration.config_file + create_user(user_dict, conf_path, db_path, configuration, force, verbose, ask_renew, + default_renew, + verify_peer=peer_pattern, peer_expire_slack=slack_secs, ask_change_pw=ask_change_pw) if configuration.site_enable_gdp: (success_here, msg) = ensure_gdp_user(configuration, @@ -319,10 +371,17 @@ def usage(name='createuser.py'): print("Error creating user: %s" % exc) import traceback logger.warning("Error creating user: %s" % traceback.format_exc()) - sys.exit(1) + return 1 print('Created or updated %s in user database and in file system' % user_dict['distinguished_name']) if user_file: if verbose: print('Cleaning up tmp file: %s' % user_file) os.remove(user_file) + + return 0 + + +if __name__ == '__main__': + (args, cwd, db_path) = init_user_adm() + main(_main, args, cwd, db_path=db_path) diff --git a/mig/shared/_env.py b/mig/shared/_env.py new file mode 100644 index 000000000..0db1212d9 --- /dev/null +++ b/mig/shared/_env.py @@ -0,0 +1,4 @@ +import os + +MIG_BASE = os.path.realpath(os.path.join(os.path.dirname(__file__), '../..')) +MIG_ENV = os.getenv('MIG_ENV', 'default') diff --git a/mig/shared/accountstate.py b/mig/shared/accountstate.py index 21330ada7..ddf827795 100644 --- a/mig/shared/accountstate.py +++ b/mig/shared/accountstate.py @@ -33,6 +33,7 @@ from __future__ import absolute_import from past.builtins import basestring +from past.builtins import basestring import os import time diff --git a/mig/shared/base.py b/mig/shared/base.py index b21d4ae6f..006271c2a 100644 --- a/mig/shared/base.py +++ b/mig/shared/base.py @@ -295,7 +295,9 @@ def canonical_user(configuration, user_dict, limit_fields): if key == 'full_name': # IMPORTANT: we get utf8 coded bytes here and title() treats such # chars as word termination. Temporarily force to unicode. - val = force_utf8(force_unicode(val).title()) + val = force_unicode(val).title() + if PY2: + val = force_utf8(val) elif key == 'email': val = val.lower() elif key == 'country': diff --git a/mig/shared/compat.py b/mig/shared/compat.py index ac5ab0f75..09935f18c 100644 --- a/mig/shared/compat.py +++ b/mig/shared/compat.py @@ -55,6 +55,13 @@ def _is_unicode(val): return (type(val) == _TYPE_UNICODE) +def _unicode_string_to_escaped_unicode(unicode_string): + """Convert utf8 bytes to escaped unicode string.""" + + utf8_bytes = dn_utf8_bytes = codecs.encode(unicode_string, 'utf8') + return codecs.decode(utf8_bytes, 'unicode_escape') + + def ensure_native_string(string_or_bytes): """Given a supplied input which can be either a string or bytes return a representation providing string operations while ensuring that diff --git a/mig/shared/conf.py b/mig/shared/conf.py index 15e36f6fb..4533d0162 100644 --- a/mig/shared/conf.py +++ b/mig/shared/conf.py @@ -32,6 +32,7 @@ import os import sys +from mig.shared._env import MIG_BASE from mig.shared.fileio import unpickle @@ -43,16 +44,14 @@ def get_configuration_object(config_file=None, skip_log=False, """ from mig.shared.configuration import Configuration if config_file: + # use config file path passed explicitly to the function _config_file = config_file elif os.environ.get('MIG_CONF', None): + # use config file explicitly set in the environment _config_file = os.environ['MIG_CONF'] else: - app_dir = os.path.dirname(sys.argv[0]) - if not app_dir: - _config_file = '../server/MiGserver.conf' - else: - _config_file = os.path.join(app_dir, '..', 'server', - 'MiGserver.conf') + # find config file relative to the directory in which the scrip resides + _config_file = os.path.join(MIG_BASE, 'server/MiGserver.conf') configuration = Configuration(_config_file, False, skip_log, disable_auth_log) return configuration @@ -61,7 +60,7 @@ def get_configuration_object(config_file=None, skip_log=False, def get_resource_configuration(resource_home, unique_resource_name, logger): """Load a resource configuration from file""" - + # open the configuration file resource_config_file = resource_home + '/' + unique_resource_name\ diff --git a/mig/shared/configuration.py b/mig/shared/configuration.py index d9ad2a2e8..13f5ec121 100644 --- a/mig/shared/configuration.py +++ b/mig/shared/configuration.py @@ -737,7 +737,7 @@ def reload_config(self, verbose, skip_log=False, disable_auth_log=False, else: print("""Could not find your configuration file (%s). You might need to point the MIG_CONF environment to your actual MiGserver.conf -location.""" % self.config_file) +location.""" % _config_file) raise IOError config = ConfigParser() diff --git a/mig/shared/defaults.py b/mig/shared/defaults.py index 15d8ea1ca..6c38eecb9 100644 --- a/mig/shared/defaults.py +++ b/mig/shared/defaults.py @@ -32,8 +32,7 @@ import os import sys -MIG_BASE = os.path.realpath(os.path.join(os.path.dirname(__file__), '../..')) -MIG_ENV = os.getenv('MIG_ENV', 'default') +from mig.shared._env import MIG_BASE, MIG_ENV # NOTE: python3 switched strings to use unicode by default in contrast to bytes # in python2. File systems remain with utf8 however so we need to diff --git a/mig/shared/pwcrypto.py b/mig/shared/pwcrypto.py index 5d1ee1c33..e8825cd07 100644 --- a/mig/shared/pwcrypto.py +++ b/mig/shared/pwcrypto.py @@ -114,9 +114,17 @@ def best_crypt_salt(configuration): return salt_data -def make_hash(password): +def _random_salt(): + return b64encode(urandom(SALT_LENGTH)) + + +def make_hash(password, _generate_salt=None): """Generate a random salt and return a new hash for the password.""" - salt = b64encode(urandom(SALT_LENGTH)) + + if _generate_salt is None: + _generate_salt = _random_salt + + salt = _generate_salt() derived = b64encode(hashlib.pbkdf2_hmac(HASH_FUNCTION, force_utf8(password), salt, COST_FACTOR, KEY_LENGTH)) diff --git a/mig/shared/useradm.py b/mig/shared/useradm.py index 9a64964f3..1be0fe3c5 100644 --- a/mig/shared/useradm.py +++ b/mig/shared/useradm.py @@ -30,8 +30,11 @@ from __future__ import print_function from __future__ import absolute_import +from past.builtins import basestring from email.utils import parseaddr +import codecs import datetime +import errno import fnmatch import os import re @@ -44,6 +47,7 @@ from mig.shared.base import client_id_dir, client_dir_id, client_alias, \ get_client_id, extract_field, fill_user, fill_distinguished_name, \ is_gdp_user, mask_creds, sandbox_resource +from mig.shared.compat import PY2, _unicode_string_to_escaped_unicode from mig.shared.conf import get_configuration_object from mig.shared.configuration import Configuration from mig.shared.defaults import user_db_filename, keyword_auto, ssh_conf_dir, \ @@ -82,6 +86,9 @@ force_update_user_map, force_update_resource_map, force_update_vgrid_map, \ VGRIDS, OWNERS, MEMBERS +if not PY2: + raw_input = input + ssh_authkeys = os.path.join(ssh_conf_dir, authkeys_filename) ssh_authpasswords = os.path.join(ssh_conf_dir, authpasswords_filename) ssh_authdigests = os.path.join(ssh_conf_dir, authdigests_filename) @@ -97,6 +104,10 @@ https_authdigests = user_db_filename +_USERADM_CONFIG_DIR_KEYS = ('user_db_home', 'user_home', 'user_settings', + 'user_cache', 'mrsl_files_dir', 'resource_pending') + + def init_user_adm(dynamic_db_path=True): """Shared init function for all user administration scripts. The optional dynamic_db_path argument toggles dynamic user db path lookup @@ -451,6 +462,21 @@ def verify_user_peers(configuration, db_path, client_id, user, now, verify_peer, return accepted_peer_list, effective_expire +def _check_directories_unprovisioned(configuration, db_path): + user_db_home = os.path.dirname(db_path) + return not os.path.exists(db_path) and not os.path.exists(user_db_home) + + +def _provision_directories(configuration): + for config_attr in _USERADM_CONFIG_DIR_KEYS: + try: + dir_to_create = getattr(configuration, config_attr) + os.mkdir(dir_to_create) + except OSError as oserr: + if oserr.errno != errno.ENOENT: # FileNotFoundError + raise + + def create_user_in_db(configuration, db_path, client_id, user, now, authorized, reset_token, reset_auth_type, accepted_peer_list, force, verbose, ask_renew, default_renew, do_lock, @@ -463,8 +489,25 @@ def create_user_in_db(configuration, db_path, client_id, user, now, authorized, flock = None user_db = {} renew = default_renew + + retry_lock = False if do_lock: + try: + flock = lock_user_db(db_path) + except (IOError, OSError) as oserr: + if oserr.errno != errno.ENOENT: # FileNotFoundError + raise + + if _check_directories_unprovisioned(configuration, db_path=db_path): + _provision_directories(configuration) + retry_lock = True + else: + raise Exception("Failed to lock user DB: '%s'" % db_path) + + if retry_lock: flock = lock_user_db(db_path) + if not flock: + raise Exception("Failed to lock user DB: '%s'" % db_path) if not os.path.exists(db_path): # Auto-create missing user DB if either auto_create_db or force is set @@ -859,7 +902,7 @@ def create_user_in_fs(configuration, client_id, user, now, renew, force, verbose # match in htaccess dn_plain = info['distinguished_name'] - dn_enc = dn_plain.encode('string_escape') + dn_enc = _unicode_string_to_escaped_unicode(dn_plain) def upper_repl(match): """Translate hex codes to upper case form""" @@ -1013,7 +1056,7 @@ def upper_repl(match): raise Exception('could not create custom css file: %s' % css_path) -def create_user(user, conf_path, db_path, force=False, verbose=False, +def create_user(user, conf_path, db_path, configuration=None, force=False, verbose=False, ask_renew=True, default_renew=False, do_lock=True, verify_peer=None, peer_expire_slack=0, from_edit_user=False, ask_change_pw=False, auto_create_db=True, create_backup=True): @@ -1021,7 +1064,10 @@ def create_user(user, conf_path, db_path, force=False, verbose=False, format as a first step. """ - if conf_path: + if configuration is not None: + # use it + pass + elif conf_path: if isinstance(conf_path, basestring): # has been checked for accessibility above... diff --git a/tests/support/__init__.py b/tests/support/__init__.py index 422182b4a..b64603d05 100644 --- a/tests/support/__init__.py +++ b/tests/support/__init__.py @@ -41,6 +41,8 @@ from unittest import TestCase, main as testmain from tests.support.configsupp import FakeConfiguration +from tests.support.picklesupp import verify_path_within_output_dir_and_return, \ + is_path_within from tests.support.suppconst import MIG_BASE, TEST_BASE, TEST_FIXTURE_DIR, \ TEST_DATA_DIR, TEST_OUTPUT_DIR, ENVHELP_OUTPUT_DIR @@ -296,16 +298,6 @@ def pretty_display_path(absolute_path): return relative_path -def is_path_within(path, start=None, _msg=None): - """Check if path is within start directory""" - try: - assert os.path.isabs(path), _msg - relative = os.path.relpath(path, start=start) - except: - return False - return not relative.startswith('..') - - def ensure_dirs_exist(absolute_dir): """A simple helper to create absolute_dir and any parents if missing""" try: @@ -399,22 +391,7 @@ def temppath(relative_path, test_case, ensure_dir=False, skip_clean=False): """ assert isinstance(test_case, MigTestCase) - if os.path.isabs(relative_path): - # the only permitted paths are those within the output directory set - # aside for execution of the test suite: this will be enforced below - # so effectively submit the supplied path for scrutiny - tmp_path = relative_path - else: - tmp_path = os.path.join(TEST_OUTPUT_DIR, relative_path) - - # failsafe path checking that supplied paths are rooted within valid paths - is_tmp_path_within_safe_dir = False - for start in (ENVHELP_OUTPUT_DIR): - is_tmp_path_within_safe_dir = is_path_within(tmp_path, start=start) - if is_tmp_path_within_safe_dir: - break - if not is_tmp_path_within_safe_dir: - raise AssertionError("ABORT: corrupt test path=%s" % (tmp_path,)) + tmp_path = verify_path_within_output_dir_and_return(relative_path) if ensure_dir: try: diff --git a/tests/support/_env.py b/tests/support/_env.py index 2c71386a4..f89b330e6 100644 --- a/tests/support/_env.py +++ b/tests/support/_env.py @@ -1,3 +1,33 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# --- BEGIN_HEADER --- +# +# mig.shared._env - environment related consants only +# Copyright (C) 2003-2024 The MiG Project lead by Brian Vinter +# +# This file is part of MiG. +# +# MiG is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# MiG is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# -- END_HEADER --- +# + + +"""Environment consants separated from defaults to reduce import pollution.""" + import os import sys diff --git a/tests/support/_path.py b/tests/support/_path.py new file mode 100644 index 000000000..908cfb493 --- /dev/null +++ b/tests/support/_path.py @@ -0,0 +1,34 @@ +import os + +from tests.support.suppconst import TEST_OUTPUT_DIR, ENVHELP_OUTPUT_DIR + + +def is_path_within(path, start=None, _msg=None): + """Check if path is within start directory""" + try: + assert os.path.isabs(path), _msg + relative = os.path.relpath(path, start=start) + except: + return False + return not relative.startswith('..') + + +def verify_path_within_output_dir_and_return(relative_path): + if os.path.isabs(relative_path): + # the only permitted paths are those within the output directory set + # aside for execution of the test suite: this will be enforced below + # so effectively submit the supplied path for scrutiny + tmp_path = relative_path + else: + tmp_path = os.path.join(TEST_OUTPUT_DIR, relative_path) + + # failsafe path checking that supplied paths are rooted within valid paths + is_tmp_path_within_safe_dir = False + for start in (ENVHELP_OUTPUT_DIR): + is_tmp_path_within_safe_dir = is_path_within(tmp_path, start=start) + if is_tmp_path_within_safe_dir: + break + if not is_tmp_path_within_safe_dir: + raise AssertionError("ABORT: corrupt test path=%s" % (tmp_path,)) + + return tmp_path diff --git a/tests/support/picklesupp.py b/tests/support/picklesupp.py new file mode 100644 index 000000000..5b6a03441 --- /dev/null +++ b/tests/support/picklesupp.py @@ -0,0 +1,12 @@ +import pickle + +from tests.support._path import verify_path_within_output_dir_and_return, \ + is_path_within + + +class PickleAssertMixin: + def assertPickledFile(self, pickle_file): + tmp_path = verify_path_within_output_dir_and_return(pickle_file) + + with open(tmp_path, 'rb') as picklefile: + return pickle.load(picklefile) diff --git a/tests/test_mig_server_createuser.py b/tests/test_mig_server_createuser.py new file mode 100644 index 000000000..573dca8e0 --- /dev/null +++ b/tests/test_mig_server_createuser.py @@ -0,0 +1,154 @@ +# -*- coding: utf-8 -*- +# +# --- BEGIN_HEADER --- +# +# test_mig_server-createuser - unit tests for the migrid createuser CLI +# Copyright (C) 2003-2024 The MiG Project by the Science HPC Center at UCPH +# +# This file is part of MiG. +# +# MiG is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# MiG is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. +# +# --- END_HEADER --- +# + +"""Unit tests for the migrid createuser CLI""" + +from __future__ import print_function +import errno +import os +import shutil +import sys + +from tests.support import PY2, MIG_BASE, TEST_OUTPUT_DIR, MigTestCase, testmain +from tests.support.picklesupp import PickleAssertMixin + +from mig.server.createuser import _main as createuser +from mig.shared.useradm import _USERADM_CONFIG_DIR_KEYS + + +class TestMigServerCreateuser(MigTestCase, PickleAssertMixin): + def before_each(self): + configuration = self.configuration + test_state_path = configuration.state_path + + for config_key in _USERADM_CONFIG_DIR_KEYS: + dir_path = getattr(configuration, config_key)[0:-1] + try: + shutil.rmtree(dir_path) + except: + pass + + self.expected_user_db_home = configuration.user_db_home[0:-1] + self.expected_user_db_file = os.path.join(self.expected_user_db_home, 'MiG-users.db') + + def _provide_configuration(self): + return 'testconfig' + + def test_user_db_is_created_when_absent(self): + args = [ + "Test User", + "Test Org", + "NA", + "DK", + "user@example.com", + "This is the create comment", + "password" + ] + print("") # acount for output generated by the logic + createuser(self.configuration, args, default_renew=True) + + # presence of user home + path_kind = MigTestCase._absolute_path_kind(self.expected_user_db_home) + self.assertEqual(path_kind, 'dir') + + # presence of user db + expected_user_db_file = os.path.join( + self.expected_user_db_home, 'MiG-users.db') + path_kind = MigTestCase._absolute_path_kind(expected_user_db_file) + self.assertEqual(path_kind, 'file') + + + def test_user_entry_is_recorded(self): + def _generate_salt(): + return b'CCCC12344321CCCC' + + expected_user_id = '/C=DK/ST=NA/L=NA/O=Test Org/OU=NA/CN=Test User/emailAddress=user@example.com' + if PY2: + expected_user_password_hash = 'PBKDF2$sha256$10000$CCCC12344321CCCC$bph8p/avUq42IYeOdJoJuUqrJ7Q32eaT' + else: + expected_user_password_hash = "PBKDF2$sha256$10000$b'CCCC12344321CCCC'$b'bph8p/avUq42IYeOdJoJuUqrJ7Q32eaT'" + + args = [ + "Test User", + "Test Org", + "NA", + "DK", + "user@example.com", + "This is the create comment", + "password" + ] + print("") # acount for output generated by the logic + + createuser(self.configuration, args, default_renew=True, + _generate_salt=_generate_salt) + + pickled = self.assertPickledFile(self.expected_user_db_file) + self.assertIn(expected_user_id, pickled) + actual_user_object = dict(pickled[expected_user_id]) + # TODO: remove resetting the handful of keys here done because changes + # to make them assertion frienfly values will increase the size + # of the diff which, at time of commit, are best minimised. + actual_user_object['created'] = 9999999999.9999999 + actual_user_object['expire'] = 1234567890 + actual_user_object['unique_id'] = '__UNIQUE_ID__' + + self.assertEqual(actual_user_object, { + 'comment': "This is the create comment", + 'country': 'DK', + 'created': 9999999999.9999999, + 'distinguished_name': expected_user_id, + 'email': "user@example.com", + 'expire': 1234567890, + 'full_name': 'Test User', + 'locality': '', + 'openid_names': [], + 'organization': 'Test Org', + 'organizational_unit': '', + "password": "", + "password_hash": expected_user_password_hash, + 'state': 'NA', + 'unique_id': '__UNIQUE_ID__', + }) + + def test_missing_arguments(self): + args = [ + "Test User", + "Test Org", + "NA", + "DK", + "user@example.com", + "This is the create comment", + # leave off a password + ] + print("") # acount for output generated by the logic + + ret = createuser(self.configuration, args) + + self.assertEqual(ret, errno.ENOTSUP) + +if __name__ == '__main__': + testmain()