diff --git a/mig/server/createuser.py b/mig/server/createuser.py index 8087e4f76..b55c8b88f 100755 --- a/mig/server/createuser.py +++ b/mig/server/createuser.py @@ -33,6 +33,7 @@ from builtins import input from getpass import getpass import datetime +import errno import getopt import os import sys @@ -91,8 +92,7 @@ def usage(name='createuser.py'): """ % {'name': name, 'cert_warn': cert_warn}) -if '__main__' == __name__: - (args, app_dir, db_path) = init_user_adm() +def main(_main, args, cwd, db_path=keyword_auto): conf_path = None auth_type = 'custom' expire = None @@ -111,6 +111,7 @@ def usage(name='createuser.py'): user_dict = {} override_fields = {} opt_args = 'a:c:d:e:fhi:o:p:rR:s:u:v' + try: (opts, args) = getopt.getopt(args, opt_args) except getopt.GetoptError as err: @@ -138,13 +139,8 @@ def usage(name='createuser.py'): parsed = True break except ValueError: - pass - if parsed: - override_fields['expire'] = expire - override_fields['status'] = 'temporal' - else: - print('Failed to parse expire value: %s' % val) - sys.exit(1) + print('Failed to parse expire value: %s' % val) + sys.exit(1) elif opt == '-f': force = True elif opt == '-h': @@ -154,17 +150,13 @@ def usage(name='createuser.py'): user_id = val elif opt == '-o': short_id = val - override_fields['short_id'] = short_id elif opt == '-p': peer_pattern = val - override_fields['peer_pattern'] = peer_pattern - override_fields['status'] = 'temporal' elif opt == '-r': default_renew = True ask_renew = False elif opt == '-R': role = val - override_fields['role'] = role elif opt == '-s': # Translate slack days into seconds as slack_secs = int(float(val)*24*3600) @@ -178,7 +170,12 @@ def usage(name='createuser.py'): print('Error: %s not supported!' % opt) sys.exit(1) - if conf_path and not os.path.isfile(conf_path): + if not conf_path: + # explicitly set the default value of keyword_auto if no option was + # provided since it is unconditionally passed inward as a keyword arg + # and thus the fallback would accidentally be ignored + conf_path = keyword_auto + elif not os.path.isfile(conf_path): print('Failed to read configuration file: %s' % conf_path) sys.exit(1) @@ -190,30 +187,76 @@ def usage(name='createuser.py'): if verbose: print('using configuration from MIG_CONF (or default)') - configuration = get_configuration_object(config_file=conf_path) + ret = _main(None, args, + conf_path=conf_path, + db_path=db_path, + expire=expire, + force=force, + verbose=verbose, + ask_renew=ask_renew, + default_renew=default_renew, + ask_change_pw=ask_change_pw, + user_file=user_file, + user_id=user_id, + short_id=short_id, + role=role, + peer_pattern=peer_pattern, + slack_secs=slack_secs, + hash_password=hash_password + ) + + if ret == errno.ENOTSUP: + usage() + sys.exit(1) + + sys.exit(ret) + + +def _main(configuration, args, + conf_path=keyword_auto, + db_path=keyword_auto, + auth_type='custom', + expire=None, + force=False, + verbose=False, + ask_renew=True, + default_renew=False, + ask_change_pw=True, + user_file=None, + user_id=None, + short_id=None, + role=None, + peer_pattern=None, + slack_secs=0, + hash_password=True, + _generate_salt=None + ): + if configuration is None: + if conf_path == keyword_auto: + config_file = None + else: + config_file = conf_path + configuration = get_configuration_object(config_file=config_file) + logger = configuration.logger + # NOTE: we need explicit db_path lookup here for load_user_dict call if db_path == keyword_auto: db_path = default_db_path(configuration) if user_file and args: print('Error: Only one kind of user specification allowed at a time') - usage() - sys.exit(1) + return errno.ENOTSUP if auth_type not in valid_auth_types: print('Error: invalid account auth type %r requested (allowed: %s)' % (auth_type, ', '.join(valid_auth_types))) - usage() - sys.exit(1) + return errno.ENOTSUP # NOTE: renew requires original password if auth_type == 'cert': hash_password = False - if expire is None: - expire = default_account_expire(configuration, auth_type) - raw_user = {} if args: try: @@ -229,8 +272,7 @@ def usage(name='createuser.py'): except IndexError: print('Error: too few arguments given (expected 7 got %d)' % len(args)) - usage() - sys.exit(1) + return errno.ENOTSUP # Force user ID fields to canonical form for consistency # Title name, lowercase email, uppercase country and state, etc. user_dict = canonical_user(configuration, raw_user, raw_user.keys()) @@ -239,14 +281,12 @@ def usage(name='createuser.py'): user_dict = load(user_file) except Exception as err: print('Error in user name extraction: %s' % err) - usage() - sys.exit(1) + return errno.ENOTSUP elif default_renew and user_id: saved = load_user_dict(logger, user_id, db_path, verbose) if not saved: print('Error: no such user in user db: %s' % user_id) - usage() - sys.exit(1) + return errno.ENOTSUP user_dict.update(saved) del user_dict['expire'] elif not configuration.site_enable_gdp: @@ -268,13 +308,13 @@ def usage(name='createuser.py'): print("Error: Missing one or more of the arguments: " + "[FULL_NAME] [ORGANIZATION] [STATE] [COUNTRY] " + "[EMAIL] [COMMENT] [PASSWORD]") - sys.exit(1) + return 1 # Encode password if set but not already encoded if user_dict['password']: if hash_password: - user_dict['password_hash'] = make_hash(user_dict['password']) + user_dict['password_hash'] = make_hash(user_dict['password'], _generate_salt=_generate_salt) user_dict['password'] = '' else: salt = configuration.site_password_salt @@ -291,9 +331,19 @@ def usage(name='createuser.py'): fill_user(user_dict) - # Make sure account expire is set with local certificate or OpenID login - + # assemble the fields to be explicitly overriden + override_fields = {} + if peer_pattern: + override_fields['peer_pattern'] = peer_pattern + override_fields['status'] = 'temporal' + if role: + override_fields['role'] = role + if short_id: + override_fields['short_id'] = short_id if 'expire' not in user_dict: + # Make sure account expire is set with local certificate or OpenID login + if not expire: + expire = default_account_expire(configuration, auth_type) override_fields['expire'] = expire # NOTE: let non-ID command line values override loaded values @@ -305,8 +355,10 @@ def usage(name='createuser.py'): if verbose: print('using user dict: %s' % user_dict) try: - create_user(user_dict, conf_path, db_path, force, verbose, ask_renew, - default_renew, verify_peer=peer_pattern, + conf_path = configuration.config_file + create_user(user_dict, conf_path, db_path, configuration, force, verbose, ask_renew, + default_renew, + verify_peer=peer_pattern, peer_expire_slack=slack_secs, ask_change_pw=ask_change_pw) if configuration.site_enable_gdp: (success_here, msg) = ensure_gdp_user(configuration, @@ -319,10 +371,17 @@ def usage(name='createuser.py'): print("Error creating user: %s" % exc) import traceback logger.warning("Error creating user: %s" % traceback.format_exc()) - sys.exit(1) + return 1 print('Created or updated %s in user database and in file system' % user_dict['distinguished_name']) if user_file: if verbose: print('Cleaning up tmp file: %s' % user_file) os.remove(user_file) + + return 0 + + +if __name__ == '__main__': + (args, cwd, db_path) = init_user_adm() + main(_main, args, cwd, db_path=db_path) diff --git a/mig/shared/_env.py b/mig/shared/_env.py new file mode 100644 index 000000000..0db1212d9 --- /dev/null +++ b/mig/shared/_env.py @@ -0,0 +1,4 @@ +import os + +MIG_BASE = os.path.realpath(os.path.join(os.path.dirname(__file__), '../..')) +MIG_ENV = os.getenv('MIG_ENV', 'default') diff --git a/mig/shared/accountstate.py b/mig/shared/accountstate.py index 21330ada7..ddf827795 100644 --- a/mig/shared/accountstate.py +++ b/mig/shared/accountstate.py @@ -33,6 +33,7 @@ from __future__ import absolute_import from past.builtins import basestring +from past.builtins import basestring import os import time diff --git a/mig/shared/base.py b/mig/shared/base.py index 64f12b370..006271c2a 100644 --- a/mig/shared/base.py +++ b/mig/shared/base.py @@ -36,6 +36,7 @@ import re # IMPORTANT: do not import any other MiG modules here - to avoid import loops +from mig.shared.compat import PY2 from mig.shared.defaults import default_str_coding, default_fs_coding, \ keyword_all, keyword_auto, sandbox_names, _user_invisible_files, \ _user_invisible_dirs, _vgrid_xgi_scripts, cert_field_order, csrf_field, \ @@ -294,7 +295,9 @@ def canonical_user(configuration, user_dict, limit_fields): if key == 'full_name': # IMPORTANT: we get utf8 coded bytes here and title() treats such # chars as word termination. Temporarily force to unicode. - val = force_utf8(force_unicode(val).title()) + val = force_unicode(val).title() + if PY2: + val = force_utf8(val) elif key == 'email': val = val.lower() elif key == 'country': @@ -496,7 +499,7 @@ def is_unicode(val): return (type(u"") == type(val)) -def force_utf8(val, highlight=''): +def _force_utf8_py2(val, highlight=''): """Internal helper to encode unicode strings to utf8 version. Actual changes are marked out with the highlight string if given. """ @@ -507,6 +510,31 @@ def force_utf8(val, highlight=''): return val return "%s%s%s" % (highlight, val.encode("utf8"), highlight) +def _force_utf8_py3(val, highlight='', stringify=True): + """Internal helper to encode unicode strings to utf8 version. Actual + changes are marked out with the highlight string if given. + The optional stringify turns ALL values including numbers into string. + """ + # We run into all kind of nasty encoding problems if we mix + if not isinstance(val, basestring): + if stringify: + val = "%s" % val + else: + return val + if not is_unicode(val): + return val + if is_unicode(highlight): + hl_utf = highlight.encode("utf8") + else: + hl_utf = highlight + return (b"%s%s%s" % (hl_utf, val.encode("utf8"), hl_utf)) + + +if PY2: + force_utf8 = _force_utf8_py2 +else: + force_utf8 = _force_utf8_py3 + def force_utf8_rec(input_obj, highlight=''): """Recursive object conversion from unicode to utf8: useful to convert e.g. diff --git a/mig/shared/compat.py b/mig/shared/compat.py index ac5ab0f75..09935f18c 100644 --- a/mig/shared/compat.py +++ b/mig/shared/compat.py @@ -55,6 +55,13 @@ def _is_unicode(val): return (type(val) == _TYPE_UNICODE) +def _unicode_string_to_escaped_unicode(unicode_string): + """Convert utf8 bytes to escaped unicode string.""" + + utf8_bytes = dn_utf8_bytes = codecs.encode(unicode_string, 'utf8') + return codecs.decode(utf8_bytes, 'unicode_escape') + + def ensure_native_string(string_or_bytes): """Given a supplied input which can be either a string or bytes return a representation providing string operations while ensuring that diff --git a/mig/shared/conf.py b/mig/shared/conf.py index 15e36f6fb..4533d0162 100644 --- a/mig/shared/conf.py +++ b/mig/shared/conf.py @@ -32,6 +32,7 @@ import os import sys +from mig.shared._env import MIG_BASE from mig.shared.fileio import unpickle @@ -43,16 +44,14 @@ def get_configuration_object(config_file=None, skip_log=False, """ from mig.shared.configuration import Configuration if config_file: + # use config file path passed explicitly to the function _config_file = config_file elif os.environ.get('MIG_CONF', None): + # use config file explicitly set in the environment _config_file = os.environ['MIG_CONF'] else: - app_dir = os.path.dirname(sys.argv[0]) - if not app_dir: - _config_file = '../server/MiGserver.conf' - else: - _config_file = os.path.join(app_dir, '..', 'server', - 'MiGserver.conf') + # find config file relative to the directory in which the scrip resides + _config_file = os.path.join(MIG_BASE, 'server/MiGserver.conf') configuration = Configuration(_config_file, False, skip_log, disable_auth_log) return configuration @@ -61,7 +60,7 @@ def get_configuration_object(config_file=None, skip_log=False, def get_resource_configuration(resource_home, unique_resource_name, logger): """Load a resource configuration from file""" - + # open the configuration file resource_config_file = resource_home + '/' + unique_resource_name\ diff --git a/mig/shared/configuration.py b/mig/shared/configuration.py index d9ad2a2e8..13f5ec121 100644 --- a/mig/shared/configuration.py +++ b/mig/shared/configuration.py @@ -737,7 +737,7 @@ def reload_config(self, verbose, skip_log=False, disable_auth_log=False, else: print("""Could not find your configuration file (%s). You might need to point the MIG_CONF environment to your actual MiGserver.conf -location.""" % self.config_file) +location.""" % _config_file) raise IOError config = ConfigParser() diff --git a/mig/shared/defaults.py b/mig/shared/defaults.py index 15d8ea1ca..6c38eecb9 100644 --- a/mig/shared/defaults.py +++ b/mig/shared/defaults.py @@ -32,8 +32,7 @@ import os import sys -MIG_BASE = os.path.realpath(os.path.join(os.path.dirname(__file__), '../..')) -MIG_ENV = os.getenv('MIG_ENV', 'default') +from mig.shared._env import MIG_BASE, MIG_ENV # NOTE: python3 switched strings to use unicode by default in contrast to bytes # in python2. File systems remain with utf8 however so we need to diff --git a/mig/shared/pwcrypto.py b/mig/shared/pwcrypto.py index 5d1ee1c33..e8825cd07 100644 --- a/mig/shared/pwcrypto.py +++ b/mig/shared/pwcrypto.py @@ -114,9 +114,17 @@ def best_crypt_salt(configuration): return salt_data -def make_hash(password): +def _random_salt(): + return b64encode(urandom(SALT_LENGTH)) + + +def make_hash(password, _generate_salt=None): """Generate a random salt and return a new hash for the password.""" - salt = b64encode(urandom(SALT_LENGTH)) + + if _generate_salt is None: + _generate_salt = _random_salt + + salt = _generate_salt() derived = b64encode(hashlib.pbkdf2_hmac(HASH_FUNCTION, force_utf8(password), salt, COST_FACTOR, KEY_LENGTH)) diff --git a/mig/shared/safeinput.py b/mig/shared/safeinput.py index 592250755..e91937d8c 100644 --- a/mig/shared/safeinput.py +++ b/mig/shared/safeinput.py @@ -58,7 +58,7 @@ from html import escape as escape_html assert escape_html is not None -from mig.shared.base import force_unicode, force_utf8 +from mig.shared.base import force_unicode, force_native_str from mig.shared.defaults import src_dst_sep, username_charset, \ username_max_length, session_id_charset, session_id_length, \ subject_id_charset, subject_id_min_length, subject_id_max_length, \ @@ -2294,7 +2294,7 @@ def __init__(self, value): def __str__(self): """Return string representation""" - return force_utf8(force_unicode(self.value)) + return force_native_str(self.value) def main(_exit=sys.exit, _print=print): diff --git a/mig/shared/useradm.py b/mig/shared/useradm.py index 9a64964f3..1be0fe3c5 100644 --- a/mig/shared/useradm.py +++ b/mig/shared/useradm.py @@ -30,8 +30,11 @@ from __future__ import print_function from __future__ import absolute_import +from past.builtins import basestring from email.utils import parseaddr +import codecs import datetime +import errno import fnmatch import os import re @@ -44,6 +47,7 @@ from mig.shared.base import client_id_dir, client_dir_id, client_alias, \ get_client_id, extract_field, fill_user, fill_distinguished_name, \ is_gdp_user, mask_creds, sandbox_resource +from mig.shared.compat import PY2, _unicode_string_to_escaped_unicode from mig.shared.conf import get_configuration_object from mig.shared.configuration import Configuration from mig.shared.defaults import user_db_filename, keyword_auto, ssh_conf_dir, \ @@ -82,6 +86,9 @@ force_update_user_map, force_update_resource_map, force_update_vgrid_map, \ VGRIDS, OWNERS, MEMBERS +if not PY2: + raw_input = input + ssh_authkeys = os.path.join(ssh_conf_dir, authkeys_filename) ssh_authpasswords = os.path.join(ssh_conf_dir, authpasswords_filename) ssh_authdigests = os.path.join(ssh_conf_dir, authdigests_filename) @@ -97,6 +104,10 @@ https_authdigests = user_db_filename +_USERADM_CONFIG_DIR_KEYS = ('user_db_home', 'user_home', 'user_settings', + 'user_cache', 'mrsl_files_dir', 'resource_pending') + + def init_user_adm(dynamic_db_path=True): """Shared init function for all user administration scripts. The optional dynamic_db_path argument toggles dynamic user db path lookup @@ -451,6 +462,21 @@ def verify_user_peers(configuration, db_path, client_id, user, now, verify_peer, return accepted_peer_list, effective_expire +def _check_directories_unprovisioned(configuration, db_path): + user_db_home = os.path.dirname(db_path) + return not os.path.exists(db_path) and not os.path.exists(user_db_home) + + +def _provision_directories(configuration): + for config_attr in _USERADM_CONFIG_DIR_KEYS: + try: + dir_to_create = getattr(configuration, config_attr) + os.mkdir(dir_to_create) + except OSError as oserr: + if oserr.errno != errno.ENOENT: # FileNotFoundError + raise + + def create_user_in_db(configuration, db_path, client_id, user, now, authorized, reset_token, reset_auth_type, accepted_peer_list, force, verbose, ask_renew, default_renew, do_lock, @@ -463,8 +489,25 @@ def create_user_in_db(configuration, db_path, client_id, user, now, authorized, flock = None user_db = {} renew = default_renew + + retry_lock = False if do_lock: + try: + flock = lock_user_db(db_path) + except (IOError, OSError) as oserr: + if oserr.errno != errno.ENOENT: # FileNotFoundError + raise + + if _check_directories_unprovisioned(configuration, db_path=db_path): + _provision_directories(configuration) + retry_lock = True + else: + raise Exception("Failed to lock user DB: '%s'" % db_path) + + if retry_lock: flock = lock_user_db(db_path) + if not flock: + raise Exception("Failed to lock user DB: '%s'" % db_path) if not os.path.exists(db_path): # Auto-create missing user DB if either auto_create_db or force is set @@ -859,7 +902,7 @@ def create_user_in_fs(configuration, client_id, user, now, renew, force, verbose # match in htaccess dn_plain = info['distinguished_name'] - dn_enc = dn_plain.encode('string_escape') + dn_enc = _unicode_string_to_escaped_unicode(dn_plain) def upper_repl(match): """Translate hex codes to upper case form""" @@ -1013,7 +1056,7 @@ def upper_repl(match): raise Exception('could not create custom css file: %s' % css_path) -def create_user(user, conf_path, db_path, force=False, verbose=False, +def create_user(user, conf_path, db_path, configuration=None, force=False, verbose=False, ask_renew=True, default_renew=False, do_lock=True, verify_peer=None, peer_expire_slack=0, from_edit_user=False, ask_change_pw=False, auto_create_db=True, create_backup=True): @@ -1021,7 +1064,10 @@ def create_user(user, conf_path, db_path, force=False, verbose=False, format as a first step. """ - if conf_path: + if configuration is not None: + # use it + pass + elif conf_path: if isinstance(conf_path, basestring): # has been checked for accessibility above... diff --git a/tests/support/__init__.py b/tests/support/__init__.py index 422182b4a..b64603d05 100644 --- a/tests/support/__init__.py +++ b/tests/support/__init__.py @@ -41,6 +41,8 @@ from unittest import TestCase, main as testmain from tests.support.configsupp import FakeConfiguration +from tests.support.picklesupp import verify_path_within_output_dir_and_return, \ + is_path_within from tests.support.suppconst import MIG_BASE, TEST_BASE, TEST_FIXTURE_DIR, \ TEST_DATA_DIR, TEST_OUTPUT_DIR, ENVHELP_OUTPUT_DIR @@ -296,16 +298,6 @@ def pretty_display_path(absolute_path): return relative_path -def is_path_within(path, start=None, _msg=None): - """Check if path is within start directory""" - try: - assert os.path.isabs(path), _msg - relative = os.path.relpath(path, start=start) - except: - return False - return not relative.startswith('..') - - def ensure_dirs_exist(absolute_dir): """A simple helper to create absolute_dir and any parents if missing""" try: @@ -399,22 +391,7 @@ def temppath(relative_path, test_case, ensure_dir=False, skip_clean=False): """ assert isinstance(test_case, MigTestCase) - if os.path.isabs(relative_path): - # the only permitted paths are those within the output directory set - # aside for execution of the test suite: this will be enforced below - # so effectively submit the supplied path for scrutiny - tmp_path = relative_path - else: - tmp_path = os.path.join(TEST_OUTPUT_DIR, relative_path) - - # failsafe path checking that supplied paths are rooted within valid paths - is_tmp_path_within_safe_dir = False - for start in (ENVHELP_OUTPUT_DIR): - is_tmp_path_within_safe_dir = is_path_within(tmp_path, start=start) - if is_tmp_path_within_safe_dir: - break - if not is_tmp_path_within_safe_dir: - raise AssertionError("ABORT: corrupt test path=%s" % (tmp_path,)) + tmp_path = verify_path_within_output_dir_and_return(relative_path) if ensure_dir: try: diff --git a/tests/support/_env.py b/tests/support/_env.py index 2c71386a4..f89b330e6 100644 --- a/tests/support/_env.py +++ b/tests/support/_env.py @@ -1,3 +1,33 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# --- BEGIN_HEADER --- +# +# mig.shared._env - environment related consants only +# Copyright (C) 2003-2024 The MiG Project lead by Brian Vinter +# +# This file is part of MiG. +# +# MiG is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# MiG is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# -- END_HEADER --- +# + + +"""Environment consants separated from defaults to reduce import pollution.""" + import os import sys diff --git a/tests/support/_path.py b/tests/support/_path.py new file mode 100644 index 000000000..908cfb493 --- /dev/null +++ b/tests/support/_path.py @@ -0,0 +1,34 @@ +import os + +from tests.support.suppconst import TEST_OUTPUT_DIR, ENVHELP_OUTPUT_DIR + + +def is_path_within(path, start=None, _msg=None): + """Check if path is within start directory""" + try: + assert os.path.isabs(path), _msg + relative = os.path.relpath(path, start=start) + except: + return False + return not relative.startswith('..') + + +def verify_path_within_output_dir_and_return(relative_path): + if os.path.isabs(relative_path): + # the only permitted paths are those within the output directory set + # aside for execution of the test suite: this will be enforced below + # so effectively submit the supplied path for scrutiny + tmp_path = relative_path + else: + tmp_path = os.path.join(TEST_OUTPUT_DIR, relative_path) + + # failsafe path checking that supplied paths are rooted within valid paths + is_tmp_path_within_safe_dir = False + for start in (ENVHELP_OUTPUT_DIR): + is_tmp_path_within_safe_dir = is_path_within(tmp_path, start=start) + if is_tmp_path_within_safe_dir: + break + if not is_tmp_path_within_safe_dir: + raise AssertionError("ABORT: corrupt test path=%s" % (tmp_path,)) + + return tmp_path diff --git a/tests/support/picklesupp.py b/tests/support/picklesupp.py new file mode 100644 index 000000000..5b6a03441 --- /dev/null +++ b/tests/support/picklesupp.py @@ -0,0 +1,12 @@ +import pickle + +from tests.support._path import verify_path_within_output_dir_and_return, \ + is_path_within + + +class PickleAssertMixin: + def assertPickledFile(self, pickle_file): + tmp_path = verify_path_within_output_dir_and_return(pickle_file) + + with open(tmp_path, 'rb') as picklefile: + return pickle.load(picklefile) diff --git a/tests/test_mig_server_createuser.py b/tests/test_mig_server_createuser.py new file mode 100644 index 000000000..573dca8e0 --- /dev/null +++ b/tests/test_mig_server_createuser.py @@ -0,0 +1,154 @@ +# -*- coding: utf-8 -*- +# +# --- BEGIN_HEADER --- +# +# test_mig_server-createuser - unit tests for the migrid createuser CLI +# Copyright (C) 2003-2024 The MiG Project by the Science HPC Center at UCPH +# +# This file is part of MiG. +# +# MiG is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# MiG is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. +# +# --- END_HEADER --- +# + +"""Unit tests for the migrid createuser CLI""" + +from __future__ import print_function +import errno +import os +import shutil +import sys + +from tests.support import PY2, MIG_BASE, TEST_OUTPUT_DIR, MigTestCase, testmain +from tests.support.picklesupp import PickleAssertMixin + +from mig.server.createuser import _main as createuser +from mig.shared.useradm import _USERADM_CONFIG_DIR_KEYS + + +class TestMigServerCreateuser(MigTestCase, PickleAssertMixin): + def before_each(self): + configuration = self.configuration + test_state_path = configuration.state_path + + for config_key in _USERADM_CONFIG_DIR_KEYS: + dir_path = getattr(configuration, config_key)[0:-1] + try: + shutil.rmtree(dir_path) + except: + pass + + self.expected_user_db_home = configuration.user_db_home[0:-1] + self.expected_user_db_file = os.path.join(self.expected_user_db_home, 'MiG-users.db') + + def _provide_configuration(self): + return 'testconfig' + + def test_user_db_is_created_when_absent(self): + args = [ + "Test User", + "Test Org", + "NA", + "DK", + "user@example.com", + "This is the create comment", + "password" + ] + print("") # acount for output generated by the logic + createuser(self.configuration, args, default_renew=True) + + # presence of user home + path_kind = MigTestCase._absolute_path_kind(self.expected_user_db_home) + self.assertEqual(path_kind, 'dir') + + # presence of user db + expected_user_db_file = os.path.join( + self.expected_user_db_home, 'MiG-users.db') + path_kind = MigTestCase._absolute_path_kind(expected_user_db_file) + self.assertEqual(path_kind, 'file') + + + def test_user_entry_is_recorded(self): + def _generate_salt(): + return b'CCCC12344321CCCC' + + expected_user_id = '/C=DK/ST=NA/L=NA/O=Test Org/OU=NA/CN=Test User/emailAddress=user@example.com' + if PY2: + expected_user_password_hash = 'PBKDF2$sha256$10000$CCCC12344321CCCC$bph8p/avUq42IYeOdJoJuUqrJ7Q32eaT' + else: + expected_user_password_hash = "PBKDF2$sha256$10000$b'CCCC12344321CCCC'$b'bph8p/avUq42IYeOdJoJuUqrJ7Q32eaT'" + + args = [ + "Test User", + "Test Org", + "NA", + "DK", + "user@example.com", + "This is the create comment", + "password" + ] + print("") # acount for output generated by the logic + + createuser(self.configuration, args, default_renew=True, + _generate_salt=_generate_salt) + + pickled = self.assertPickledFile(self.expected_user_db_file) + self.assertIn(expected_user_id, pickled) + actual_user_object = dict(pickled[expected_user_id]) + # TODO: remove resetting the handful of keys here done because changes + # to make them assertion frienfly values will increase the size + # of the diff which, at time of commit, are best minimised. + actual_user_object['created'] = 9999999999.9999999 + actual_user_object['expire'] = 1234567890 + actual_user_object['unique_id'] = '__UNIQUE_ID__' + + self.assertEqual(actual_user_object, { + 'comment': "This is the create comment", + 'country': 'DK', + 'created': 9999999999.9999999, + 'distinguished_name': expected_user_id, + 'email': "user@example.com", + 'expire': 1234567890, + 'full_name': 'Test User', + 'locality': '', + 'openid_names': [], + 'organization': 'Test Org', + 'organizational_unit': '', + "password": "", + "password_hash": expected_user_password_hash, + 'state': 'NA', + 'unique_id': '__UNIQUE_ID__', + }) + + def test_missing_arguments(self): + args = [ + "Test User", + "Test Org", + "NA", + "DK", + "user@example.com", + "This is the create comment", + # leave off a password + ] + print("") # acount for output generated by the logic + + ret = createuser(self.configuration, args) + + self.assertEqual(ret, errno.ENOTSUP) + +if __name__ == '__main__': + testmain() diff --git a/tests/test_mig_shared_base.py b/tests/test_mig_shared_base.py new file mode 100644 index 000000000..82145cb20 --- /dev/null +++ b/tests/test_mig_shared_base.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +# +# --- BEGIN_HEADER --- +# +# test_mig_shared_base - unit test of the corresponding mig shared module +# Copyright (C) 2003-2024 The MiG Project by the Science HPC Center at UCPH +# +# This file is part of MiG. +# +# MiG is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# MiG is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. +# +# --- END_HEADER --- +# + +"""Unit test base functions""" + +import binascii +import codecs +import os +import sys + +from tests.support import PY2, MigTestCase, testmain + +from mig.shared.base import force_utf8 + +DUMMY_STRING = "foo bÆr baz" +DUMMY_UNICODE = u'UniCode123½¾µßðþđŋħĸþł@ª€£$¥©®' + + +class MigSharedBase(MigTestCase): + """Unit tests of fucntions within the mig.shared.base module.""" + + def test_force_utf8_on_string(self): + actual = force_utf8(DUMMY_STRING) + + self.assertIsInstance(actual, bytes) + self.assertEqual(binascii.hexlify(actual), b'666f6f2062c386722062617a') + + def test_force_utf8_on_unicode(self): + actual = force_utf8(DUMMY_UNICODE) + + self.assertIsInstance(actual, bytes) + self.assertEqual(actual, codecs.encode(DUMMY_UNICODE, 'utf8')) + + +if __name__ == '__main__': + testmain()