diff --git a/config.js b/config.js
index e122f067a7..d38078ffd8 100644
--- a/config.js
+++ b/config.js
@@ -1003,6 +1003,8 @@ config.ENDPOINT_SSL_PORT = Number(process.env.ENDPOINT_SSL_PORT) || 6443;
 // Remove the NSFS condition when NSFS starts to support STS.
 config.ENDPOINT_SSL_STS_PORT = Number(process.env.ENDPOINT_SSL_STS_PORT) || (process.env.NC_NSFS_NO_DB_ENV === 'true' ? -1 : 7443);
 config.ENDPOINT_SSL_IAM_PORT = Number(process.env.ENDPOINT_SSL_IAM_PORT) || -1;
+// each fork will get a port in the range [ENDPOINT_FORK_PORT_BASE, ENDPOINT_FORK_PORT_BASE + number of forks - 1]
+config.ENDPOINT_FORK_PORT_BASE = Number(process.env.ENDPOINT_FORK_PORT_BASE) || 6002;
 config.ALLOW_HTTP = false;
 config.ALLOW_HTTP_METRICS = true;
 config.ALLOW_HTTPS_METRICS = true;
@@ -1015,6 +1017,8 @@ config.VIRTUAL_HOSTS = process.env.VIRTUAL_HOSTS || '';
 config.NC_HEALTH_ENDPOINT_RETRY_COUNT = 3;
 config.NC_HEALTH_ENDPOINT_RETRY_DELAY = 10;
+config.NC_FORK_SERVER_TIMEOUT = 5; // 5 minutes
+config.NC_FORK_SERVER_RETRIES = 10;
 
 /** @type {'file' | 'executable'} */
@@ -1060,6 +1064,7 @@ config.NC_LIFECYCLE_BUCKET_BATCH_SIZE = 10000;
 config.NC_LIFECYCLE_GPFS_ILM_ENABLED = true;
 config.NC_LIFECYCLE_GPFS_ALLOW_SCAN_ON_REMOTE = true;
 config.NC_GPFS_BIN_DIR = '/usr/lpp/mmfs/bin/';
+config.NC_LIFECYCLE_GPFS_MMAPPLY_ILM_POLICY_CONCURRENCY = 1;
 
 ////////// GPFS //////////
 config.GPFS_DOWN_DELAY = 1000;
diff --git a/src/endpoint/endpoint.js b/src/endpoint/endpoint.js
index b4d612c626..053926a814 100755
--- a/src/endpoint/endpoint.js
+++ b/src/endpoint/endpoint.js
@@ -43,6 +43,7 @@ const { SemaphoreMonitor } = require('../server/bg_services/semaphore_monitor');
 const prom_reporting = require('../server/analytic_services/prometheus_reporting');
 const { PersistentLogger } = require('../util/persistent_logger');
 const { get_notification_logger } = require('../util/notifications_util');
+const { is_nc_environment } = require('../nc/nc_utils');
 const NoobaaEvent = require('../manage_nsfs/manage_nsfs_events_utils').NoobaaEvent;
 const cluster = /** @type {import('node:cluster').Cluster} */ (
     /** @type {unknown} */ (require('node:cluster'))
@@ -57,7 +58,8 @@ const SERVICES_TYPES_ENUM = Object.freeze({
     S3: 'S3',
     STS: 'STS',
     IAM: 'IAM',
-    METRICS: 'METRICS'
+    METRICS: 'METRICS',
+    FORK_HEALTH: 'FORK_HEALTH',
 });
 
 const new_umask = process.env.NOOBAA_ENDPOINT_UMASK || 0o000;
@@ -117,11 +119,11 @@ async function main(options = {}) {
     const https_metrics_port = options.https_metrics_port || config.EP_METRICS_SERVER_SSL_PORT;
     /**
      * Please notice that we can run the main in 2 states:
-     * 1. Only the primary process runs the main (fork is 0 or undefined) - everything that
+     * 1. Only the primary process runs the main (fork is 0 or undefined) - everything that
      * is implemented here would be run by this process.
-     * 2. A primary process with multiple forks (IMPORTANT) - if there is implementation that
-     * in only relevant to the primary process it should be implemented in
-     * fork_utils.start_workers because the primary process returns after start_workers
+     * 2. A primary process with multiple forks (IMPORTANT) - if there is implementation that
+     * is only relevant to the primary process it should be implemented in
+     * fork_utils.start_workers because the primary process returns after start_workers
      * and the forks will continue executing the code lines in this function
      * */
     const is_workers_started_from_primary = await fork_utils.start_workers(http_metrics_port, https_metrics_port,
@@ -202,14 +204,29 @@ async function main(options = {}) {
         { ...options, https_port: https_port_s3, http_port: http_port_s3, virtual_hosts, bucket_logger, notification_logger });
     await start_endpoint_server_and_cert(SERVICES_TYPES_ENUM.STS, init_request_sdk, { https_port: https_port_sts, virtual_hosts });
     await start_endpoint_server_and_cert(SERVICES_TYPES_ENUM.IAM, init_request_sdk, { https_port: https_port_iam });
-
+    const is_nc = is_nc_environment();
+    // the fork health server currently runs only in non-containerized environments
+    if (is_nc) {
+        // current process is the primary and only fork - start the fork server directly with the base port
+        if (cluster.isPrimary) {
+            await fork_message_request_handler({
+                nsfs_config_root: options.nsfs_config_root,
+                health_port: config.ENDPOINT_FORK_PORT_BASE
+            });
+        // current process is a worker, so we listen to get the port from the primary process
+        } else {
+            process.on('message', fork_message_request_handler);
+            // send a message to the primary process that we are ready to receive messages
+            process.send({ready_to_start_fork_server: true});
+        }
+    }
 
     // START METRICS SERVER
     if ((http_metrics_port > 0 || https_metrics_port > 0) && cluster.isPrimary) {
         await prom_reporting.start_server(http_metrics_port, https_metrics_port, false, options.nsfs_config_root);
     }
 
-    // TODO: currently NC NSFS deployments don't have internal_rpc_client nor db,
+    // TODO: currently NC NSFS deployments don't have internal_rpc_client nor db,
     // there for namespace monitor won't be registered
     if (internal_rpc_client && config.NAMESPACE_MONITOR_ENABLED) {
         endpoint_stats_collector.instance().set_rpc_client(internal_rpc_client);
@@ -293,8 +310,6 @@ function create_endpoint_handler(server_type, init_request_sdk, { virtual_hosts,
             return blob_rest_handler(req, res);
         } else if (req.url.startsWith('/total_fork_count')) {
             return fork_count_handler(req, res);
-        } else if (req.url.startsWith('/endpoint_fork_id')) {
-            return endpoint_fork_id_handler(req, res);
         } else if (req.url.startsWith('/_/')) {
             // internals non S3 requests
             const api = req.url.slice('/_/'.length);
@@ -535,8 +550,38 @@ function unavailable_handler(req, res) {
     res.end(reply);
 }
 
+/**
+ * handler for the individual fork server, used to handle requests to get the worker id
+ * currently used by the health script to check if a fork is alive
+ * @param {EndpointRequest} req
+ * @param {import('http').ServerResponse} res
+ */
+function fork_main_handler(req, res) {
+    endpoint_utils.set_noobaa_server_header(res);
+    endpoint_utils.prepare_rest_request(req);
+    if (req.url.startsWith('/endpoint_fork_id')) {
+        return endpoint_fork_id_handler(req, res);
+    } else {
+        return internal_api_error(req, res, `Unknown API call ${req.url}`);
+    }
+}
+
+/**
+ * fork_message_request_handler is used to handle messages from the primary process.
+ * the primary process sends a message with the designated port to start the fork server.
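+ * A sketch of the expected message shape (the values here are hypothetical;
+ * the real port is ENDPOINT_FORK_PORT_BASE + the fork's offset, e.g. 6002 for the first fork):
+ *   { nsfs_config_root: '/etc/noobaa.conf.d', health_port: 6002 }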
+ * @param {Object} msg
+ */
+async function fork_message_request_handler(msg) {
+    await http_utils.start_https_server(msg.health_port,
+        SERVICES_TYPES_ENUM.FORK_HEALTH,
+        fork_main_handler,
+        msg.nsfs_config_root
+    );
+}
+
 exports.main = main;
 exports.create_endpoint_handler = create_endpoint_handler;
 exports.create_init_request_sdk = create_init_request_sdk;
+exports.endpoint_fork_id_handler = endpoint_fork_id_handler;
 
 if (require.main === module) main();
diff --git a/src/manage_nsfs/health.js b/src/manage_nsfs/health.js
index fcfb10dfb2..c7f625c69c 100644
--- a/src/manage_nsfs/health.js
+++ b/src/manage_nsfs/health.js
@@ -101,6 +101,7 @@ process.env.AWS_SDK_JS_SUPPRESS_MAINTENANCE_MODE_MESSAGE = '1';
 class NSFSHealth {
     constructor(options) {
         this.https_port = options.https_port;
+        this.fork_base_port = options.fork_base_port;
         this.all_account_details = options.all_account_details;
         this.all_bucket_details = options.all_bucket_details;
         this.all_connection_details = options.all_connection_details;
@@ -241,10 +242,10 @@
         return service_health;
     }
 
-    async make_endpoint_health_request(url_path) {
+    async make_endpoint_health_request(url_path, port = this.https_port) {
         const response = await make_https_request({
-            HOSTNAME,
-            port: this.https_port,
+            hostname: HOSTNAME,
+            port,
             path: url_path,
             method: 'GET',
             rejectUnauthorized: false,
@@ -260,43 +261,37 @@
         let url_path = '/total_fork_count';
         const worker_ids = [];
         let total_fork_count = 0;
+        let fork_count_response;
         let response;
         try {
-            const fork_count_response = await this.make_endpoint_health_request(url_path);
-            if (!fork_count_response) {
-                return {
-                    response: fork_response_code.NOT_RUNNING,
-                    total_fork_count: total_fork_count,
-                    running_workers: worker_ids,
-                };
-            }
-            total_fork_count = fork_count_response.fork_count;
-            if (total_fork_count > 0) {
-                url_path = '/endpoint_fork_id';
-                await P.retry({
-                    attempts: total_fork_count * 2,
-                    delay_ms: 1,
-                    func: async () => {
-                        const fork_id_response = await this.make_endpoint_health_request(url_path);
-                        if (fork_id_response.worker_id && !worker_ids.includes(fork_id_response.worker_id)) {
-                            worker_ids.push(fork_id_response.worker_id);
-                        }
-                        if (worker_ids.length < total_fork_count) {
-                            throw new Error('Number of running forks is less than the expected fork count.');
-                        }
-                    }
-                });
-                if (worker_ids.length === total_fork_count) {
-                    response = fork_response_code.RUNNING;
-                } else {
-                    response = fork_response_code.MISSING_FORKS;
-                }
-            } else {
-                response = fork_response_code.RUNNING;
-            }
+            fork_count_response = await this.make_endpoint_health_request(url_path);
         } catch (err) {
-            dbg.log1('Error while pinging endpoint host :' + HOSTNAME + ', port ' + this.https_port, err);
-            response = fork_response_code.NOT_RUNNING;
+            dbg.log0('Error while pinging endpoint host :' + HOSTNAME, err);
+        }
+        if (!fork_count_response) {
+            return {
+                response: fork_response_code.NOT_RUNNING,
+                total_fork_count: total_fork_count,
+                running_workers: worker_ids,
+            };
+        }
+
+        total_fork_count = fork_count_response.fork_count;
+        url_path = '/endpoint_fork_id';
+        for (let i = 0; i < total_fork_count; i++) {
+            const port = this.fork_base_port + i;
+            try {
+                const fork_id_response = await this.make_endpoint_health_request(url_path, port);
+                worker_ids.push(fork_id_response.worker_id);
+            } catch (err) {
+                dbg.log0('Error while pinging fork :' + HOSTNAME + ', port ' + port, err);
+            }
+        }
+        if (worker_ids.length < total_fork_count) {
+            dbg.log0('Number of running forks is less than the expected fork count.');
+            response = fork_response_code.MISSING_FORKS;
+        } else {
+            response = fork_response_code.RUNNING;
         }
         return {
             response: response,
@@ -637,6 +632,7 @@ async function get_health_status(argv, config_fs) {
     try {
         const https_port = Number(argv.https_port) || config.ENDPOINT_SSL_PORT;
+        const fork_base_port = Number(argv.fork_base_port) || config.ENDPOINT_FORK_PORT_BASE;
         const deployment_type = argv.deployment_type || 'nc';
         const all_account_details = get_boolean_or_string_value(argv.all_account_details);
         const all_bucket_details = get_boolean_or_string_value(argv.all_bucket_details);
@@ -645,8 +641,9 @@
         const lifecycle = get_boolean_or_string_value(argv.lifecycle);
 
         if (deployment_type === 'nc') {
-            const health = new NSFSHealth({ https_port,
-                all_account_details, all_bucket_details, all_connection_details, notif_storage_threshold, lifecycle, config_fs });
+            const health = new NSFSHealth({ https_port, fork_base_port,
+                all_account_details, all_bucket_details, all_connection_details,
+                notif_storage_threshold, lifecycle, config_fs });
             const health_status = await health.nc_nsfs_health();
             write_stdout_response(ManageCLIResponse.HealthStatus, health_status);
         } else {
diff --git a/src/manage_nsfs/nc_lifecycle.js b/src/manage_nsfs/nc_lifecycle.js
index 0eb65dbad4..03d9e77b1e 100644
--- a/src/manage_nsfs/nc_lifecycle.js
+++ b/src/manage_nsfs/nc_lifecycle.js
@@ -40,6 +40,7 @@ const TIMED_OPS = Object.freeze({
     RUN_LIFECYLE: 'run_lifecycle',
     LIST_BUCKETS: 'list_buckets',
     CREATE_GPFS_CANDIDATES_FILES: 'create_gpfs_candidates_files',
+    CREATE_GPFS_CANDIDATE_FILE_BY_ILM_POLICY: 'create_candidates_file_by_gpfs_ilm_policy',
     PROCESS_BUCKETS: 'process_buckets',
     PROCESS_BUCKET: 'process_bucket',
     PROCESS_RULE: 'process_rule',
@@ -82,12 +83,16 @@
             lifecycle_run_times: {},
             total_stats: this._get_default_stats(),
             state: { is_finished: false },
-            buckets_statuses: {}
+            buckets_statuses: {},
+            mount_points_statuses: {}
         };
         this.return_short_status = options.short_status || false;
         this.disable_service_validation = options.disable_service_validation || false;
         this.disable_runtime_validation = options.disable_runtime_validation || false;
         this.should_continue_last_run = options.should_continue_last_run || false;
+        // used for GPFS optimization - maps bucket names to their mount points
+        // example - { 'bucket1': '/gpfs/mount/point1', 'bucket2': '/gpfs/mount/point2' }
+        this.bucket_to_mount_point_map = {};
     }
 
     /**
@@ -181,13 +186,19 @@
         }
 
         while (!this.lifecycle_run_status.state.is_finished) {
-            await P.map_with_concurrency(buckets_concurrency, bucket_names, async bucket_name =>
-                await this._call_op_and_update_status({
-                    bucket_name,
-                    op_name: TIMED_OPS.PROCESS_BUCKET,
-                    op_func: async () => this.process_bucket(bucket_name, system_json)
-                })
-            );
+            await P.map_with_concurrency(buckets_concurrency, bucket_names, async bucket_name => {
+                try {
+                    await this._call_op_and_update_status({
+                        bucket_name,
+                        op_name: TIMED_OPS.PROCESS_BUCKET,
+                        op_func: async () => this.process_bucket(bucket_name, system_json)
+                    });
+                } catch (err) {
+                    dbg.error('process_bucket failed with error', err, err.code, err.message);
+                    if (!this.lifecycle_run_status.buckets_statuses[bucket_name]) this.init_bucket_status(bucket_name);
+                    this.lifecycle_run_status.buckets_statuses[bucket_name].state.is_finished = true;
+                }
+            });
             this.lifecycle_run_status.state.is_finished = Object.values(this.lifecycle_run_status.buckets_statuses).reduce(
                 (acc, bucket) => acc && (bucket.state?.is_finished),
                 true
            );
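+            // note: the outer loop exits only once every bucket reports is_finished;
+            // buckets whose processing threw were force-marked finished above, so one bad bucket cannot stall the run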
@@ -212,7 +223,7 @@
         await object_sdk._simple_load_requesting_account();
         const should_notify = notifications_util.should_notify_on_event(bucket_json, notifications_util.OP_TO_EVENT.lifecycle_delete.name);
         if (!bucket_json.lifecycle_configuration_rules) {
-            this.lifecycle_run_status.buckets_statuses[bucket_json.name].state = { is_finished: true };
+            this.lifecycle_run_status.buckets_statuses[bucket_name].state = { is_finished: true };
             return;
         }
         await this.process_rules(bucket_json, object_sdk, should_notify);
@@ -224,14 +235,20 @@
      * @param {nb.ObjectSDK} object_sdk
      */
     async process_rules(bucket_json, object_sdk, should_notify) {
-        try {
-            const bucket_state = this.lifecycle_run_status.buckets_statuses[bucket_json.name].state;
-            bucket_state.num_processed_objects = 0;
-            while (!bucket_state.is_finished && bucket_state.num_processed_objects < config.NC_LIFECYCLE_BUCKET_BATCH_SIZE) {
-                await P.all(_.map(bucket_json.lifecycle_configuration_rules,
+        const bucket_name = bucket_json.name;
+        const bucket_state = this.lifecycle_run_status.buckets_statuses[bucket_name].state;
+        bucket_state.num_processed_objects = 0;
+        while (!bucket_state.is_finished && bucket_state.num_processed_objects < config.NC_LIFECYCLE_BUCKET_BATCH_SIZE) {
+            if (this._should_use_gpfs_optimization()) {
+                const bucket_mount_point = this.bucket_to_mount_point_map[bucket_name];
+                if (this.lifecycle_run_status.mount_points_statuses[bucket_mount_point]?.errors?.length > 0) {
+                    throw new Error(`Lifecycle run failed for bucket ${bucket_name} on mount point ${bucket_mount_point} due to errors: ${this.lifecycle_run_status.mount_points_statuses[bucket_mount_point].errors}`);
+                }
+            }
+            await P.all(_.map(bucket_json.lifecycle_configuration_rules,
                 async (lifecycle_rule, index) =>
                     await this._call_op_and_update_status({
-                        bucket_name: bucket_json.name,
+                        bucket_name,
                         rule_id: lifecycle_rule.id,
                         op_name: TIMED_OPS.PROCESS_RULE,
                         op_func: async () => this.process_rule(
@@ -243,15 +260,12 @@
                             )
                     })
                 )
-            );
-            bucket_state.is_finished = Object.values(this.lifecycle_run_status.buckets_statuses[bucket_json.name].rules_statuses)
-                .reduce(
-                    (acc, rule) => acc && (_.isEmpty(rule.state) || rule.state.is_finished),
-                    true
-                );
-            }
-        } catch (err) {
-            dbg.error('process_rules failed with error', err, err.code, err.message);
+            );
+            bucket_state.is_finished = Object.values(this.lifecycle_run_status.buckets_statuses[bucket_name].rules_statuses)
+                .reduce(
+                    (acc, rule) => acc && (_.isEmpty(rule.state) || rule.state.is_finished),
+                    true
+                );
         }
     }
 
@@ -787,14 +801,15 @@
      * @param {{
      * op_name: string;
      * op_func: () => Promise;
+     * mount_point?: string,
      * bucket_name?: string,
      * rule_id?: string
      * }} params
      * @returns {Promise}
      */
-    async _call_op_and_update_status({ bucket_name = undefined, rule_id = undefined, op_name, op_func }) {
+    async _call_op_and_update_status({ mount_point = undefined, bucket_name = undefined, rule_id = undefined, op_name, op_func }) {
         const start_time = Date.now();
-        const update_options = { op_name, bucket_name, rule_id };
+        const update_options = { mount_point, op_name, bucket_name, rule_id };
         let end_time;
         let took_ms;
         let error;
@@ -815,13 +830,15 @@
     }
 
     /**
-     * update_status updates rule/bucket/global based on the given parameters
+     * update_status updates rule/bucket/mount_point/global based on the given parameters
      * 1. initalize statuses/times/stats per level
     * 2. update times
      * 3. update errors
      * 4. update stats if the op is at rule level
+     * Note - on mount_point we won't update stats/state
      * @param {{
      * op_name: string,
+     * mount_point?: string,
      * bucket_name?: string,
      * rule_id?: string,
      * op_times: { start_time?: number, end_time?: number, took_ms?: number },
      * reply?: Object[],
      * error?: Error}
      * } params
      * @returns {Void}
      */
-    update_status({ bucket_name, rule_id, op_name, op_times, reply = [], error = undefined }) {
+    update_status({ mount_point, bucket_name, rule_id, op_name, op_times, reply = [], error = undefined }) {
         if (op_times.start_time) {
             if (op_name === TIMED_OPS.PROCESS_RULE) {
                 this.init_rule_status(bucket_name, rule_id);
             } else if (op_name === TIMED_OPS.PROCESS_BUCKET) {
                 this.init_bucket_status(bucket_name);
+            } else if (op_name === TIMED_OPS.CREATE_GPFS_CANDIDATE_FILE_BY_ILM_POLICY) {
+                this.init_mount_status(mount_point);
             }
         }
         if (op_times.end_time) {
@@ -843,8 +862,8 @@
                 this.update_rule_status_is_finished(bucket_name, rule_id);
             }
         }
-        this._update_times_on_status({ op_name, op_times, bucket_name, rule_id });
-        this._update_error_on_status({ error, bucket_name, rule_id });
+        this._update_times_on_status({ op_name, op_times, mount_point, bucket_name, rule_id });
+        this._update_error_on_status({ error, mount_point, bucket_name, rule_id });
         if (bucket_name && rule_id) {
             this.update_stats_on_status({ bucket_name, rule_id, op_name, op_times, reply });
         }
@@ -918,16 +937,18 @@
     /**
      * _update_times_on_status updates start/end & took times in lifecycle status
      * @param {{op_name: String, op_times: {start_time?: number, end_time?: number, took_ms?: number },
-     * bucket_name?: String, rule_id?: String}} params
+     * mount_point?: String, bucket_name?: String, rule_id?: String}} params
      * @returns
      */
-    _update_times_on_status({ op_name, op_times, bucket_name = undefined, rule_id = undefined }) {
+    _update_times_on_status({ op_name, op_times, mount_point = undefined, bucket_name = undefined, rule_id = undefined }) {
         for (const [key, value] of Object.entries(op_times)) {
             const status_key = op_name + '_' + key;
             if (bucket_name && rule_id) {
                 this.lifecycle_run_status.buckets_statuses[bucket_name].rules_statuses[rule_id].rule_process_times[status_key] = value;
             } else if (bucket_name) {
                 this.lifecycle_run_status.buckets_statuses[bucket_name].bucket_process_times[status_key] = value;
+            } else if (mount_point) {
+                this.lifecycle_run_status.mount_points_statuses[mount_point].mount_point_process_times[status_key] = value;
             } else {
                 this.lifecycle_run_status.lifecycle_run_times[status_key] = value;
             }
@@ -936,15 +957,17 @@
 
     /**
      * _update_error_on_status updates an error occured in lifecycle status
-     * @param {{error: Error, bucket_name?: string, rule_id?: string}} params
+     * @param {{error: Error, mount_point?: string, bucket_name?: string, rule_id?: string}} params
      * @returns
      */
-    _update_error_on_status({ error, bucket_name = undefined, rule_id = undefined }) {
+    _update_error_on_status({ error, mount_point = undefined, bucket_name = undefined, rule_id = undefined }) {
         if (!error) return;
         if (bucket_name && rule_id) {
             (this.lifecycle_run_status.buckets_statuses[bucket_name].rules_statuses[rule_id].errors ??= []).push(error.message);
         } else if (bucket_name) {
             (this.lifecycle_run_status.buckets_statuses[bucket_name].errors ??= []).push(error.message);
+        } else if (mount_point) {
+            (this.lifecycle_run_status.mount_points_statuses[mount_point].errors ??= []).push(error.message);
         } else {
             (this.lifecycle_run_status.errors ??= []).push(error.message);
         }
     }
@@ -985,6 +1008,18 @@
         return this.lifecycle_run_status.buckets_statuses[bucket_name];
     }
 
+    /**
+     * init the mount_point status object statuses if they don't exist
+     * @param {string} mount_point
+     * @returns {Object} created or existing mount_point status
+     */
+    init_mount_status(mount_point) {
+        this.lifecycle_run_status.mount_points_statuses[mount_point] ??= {};
+        this.lifecycle_run_status.mount_points_statuses[mount_point].mount_point_process_times ??= {};
+        return this.lifecycle_run_status.mount_points_statuses[mount_point];
+    }
+
+
     /**
      * init the rule status object statuses if they don't exist
      * @param {string} bucket_name
@@ -1199,7 +1234,9 @@
         for (const bucket_name of bucket_names) {
             const bucket_json = await this.config_fs.get_bucket_by_name(bucket_name, config_fs_options);
             const bucket_mount_point = this.find_mount_point_by_bucket_path(mount_point_to_policy_map, bucket_json.path);
+            this.bucket_to_mount_point_map[bucket_name] = bucket_mount_point;
             if (!bucket_json.lifecycle_configuration_rules?.length) continue;
+
             for (const lifecycle_rule of bucket_json.lifecycle_configuration_rules) {
                 // currently we support expiration (current version) only
                 if (lifecycle_rule.expiration) {
@@ -1213,11 +1250,22 @@
         await native_fs_utils._create_path(ILM_POLICIES_TMP_DIR, this.non_gpfs_fs_context, config.BASE_MODE_CONFIG_DIR);
         await native_fs_utils._create_path(ILM_CANDIDATES_TMP_DIR, this.non_gpfs_fs_context, config.BASE_MODE_CONFIG_DIR);
-        for (const [mount_point, policy] of Object.entries(mount_point_to_policy_map)) {
-            if (policy === '') continue;
-            const ilm_policy_path = await this.write_tmp_ilm_policy(mount_point, policy);
-            await this.create_candidates_file_by_gpfs_ilm_policy(mount_point, ilm_policy_path);
-        }
+        await P.map_with_concurrency(config.NC_LIFECYCLE_GPFS_MMAPPLY_ILM_POLICY_CONCURRENCY,
+            Object.entries(mount_point_to_policy_map), async ([mount_point, policy]) => {
+                try {
+                    if (policy === '') return;
+                    await this._call_op_and_update_status({
+                        mount_point,
+                        op_name: TIMED_OPS.CREATE_GPFS_CANDIDATE_FILE_BY_ILM_POLICY,
+                        op_func: async () => {
+                            const ilm_policy_path = await this.write_tmp_ilm_policy(mount_point, policy);
+                            await this.create_candidates_file_by_gpfs_ilm_policy(mount_point, ilm_policy_path);
+                        }
+                    });
+                } catch (err) {
+                    dbg.error('create_candidates_file_by_gpfs_ilm_policy failed with error', err, err.code, err.message);
+                }
+            });
     }
 
     /**
@@ -1233,7 +1281,7 @@
         const bucket_path = bucket_json.path;
         const bucket_rule_id = this.get_lifecycle_ilm_candidate_file_suffix(bucket_json.name, lifecycle_rule);
         const in_bucket_path = path.join(bucket_path, '/%');
-        const in_bucket_internal_dir = path.join(bucket_path, '/.noobaa_nsfs%/%');
+        const in_bucket_internal_dir = path.join(bucket_path, `/${config.NSFS_TEMP_DIR_NAME}%/%`);
         const in_versions_dir = path.join(bucket_path, '/.versions/%');
         const in_nested_versions_dir = path.join(bucket_path, '/%/.versions/%');
         const ilm_policy_helpers = { bucket_rule_id, in_bucket_path, in_bucket_internal_dir, in_versions_dir, in_nested_versions_dir };
diff --git a/src/nc/nc_utils.js b/src/nc/nc_utils.js
index 74f6ac40fa..6fdf764ade 100644
--- a/src/nc/nc_utils.js
+++ b/src/nc/nc_utils.js
@@ -24,7 +24,15 @@ function check_root_account_owns_user(root_account, account) {
     return root_account._id === account.owner;
 }
 
+/**
+ * @returns {boolean} true if the current environment is a NooBaa non-containerized environment
+ */
+function is_nc_environment() {
+    return process.env.NC_NSFS_NO_DB_ENV && process.env.NC_NSFS_NO_DB_ENV === 'true';
+}
+
 // EXPORTS
 exports.generate_id = generate_id;
 exports.check_root_account_owns_user = check_root_account_owns_user;
+exports.is_nc_environment = is_nc_environment;
diff --git a/src/test/system_tests/test_utils.js b/src/test/system_tests/test_utils.js
index 0ccbe6c730..191b0678b7 100644
--- a/src/test/system_tests/test_utils.js
+++ b/src/test/system_tests/test_utils.js
@@ -16,6 +16,7 @@
 const nb_native = require('../../util/nb_native');
 const { CONFIG_TYPES } = require('../../sdk/config_fs');
 const native_fs_utils = require('../../util/native_fs_utils');
 const { NodeHttpHandler } = require("@smithy/node-http-handler");
+const sinon = require('sinon');
 
 const GPFS_ROOT_PATH = process.env.GPFS_ROOT_PATH;
 const IS_GPFS = !_.isUndefined(GPFS_ROOT_PATH);
@@ -44,12 +45,12 @@ function get_tmp_path() {
 const is_nc_coretest = process.env.NC_CORETEST === 'true';
 
 /**
- *
- * @param {*} need_to_exist
- * @param {*} pool_id
- * @param {*} bucket_name
- * @param {*} blocks
- * @param {AWS.S3} s3
+ *
+ * @param {*} need_to_exist
+ * @param {*} pool_id
+ * @param {*} bucket_name
+ * @param {*} blocks
+ * @param {AWS.S3} s3
  */
 function blocks_exist_on_cloud(need_to_exist, pool_id, bucket_name, blocks, s3) {
     console.log('blocks_exist_on_cloud::', need_to_exist, pool_id, bucket_name);
@@ -203,13 +204,13 @@ async function disable_accounts_s3_access(rpc_client, accounts_emails) {
 
 /**
  * generate_s3_policy generates S3 buket policy for the given principal
- *
+ *
  * @param {string} principal - The principal to grant access to.
  * @param {string} bucket - The bucket to grant access to.
  * @param {Array} action - The action to grant access to.
- * @returns {{
+ * @returns {{
  * policy: Record,
- * params: { bucket: string, action: Array, principal: string }
+ * params: { bucket: string, action: Array, principal: string }
  * }}
  */
 function generate_s3_policy(principal, bucket, action) {
@@ -246,7 +247,7 @@ function invalid_nsfs_root_permissions() {
 
 /**
  * get_coretest_path returns coretest path according to process.env.NC_CORETEST value
- * @returns {string}
+ * @returns {string}
  */
 function get_coretest_path() {
     return process.env.NC_CORETEST ? './nc_coretest' : './coretest';
 }
@@ -353,7 +354,7 @@ async function set_path_permissions_and_owner(p, owner_options, permissions = 0o
     await fs.promises.chmod(p, permissions);
 }
 
-/**
+/**
 * set_nc_config_dir_in_config sets given config_root to be config.NSFS_NC_CONF_DIR
 * @param {string} config_root
 */
@@ -434,12 +435,12 @@ function generate_iam_client(access_key, secret_key, endpoint) {
 
 /**
 * generate_nsfs_account generate an nsfs account and returns its credentials
- * if the admin flag is received (in the options object) the function will not create
+ * if the admin flag is received (in the options object) the function will not create
 * the account (it was already created in the system) and only return the credentials.
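+ * Example (hypothetical uid/gid and path values):
+ *   const access_details = await generate_nsfs_account(rpc_client, EMAIL, '/tmp/nsfs_root', { uid: 1001, gid: 1001 });
+ *   // access_details.access_key / access_details.secret_key can then be passed to generate_s3_client()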
- * @param {nb.APIClient} rpc_client
- * @param {String} EMAIL
- * @param {String} default_new_buckets_path
- * @param {Object} options
+ * @param {nb.APIClient} rpc_client
+ * @param {String} EMAIL
+ * @param {String} default_new_buckets_path
+ * @param {Object} options
 * @returns {Promise}
 */
 async function generate_nsfs_account(rpc_client, EMAIL, default_new_buckets_path, options = {}) {
@@ -480,11 +481,11 @@
 * get_new_buckets_path_by_test_env returns new_buckets_path value
 * on NC - new_buckets_path is full absolute path
 * on Containerized - new_buckets_path is the directory
- * Example -
+ * Example -
 * On NC - /private/tmp/new_buckets_path/
 * On Continerized - new_buckets_path/
- * @param {string} new_buckets_full_path
- * @param {string} new_buckets_dir
+ * @param {string} new_buckets_full_path
+ * @param {string} new_buckets_dir
 * @returns {string}
 */
 function get_new_buckets_path_by_test_env(new_buckets_full_path, new_buckets_dir) {
@@ -494,16 +495,16 @@
 /**
 * write_manual_config_file writes config file directly to the file system without using config FS
 * used for creating backward compatibility tests, invalid config files etc
- * 1. if it's account -
+ * 1. if it's account -
 * 1.1. create identity directory /{config_dir_path}/identities/{id}/
- * 2. create the config file -
+ * 2. create the config file -
 * 2.1. if it's a bucket - create it in /{config_dir_path}/buckets/{bucket_name}.json
 * 2.2. if it's an account - create it in /{config_dir_path}/identities/{id}/identity.json
 * 3. if it's an account and symlink_name is true - create /{config_dir_path}/accounts_by_name/{account_name}.symlink -> /{config_dir_path}/identities/{id}/identity.json
 * 4. if it's an account and symlink_access_key is true and there is access key in the account config - create /{config_dir_path}/access_keys/{access_key}.symlink -> /{config_dir_path}/identities/{id}/identity.json
- * @param {String} type
+ * @param {String} type
 * @param {import('../../sdk/config_fs').ConfigFS} config_fs
- * @param {Object} config_data
+ * @param {Object} config_data
 * @param {String} [invalid_str]
 * @param {{symlink_name?: Boolean, symlink_access_key?: Boolean}} [options]
 * @returns {Promise}
 */
@@ -539,8 +540,8 @@
 * symlink_account_name symlinks the account's name path to the target link path
 * used for manual creation of the account name symlink
 * @param {import('../../sdk/config_fs').ConfigFS} config_fs
- * @param {String} account_name
- * @param {String} link_target
+ * @param {String} account_name
+ * @param {String} link_target
 */
 async function symlink_account_name(config_fs, account_name, link_target) {
     const name_symlink_path = config_fs.get_account_or_user_path_by_name(account_name);
@@ -552,7 +553,7 @@
 * used for manual creation of the account access key symlink
 * @param {import('../../sdk/config_fs').ConfigFS} config_fs
 * @param {Object} access_keys
- * @param {String} link_target
+ * @param {String} link_target
 */
 async function symlink_account_access_keys(config_fs, access_keys, link_target) {
     for (const { access_key } of access_keys) {
@@ -564,7 +565,7 @@
 /**
 * create_identity_dir_if_missing created the identity directory if missing
 * @param {import('../../sdk/config_fs').ConfigFS} config_fs
- * @param {String} _id
+ * @param {String} _id
 * @returns {Promise}
 */
 async function create_identity_dir_if_missing(config_fs, _id) {
@@ -581,7 +582,7 @@
 * 1. create old json file in /config_dir_path/accounts/account.json
 * 2. if symlink_access_key is true - create old access key symlink /config_dir_path/access_keys/{access_key}.symlink -> /config_dir_path/accounts/account.json
 * @param {import('../../sdk/config_fs').ConfigFS} config_fs
- * @param {Object} config_data
+ * @param {Object} config_data
 * @param {{symlink_access_key?: Boolean}} [options]
 * @returns {Promise}
 */
@@ -606,9 +607,9 @@
 /**
 * delete_manual_config_file deletes config file directly from the file system without using config FS
 * used for deleting invalid config files etc
- * @param {String} type
+ * @param {String} type
 * @param {import('../../sdk/config_fs').ConfigFS} config_fs
- * @param {Object} config_data
+ * @param {Object} config_data
 * @returns {Promise}
 */
 async function delete_manual_config_file(type, config_fs, config_data) {
@@ -649,7 +650,7 @@ async function fail_test_if_default_config_dir_exists(test_name, config_fs) {
 
 /**
 * create_config_dir will create the config directory on the file system
- * @param {String} config_dir
+ * @param {String} config_dir
 * @returns {Promise}
 */
 async function create_config_dir(config_dir) {
@@ -688,10 +689,10 @@ async function clean_config_dir(config_fs, custom_config_dir_path) {
 
 /**
 * create_file creates a file in the file system
- * @param {nb.NativeFSContext} fs_context
- * @param {String} file_path
- * @param {Object} file_data
- * @param {{stringify_json?: Boolean}} [options={}]
+ * @param {nb.NativeFSContext} fs_context
+ * @param {String} file_path
+ * @param {Object} file_data
+ * @param {{stringify_json?: Boolean}} [options={}]
 */
 async function create_file(fs_context, file_path, file_data, options = {}) {
     const buf = Buffer.from(options?.stringify_json ? JSON.stringify(file_data) : file_data);
@@ -709,7 +710,7 @@
 /**
 * create_system_json creates the system.json file
 * if mock_config_dir_version it sets it before creating the file
 * @param {import('../../sdk/config_fs').ConfigFS} config_fs
- * @param {String} [mock_config_dir_version]
+ * @param {String} [mock_config_dir_version]
 * @returns {Promise}
 */
 async function create_system_json(config_fs, mock_config_dir_version) {
@@ -722,7 +723,7 @@
 * update_system_json updates the system.json file
 * if mock_config_dir_version it sets it before creating the file
 * @param {import('../../sdk/config_fs').ConfigFS} config_fs
- * @param {String} [mock_config_dir_version]
+ * @param {String} [mock_config_dir_version]
 * @returns {Promise}
 */
 async function update_system_json(config_fs, mock_config_dir_version) {
@@ -734,8 +735,8 @@
 /**
 * run_or_skip_test checks
 * 1. if cond condition evaluated to true - run test
- * 2. else - skip test
- * @param {*} cond
+ * 2. else - skip test
+ * @param {*} cond
 * @returns {*}
 */
 const run_or_skip_test = cond => {
     if (cond) {
         return it;
     } else return it.skip;
 };
 
+/**
+ * update_file_mtime updates the mtime of the target path
+ * Warnings:
+ * - This operation would change the mtime of the file to 5 days ago - which means that it changes the etag / obj_id of the object
+ * - Please do not use on versioned objects (version_id will not be changed, but the mtime will be changed) - might cause issues.
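+ * Example (hypothetical path):
+ *   await update_file_mtime(path.join(TMP_PATH, 'bucket1', 'obj1'));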
+ * @param {String} target_path
+ * @returns {Promise}
+ */
+async function update_file_mtime(target_path) {
+    const update_file_mtime_cmd = os_utils.IS_MAC ? `touch -t $(date -v -5d +"%Y%m%d%H%M.%S") ${target_path}` : `touch -d "5 days ago" ${target_path}`;
+    await os_utils.exec(update_file_mtime_cmd, { return_stdout: true });
+}
+
+/////////////////////////////////
+//////  LIFECYCLE UTILS   ///////
+/////////////////////////////////
+
+/**
+ * generate_lifecycle_rule generates an S3 lifecycle rule with optional filters and expiration (can be extended to support more lifecycle rule params)
+ *
+ * @param {number} expiration_days
+ * @param {string} id
+ * @param {string} [prefix]
+ * @param {Array} [tags]
+ * @param {number} [size_gt]
+ * @param {number} [size_lt]
+ * @returns {Object}
+ */
+function generate_lifecycle_rule(expiration_days, id, prefix, tags, size_gt, size_lt) {
+    const filters = {};
+    if (prefix) filters.Prefix = prefix;
+    if (Array.isArray(tags) && tags.length) filters.Tags = tags;
+    if (size_gt !== undefined) filters.ObjectSizeGreaterThan = size_gt;
+    if (size_lt !== undefined) filters.ObjectSizeLessThan = size_lt;
+
+    const filter = Object.keys(filters).length > 1 ? { And: filters } : filters;
+
+    return {
+        ID: id,
+        Status: 'Enabled',
+        Filter: filter,
+        Expiration: { Days: expiration_days },
+    };
+}
+
+/**
+ * validate_expiration_header validates the `x-amz-expiration` header against the object creation time, expected rule ID and expiration days
+ *
+ * The header is expected to have the format:
+ *   expiry-date="YYYY-MM-DDTHH:MM:SS.SSSZ", rule-id="RULE_ID"
+ *
+ * @param {string} expiration_header - expiration header value
+ * @param {string|Date} start_time - start/create time (string or Date) of the object
+ * @param {string} expected_rule_id - expected rule ID to match in the header
+ * @param {number} delta_days - expected number of days between start_time and expiry-date
+ * @returns {boolean} true if the header is valid and matches the expected_rule_id and delta_days, otherwise false
+ */
+function validate_expiration_header(expiration_header, start_time, expected_rule_id, delta_days) {
+    const match = expiration_header.match(/expiry-date="(.+)", rule-id="(.+)"/);
+    if (!match) return false;
+
+    const [, expiry_str, rule_id] = match;
+    const expiration = new Date(expiry_str);
+    const start = new Date(start_time);
+    start.setUTCHours(0, 0, 0, 0); // adjusting to midnight UTC otherwise the tests will fail - fix for ceph-s3 tests
+
+    const days_diff = Math.floor((expiration.getTime() - start.getTime()) / (24 * 60 * 60 * 1000));
+
+    return days_diff === delta_days && rule_id === expected_rule_id;
+}
+
+/**
+ * set_health_mock_functions sets mock functions used by the health script
+ * the second param is an object having the name of the mock functions as the keys and
+ * the value is an array of responses by the order of their call
+ * @param {Object} Health
+ * @param {{get_endpoint_response?: Object[], get_service_state?: Object[],
+ * get_system_config_file?: Object[], get_service_memory_usage?: Object[],
+ * get_lifecycle_health_status?: Object, get_latest_lifecycle_run_status?: Object}} mock_function_responses
+ */
+function set_health_mock_functions(Health, mock_function_responses) {
+    for (const mock_function_name of Object.keys(mock_function_responses)) {
+        const mock_function_responses_arr = mock_function_responses[mock_function_name];
+        const obj_to_stub = mock_function_name === 'get_system_config_file' ? Health.config_fs : Health;
+
+        if (obj_to_stub[mock_function_name]?.restore) obj_to_stub[mock_function_name]?.restore();
+        const stub = sinon.stub(obj_to_stub, mock_function_name);
+        for (let i = 0; i < mock_function_responses_arr.length; i++) {
+            stub.onCall(i).returns(Promise.resolve(mock_function_responses_arr[i]));
+        }
+    }
+}
+
+
+exports.update_file_mtime = update_file_mtime;
+exports.generate_lifecycle_rule = generate_lifecycle_rule;
+exports.validate_expiration_header = validate_expiration_header;
 exports.run_or_skip_test = run_or_skip_test;
 exports.blocks_exist_on_cloud = blocks_exist_on_cloud;
 exports.create_hosts_pool = create_hosts_pool;
@@ -781,3 +879,4 @@
 exports.update_system_json = update_system_json;
 exports.fail_test_if_default_config_dir_exists = fail_test_if_default_config_dir_exists;
 exports.create_config_dir = create_config_dir;
 exports.clean_config_dir = clean_config_dir;
+exports.set_health_mock_functions = set_health_mock_functions;
diff --git a/src/test/unit_tests/jest_tests/test_nc_lifecycle_gpfs_ilm_integration.test.js b/src/test/unit_tests/jest_tests/test_nc_lifecycle_gpfs_ilm_integration.test.js
index 4ca71060f7..c2b30f12a9 100644
--- a/src/test/unit_tests/jest_tests/test_nc_lifecycle_gpfs_ilm_integration.test.js
+++ b/src/test/unit_tests/jest_tests/test_nc_lifecycle_gpfs_ilm_integration.test.js
@@ -6,6 +6,7 @@ process.env.DISABLE_INIT_RANDOM_SEED = 'true';
 
 const fs = require('fs');
 const path = require('path');
+const config = require('../../../../config');
 const { ConfigFS } = require('../../../sdk/config_fs');
 const { file_delete, create_fresh_path } = require('../../../util/fs_utils');
 const { read_file } = require('../../../util/native_fs_utils');
@@ -479,7 +480,7 @@ function get_mock_base_ilm_policy(bucket_storage_path, rule_id, lifecycle_run_st
     const policy_rule_id = `${bucket_name}_${rule_id}_${lifecycle_run_status.lifecycle_run_times.run_lifecycle_start_time}`;
     const policy_base = `RULE '${policy_rule_id}' LIST '${policy_rule_id}'\n` +
         `WHERE PATH_NAME LIKE '${bucket_storage_path}/%'\n` +
-        `AND PATH_NAME NOT LIKE '${bucket_storage_path}/.noobaa_nsfs%/%'\n`;
+        `AND PATH_NAME NOT LIKE '${bucket_storage_path}/${config.NSFS_TEMP_DIR_NAME}%/%'\n`;
     return definitions_base + policy_base;
 }
 
diff --git a/src/test/unit_tests/nc_coretest.js b/src/test/unit_tests/nc_coretest.js
index 71e65059c2..15b9dadbb8 100644
--- a/src/test/unit_tests/nc_coretest.js
+++ b/src/test/unit_tests/nc_coretest.js
@@ -11,6 +11,7 @@
 const SensitiveString = require('../../util/sensitive_string');
 const { exec_manage_cli, TMP_PATH, create_redirect_file, delete_redirect_file } = require('../system_tests/test_utils');
 const { TYPES, ACTIONS } = require('../../manage_nsfs/manage_nsfs_constants');
 const { ConfigFS } = require('../../sdk/config_fs');
+const os_utils = require('../../util/os_utils');
 
 // keep me first - this is setting envs that should be set before other modules are required.
 
 const NC_CORETEST = 'nc_coretest';
@@ -164,6 +165,11 @@ async function stop_nsfs_process() {
     if (nsfs_process) nsfs_process.kill('SIGKILL');
 }
 
+async function get_nsfs_fork_pids() {
+    // `ps -o pid --ppid <pid>` prints a PID header line and then one child pid per line, hence the slice(1)
+    const res = await os_utils.exec(`ps -o pid --ppid ${nsfs_process.pid}`, { return_stdout: true, trim_stdout: true });
+    return res.split('\n').slice(1).map(pid => parseInt(pid, 10));
+}
+
 /**
 * get_current_setup_options returns the current_setup_options
 * currently the setup_options we use are for the nsfs process: fork and debug
@@ -538,6 +544,7 @@
 exports.setup = setup;
 exports.get_current_setup_options = get_current_setup_options;
 exports.stop_nsfs_process = stop_nsfs_process;
 exports.start_nsfs_process = start_nsfs_process;
+exports.get_nsfs_fork_pids = get_nsfs_fork_pids;
 exports.no_setup = _.noop;
 exports.log = log;
 exports.SYSTEM = SYSTEM;
diff --git a/src/test/unit_tests/test_nc_health.js b/src/test/unit_tests/test_nc_health.js
index 255549f706..562fab03cc 100644
--- a/src/test/unit_tests/test_nc_health.js
+++ b/src/test/unit_tests/test_nc_health.js
@@ -6,7 +6,6 @@
 const path = require('path');
 const _ = require('lodash');
 const mocha = require('mocha');
-const sinon = require('sinon');
 const assert = require('assert');
 const config = require('../../../config');
 const pkg = require('../../../package.json');
@@ -175,7 +174,7 @@ mocha.describe('nsfs nc health', function() {
         await exec_manage_cli(TYPES.ACCOUNT, ACTIONS.ADD, {config_root, ...account1_options});
         await exec_manage_cli(TYPES.BUCKET, ACTIONS.ADD, {config_root, ...bucket1_options});
         await fs_utils.file_must_exist(path.join(config_root, 'master_keys.json'));
-        set_mock_functions(Health, { get_service_memory_usage: [100]});
+        test_utils.set_health_mock_functions(Health, { get_service_memory_usage: [100]});
         for (const user of Object.values(fs_users)) {
             await create_fs_user_by_platform(user.distinguished_name, user.distinguished_name, user.uid, user.gid);
         }
@@ -207,7 +206,7 @@
             };
             valid_system_json[hostname].config_dir_version = config_fs.config_dir_version;
             await Health.config_fs.create_system_config_file(JSON.stringify(valid_system_json));
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: get_endpoint_response_mock_default_response,
             });
@@ -246,7 +245,7 @@
         });
 
         mocha.it('NooBaa service is inactive', async function() {
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: [{ service_status: 'inactive', pid: 0 }],
                 get_endpoint_response: get_endpoint_response_mock_default_response,
                 get_system_config_file: get_system_config_mock_default_response
@@ -257,7 +256,7 @@
         });
 
         mocha.it('NooBaa endpoint return error response is inactive', async function() {
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: [{ response: { response_code: 'MISSING_FORKS', total_fork_count: 3, running_workers: ['1', '3'] } }],
                 get_system_config_file: get_system_config_mock_default_response
@@ -271,7 +270,7 @@
             // create it manually because we can not skip invalid storage path check on the CLI
             const account_invalid_options = { _id: mongo_utils.mongoObjectId(), name: 'account_invalid', nsfs_account_config: { new_buckets_path: path.join(new_buckets_path, '/invalid') } };
             await test_utils.write_manual_config_file(TYPES.ACCOUNT, config_fs, account_invalid_options);
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: get_endpoint_response_mock_default_response,
                 get_system_config_file: get_system_config_mock_default_response
@@ -290,7 +289,7 @@
             // create it manually because we can not skip invalid storage path check on the CLI
             const bucket_invalid = { _id: mongo_utils.mongoObjectId(), name: 'bucket_invalid', path: new_buckets_path + '/bucket1/invalid', owner_account: parsed_res._id };
             await test_utils.write_manual_config_file(TYPES.BUCKET, config_fs, bucket_invalid);
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: get_endpoint_response_mock_default_response,
                 get_system_config_file: get_system_config_mock_default_response
@@ -312,7 +311,7 @@
             await config_fs.delete_config_json_file();
             Health.all_account_details = true;
             Health.all_bucket_details = true;
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: get_endpoint_response_mock_default_response,
                 get_system_config_file: get_system_config_mock_default_response
@@ -337,7 +336,7 @@
             await test_utils.write_manual_config_file(TYPES.BUCKET, config_fs, bucket_invalid_owner);
             Health.all_account_details = true;
             Health.all_bucket_details = true;
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: get_endpoint_response_mock_default_response,
                 get_system_config_file: get_system_config_mock_default_response
@@ -357,7 +356,7 @@
             await test_utils.write_manual_config_file(TYPES.BUCKET, config_fs, bucket_invalid_owner);
             Health.all_account_details = true;
             Health.all_bucket_details = true;
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: get_endpoint_response_mock_default_response,
                 get_system_config_file: get_system_config_mock_default_response
@@ -374,7 +373,7 @@
             // create it manually because we can not skip json schema check on the CLI
             const bucket_invalid_schema = { _id: mongo_utils.mongoObjectId(), name: 'bucket_invalid_schema', path: new_buckets_path };
             await test_utils.write_manual_config_file(TYPES.BUCKET, config_fs, bucket_invalid_schema, 'invalid');
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: get_endpoint_response_mock_default_response,
                 get_system_config_file: get_system_config_mock_default_response
@@ -391,7 +390,7 @@
             // create it manually because we can not skip json schema check on the CLI
             const account_invalid_schema = { _id: mongo_utils.mongoObjectId(), name: 'account_invalid_schema', path: new_buckets_path, bla: 5 };
             await test_utils.write_manual_config_file(TYPES.ACCOUNT, config_fs, account_invalid_schema, 'invalid');
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: get_endpoint_response_mock_default_response,
                 get_system_config_file: get_system_config_mock_default_response
@@ -407,7 +406,7 @@
         mocha.it('Health all condition is success, all_account_details is false', async function() {
             Health.all_account_details = false;
             Health.all_bucket_details = true;
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: get_endpoint_response_mock_default_response,
                 get_system_config_file: get_system_config_mock_default_response
@@ -423,7 +422,7 @@
         mocha.it('Health all condition is success, all_bucket_details is false', async function() {
             Health.all_account_details = true;
             Health.all_bucket_details = false;
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: get_endpoint_response_mock_default_response,
                 get_system_config_file: get_system_config_mock_default_response
@@ -442,7 +441,7 @@
             Health.config_root = config_root_invalid;
             const old_config_fs = Health.config_fs;
             Health.config_fs = new ConfigFS(config_root_invalid);
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: get_endpoint_response_mock_default_response,
                 get_system_config_file: get_system_config_mock_default_response
@@ -464,7 +463,7 @@
             await config_fs.delete_config_json_file();
             Health.all_account_details = true;
             Health.all_bucket_details = true;
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: get_endpoint_response_mock_default_response,
                 get_system_config_file: get_system_config_mock_default_response
@@ -484,7 +483,7 @@
             await config_fs.delete_config_json_file();
             Health.all_account_details = true;
             Health.all_bucket_details = true;
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: get_endpoint_response_mock_default_response,
                 get_system_config_file: get_system_config_mock_default_response
@@ -507,7 +506,7 @@
             await config_fs.delete_config_json_file();
             Health.all_account_details = true;
             Health.all_bucket_details = true;
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: get_endpoint_response_mock_default_response,
                 get_system_config_file: get_system_config_mock_default_response
@@ -529,7 +528,7 @@
             await exec_manage_cli(TYPES.ACCOUNT, ACTIONS.ADD, { config_root, ...account_inaccessible_dn_options });
             Health.all_account_details = true;
             Health.all_bucket_details = true;
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: get_endpoint_response_mock_default_response,
                 get_system_config_file: get_system_config_mock_default_response
@@ -548,7 +547,7 @@
             await exec_manage_cli(TYPES.ACCOUNT, ACTIONS.ADD, { config_root, ...account_inaccessible_dn_options });
             Health.all_account_details = true;
             Health.all_bucket_details = true;
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: get_endpoint_response_mock_default_response,
                 get_system_config_file: get_system_config_mock_default_response
@@ -570,7 +569,7 @@
             await exec_manage_cli(TYPES.ACCOUNT, ACTIONS.ADD, { config_root, ...account_inaccessible_dn_options });
             Health.all_account_details = true;
             Health.all_bucket_details = true;
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: get_endpoint_response_mock_default_response,
                 get_system_config_file: get_system_config_mock_default_response
@@ -592,7 +591,7 @@
             await exec_manage_cli(TYPES.ACCOUNT, ACTIONS.ADD, { config_root, ...invalid_account_dn_options });
             Health.all_account_details = true;
             Health.all_bucket_details = true;
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: get_endpoint_response_mock_default_response,
                 get_system_config_file: get_system_config_mock_default_response
@@ -613,7 +612,7 @@
             await exec_manage_cli(TYPES.ACCOUNT, ACTIONS.ADD, { config_root, ...account_valid });
             Health.all_account_details = true;
             Health.all_bucket_details = false;
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: get_endpoint_response_mock_default_response,
                 get_system_config_file: get_system_config_mock_default_response
@@ -630,7 +629,7 @@
             await exec_manage_cli(TYPES.ACCOUNT, ACTIONS.ADD, { config_root, ...account_invalid });
             Health.all_account_details = true;
             Health.all_bucket_details = false;
-            set_mock_functions(Health, {
+            test_utils.set_health_mock_functions(Health, {
                 get_service_state: get_service_state_mock_default_response,
                 get_endpoint_response: get_endpoint_response_mock_default_response,
                 get_system_config_file: get_system_config_mock_default_response
@@ -829,25 +828,3 @@ function restore_health_if_needed(health_obj) {
     if (health_obj?.config_fs?.get_system_config_file?.restore) health_obj.config_fs.get_system_config_file.restore();
     if (health_obj?.get_service_memory_usage?.restore) health_obj.get_service_memory_usage.restore();
 }
-
-/**
- * set_mock_functions sets mock functions used by the health script
- * the second param is an object having the name of the mock functions as the keys and
- * the value is an array of responses by the order of their call
- * @param {Object} Health
- * @param {{get_endpoint_response?: Object[], get_service_state?: Object[],
- * get_system_config_file?: Object[], get_service_memory_usage?: Object[],
- * get_lifecycle_health_status?: Object, get_latest_lifecycle_run_status?: Object}} mock_function_responses
- */
-function set_mock_functions(Health, mock_function_responses) {
-    for (const mock_function_name of Object.keys(mock_function_responses)) {
-        const mock_function_responses_arr = mock_function_responses[mock_function_name];
-        const obj_to_stub = mock_function_name === 'get_system_config_file' ? Health.config_fs : Health;
-
-        if (obj_to_stub[mock_function_name]?.restore) obj_to_stub[mock_function_name]?.restore();
-        const stub = sinon.stub(obj_to_stub, mock_function_name);
-        for (let i = 0; i < mock_function_responses_arr.length; i++) {
-            stub.onCall(i).returns(Promise.resolve(mock_function_responses_arr[i]));
-        }
-    }
-}
diff --git a/src/test/unit_tests/test_nc_with_a_couple_of_forks.js b/src/test/unit_tests/test_nc_with_a_couple_of_forks.js
index 6d2759cefc..45198b4eef 100644
--- a/src/test/unit_tests/test_nc_with_a_couple_of_forks.js
+++ b/src/test/unit_tests/test_nc_with_a_couple_of_forks.js
@@ -9,8 +9,11 @@
 const P = require('../../util/promise');
 const mocha = require('mocha');
 const assert = require('assert');
 const fs_utils = require('../../util/fs_utils');
-const { TMP_PATH, generate_nsfs_account, get_new_buckets_path_by_test_env, generate_s3_client, get_coretest_path, exec_manage_cli } = require('../system_tests/test_utils');
+const { TMP_PATH, generate_nsfs_account, get_new_buckets_path_by_test_env, generate_s3_client, get_coretest_path, exec_manage_cli, set_health_mock_functions, create_system_json } = require('../system_tests/test_utils');
 const { TYPES, ACTIONS } = require('../../manage_nsfs/manage_nsfs_constants');
+const { NSFSHealth } = require('../../manage_nsfs/health');
+const { ConfigFS } = require('../../sdk/config_fs');
+const config = require('../../../config');
 const ManageCLIResponse = require('../../manage_nsfs/manage_nsfs_cli_responses').ManageCLIResponse;
 
 const coretest_path = get_coretest_path();
@@ -18,11 +21,12 @@
 const coretest = require(coretest_path);
 const setup_options = { forks: 2, debug: 5 };
 coretest.setup(setup_options);
 const { rpc_client, EMAIL, get_current_setup_options, stop_nsfs_process, start_nsfs_process,
-    config_dir_name, NC_CORETEST_CONFIG_FS, NC_CORETEST_STORAGE_PATH } = coretest;
+    config_dir_name, get_nsfs_fork_pids, NC_CORETEST_CONFIG_FS, NC_CORETEST_STORAGE_PATH } = coretest;
 const CORETEST_ENDPOINT = coretest.get_http_address();
 
 const config_root = path.join(TMP_PATH, 'test_nc_cache_stat');
+const config_fs = new ConfigFS(config_root);
 // on NC - new_buckets_path is full absolute path
 // on Containerized - new_buckets_path is the directory
 const new_bucket_path_param = get_new_buckets_path_by_test_env(config_root, '/');
@@ -223,4 +227,57 @@ const s3_uid6001 = generate_s3_client(access_details.access_key,
         await s3_uid6001.deleteBucket({ Bucket: bucket_name4 });
         await fs.promises.rm(new_bucket_path_param2, { recursive: true });
     });
+
+    mocha.describe('health server - ping forks', async function() {
+        const https_port = 6443;
+        const fork_base_port = config.ENDPOINT_FORK_PORT_BASE;
+        const Health = new NSFSHealth({ config_root, https_port, config_fs, fork_base_port });
+
+        mocha.before(async () => {
+            await create_system_json(config_fs);
+        });
+
+        mocha.it('health concurrency with load - ping forks', async function() {
+            this.timeout(100000); // eslint-disable-line no-invalid-this
+
+            for (let i = 0; i < 10; i++) {
+                // a couple of requests for health check
+                const failed_operations = [];
+                const successful_operations = [];
+                const num_of_concurrency = 10;
+                for (let j = 0; j < num_of_concurrency; j++) {
+                    set_health_mock_functions(Health, {
+                        get_service_state: [{ service_status: 'active', pid: 1000 }],
+                        get_service_memory_usage: [100]
+                    });
+                    Health.nc_nsfs_health()
+
+        mocha.it('fork server gets the same port after restart', async function() {
+            const forks = await get_nsfs_fork_pids();
+            assert.equal(forks.length, 2, 'There should be 2 forks running');
+            process.kill(forks[0], 'SIGTERM');
+            await P.delay(8000); // wait for the fork to be restarted
+            const new_forks = await get_nsfs_fork_pids();
+            assert.equal(new_forks.length, 2, 'There should be 2 forks running after restart');
+
+            set_health_mock_functions(Health, {
+                get_service_state: [{ service_status: 'active', pid: 1000 }],
+                get_service_memory_usage: [100]
+            });
+            const res = await Health.nc_nsfs_health();
+            assert.strictEqual(res.status, 'OK');
+            assert.strictEqual(res.checks.endpoint.endpoint_state.total_fork_count, 2, 'There should be 2 forks in the health response');
+        });
+    });
 });
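For context on what these tests exercise: each fork runs a small HTTPS server on `ENDPOINT_FORK_PORT_BASE + offset` (6002, 6003, ... by default) that answers `/endpoint_fork_id`, and the health code counts how many forks respond. A rough standalone sketch of such a probe, assuming self-signed certificates (hence `rejectUnauthorized: false`); `ping_forks` is illustrative only and not an API added by this patch:

```js
// Illustrative only: probe each fork's health port and report which respond.
const https = require('https');

async function ping_forks(total_forks, fork_base_port = 6002) {
    const alive = [];
    for (let offset = 0; offset < total_forks; ++offset) {
        const port = fork_base_port + offset;
        try {
            await new Promise((resolve, reject) => {
                const req = https.get(
                    { host: 'localhost', port, path: '/endpoint_fork_id', rejectUnauthorized: false },
                    res => {
                        res.resume(); // drain the response body
                        if (res.statusCode === 200) resolve();
                        else reject(new Error(`status ${res.statusCode}`));
                    }
                );
                req.on('error', reject);
            });
            alive.push(port);
        } catch (err) {
            console.warn(`fork on port ${port} did not respond: ${err.message}`);
        }
    }
    return alive; // e.g. [6002, 6003] when both forks are up
}
```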
diff --git a/src/util/fork_utils.js b/src/util/fork_utils.js
index ef06a9aaa8..38101aedb5 100644
--- a/src/util/fork_utils.js
+++ b/src/util/fork_utils.js
@@ -10,6 +10,7 @@ const prom_reporting = require('../server/analytic_services/prometheus_reporting
 const NoobaaEvent = require('../manage_nsfs/manage_nsfs_events_utils').NoobaaEvent;
 const config = require('../../config');
 const stats_collector_utils = require('./stats_collector_utils');
+const { is_nc_environment } = require('../nc/nc_utils');
 
 
 const io_stats = {
@@ -27,7 +28,7 @@ const fs_workers_stats = {};
  * When count > 0 the primary process will fork worker processes to process incoming http requests.
  * In case of any worker exit, also the entire process group will exit.
  * @see https://nodejs.org/api/cluster.html
- * 
+ *
  * @param {number} [metrics_port]
 * @param {number} [https_metrics_port]
 * @param {string} [nsfs_config_root] nsfs configuration path
@@ -36,10 +37,12 @@
 async function start_workers(metrics_port, https_metrics_port, nsfs_config_root, count = 0) {
     const exit_events = [];
+    const fork_port_offsets = [];
     if (cluster.isPrimary && count > 0) {
         for (let i = 0; i < count; ++i) {
             const worker = cluster.fork();
             console.warn('WORKER started', { id: worker.id, pid: worker.process.pid });
+            fork_port_offsets.push(worker.id);
         }
 
         // We don't want to leave the process with a partial set of workers,
@@ -66,11 +69,26 @@ async function start_workers(metrics_port, https_metrics_port, nsfs_config_root,
             console.warn(`${exit_events.length} exit events in the last ${config.NSFS_EXIT_EVENTS_TIME_FRAME_MIN} minutes,` +
                 ` max allowed are: ${config.NSFS_MAX_EXIT_EVENTS_PER_TIME_FRAME}`);
             const new_worker = cluster.fork();
-            console.warn('WORKER re-started', { id: new_worker.id, pid: new_worker.process.pid });
+            const offset = fork_port_offsets.findIndex(id => id === worker.id);
+            const listener = create_worker_message_handler({
+                worker: new_worker,
+                offset: offset,
+                nsfs_config_root: nsfs_config_root
+            });
+            new_worker.on('message', listener);
+            fork_port_offsets[offset] = new_worker.id;
+            const port = is_nc_environment() ? {port: config.ENDPOINT_FORK_PORT_BASE + offset} : {};
+            console.warn('WORKER re-started', { id: new_worker.id, pid: new_worker.process.pid, ...port});
         });
         for (const id in cluster.workers) {
             if (id) {
-                cluster.workers[id].on('message', nsfs_io_stats_handler);
+                const offset = fork_port_offsets.findIndex(worker_id => worker_id === cluster.workers[id].id);
+                const listener = create_worker_message_handler({
+                    worker: cluster.workers[id],
+                    offset: offset,
+                    nsfs_config_root: nsfs_config_root
+                });
+                cluster.workers[id].on('message', listener);
             }
         }
         if (metrics_port > 0 || https_metrics_port > 0) {
@@ -84,20 +102,58 @@ async function start_workers(metrics_port, https_metrics_port, nsfs_config_root,
     return false;
 }
 
-function nsfs_io_stats_handler(msg) {
-    if (msg.io_stats) {
+// global variable as it is shared between all workers
+let fork_server_retries = config.NC_FORK_SERVER_RETRIES;
+function create_worker_message_handler(params) {
+    let fork_server_timer;
+    if (is_nc_environment() && fork_server_retries > 0) {
+        fork_server_timer = setTimeout(async () => {
+            dbg.error(`Timeout: It took more than ${config.NC_FORK_SERVER_TIMEOUT} minutes to get a ready message from worker ${params.worker.id}, killing the worker`);
+            fork_server_retries -= 1;
+            await params.worker.kill();
+            if (fork_server_retries <= 0) {
+                dbg.error(`ERROR: fork health server failed to start after ${config.NC_FORK_SERVER_RETRIES} attempts, stopping retries to reload the worker`);
+            }
+        }, config.NC_FORK_SERVER_TIMEOUT * 60 * 1000);
+    }
+    return function(msg) {
+        if (msg.io_stats) {
             for (const [key, value] of Object.entries(msg.io_stats)) {
                 io_stats[key] += value;
             }
             prom_reporting.set_io_stats(io_stats);
-    }
-    if (msg.op_stats) {
-        _update_ops_stats(msg.op_stats);
-        prom_reporting.set_ops_stats(op_stats);
-    }
-    if (msg.fs_workers_stats) {
-        _update_fs_stats(msg.fs_workers_stats);
-        prom_reporting.set_fs_worker_stats(fs_workers_stats);
+        }
+        if (msg.op_stats) {
+            _update_ops_stats(msg.op_stats);
+            prom_reporting.set_ops_stats(op_stats);
+        }
+        if (msg.fs_workers_stats) {
+            _update_fs_stats(msg.fs_workers_stats);
+            prom_reporting.set_fs_worker_stats(fs_workers_stats);
+        }
+        if (msg.ready_to_start_fork_server) {
+            clearTimeout(fork_server_timer);
+            _send_fork_server_message(params);
+        }
+    };
+}
+
+/**
+ * Sends a message to the worker to start the fork server with the given port.
+ * NOTE - currently runs only on non-containerized environments.
+ */
+function _send_fork_server_message({worker, offset, nsfs_config_root}) {
+    const is_nc = is_nc_environment();
+    if (is_nc && offset >= 0) {
+        // the worker signaled that it is ready to receive messages
+        try {
+            worker.send({
+                nsfs_config_root: nsfs_config_root,
+                health_port: config.ENDPOINT_FORK_PORT_BASE + offset
+            });
+        } catch (err) {
+            dbg.warn(`Failed to send the start fork server message to worker ${worker.id}`, err);
+        }
+    }
+}
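The handshake that `create_worker_message_handler` and `_send_fork_server_message` implement is small once the retry and timeout machinery is stripped away: the worker announces `ready_to_start_fork_server`, and the primary answers with the `health_port` that worker should bind. A self-contained sketch of that flow (simplified names, a single worker, and plain `http` standing in for the real HTTPS fork health server):

```js
// Minimal sketch of the primary/worker port handshake; not NooBaa code.
const cluster = require('node:cluster');
const http = require('node:http');

const PORT_BASE = 6002; // stands in for config.ENDPOINT_FORK_PORT_BASE

if (cluster.isPrimary) {
    const worker = cluster.fork();
    const offset = 0; // one worker here; the real code tracks an offset per worker id
    worker.on('message', msg => {
        if (msg.ready_to_start_fork_server) {
            // the worker is listening for messages - hand it its designated port
            worker.send({ health_port: PORT_BASE + offset });
        }
    });
} else {
    process.on('message', msg => {
        if (msg.health_port) {
            http.createServer((req, res) => res.end(`fork ${cluster.worker.id}`))
                .listen(msg.health_port);
        }
    });
    // announce readiness so the primary knows the port message will be received
    process.send({ ready_to_start_fork_server: true });
}
```

The timeout in `create_worker_message_handler` covers the case where the ready message never arrives; after `NC_FORK_SERVER_RETRIES` failed workers it gives up rather than restart-looping forever.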
diff --git a/src/util/http_utils.js b/src/util/http_utils.js
index 992d319a56..d188cf082d 100644
--- a/src/util/http_utils.js
+++ b/src/util/http_utils.js
@@ -769,9 +769,9 @@ function http_get(uri, options) {
 /**
  * start_https_server starts the secure https server by type and options and creates a certificate if required
  * @param {number} https_port
- * @param {('S3'|'IAM'|'STS'|'METRICS')} server_type
- * @param {Object} request_handler
-*/
+ * @param {('S3'|'IAM'|'STS'|'METRICS'|'FORK_HEALTH')} server_type
+ * @param {Object} request_handler
+ */
 async function start_https_server(https_port, server_type, request_handler, nsfs_config_root) {
     const ssl_cert_info = await ssl_utils.get_ssl_cert_info(server_type, nsfs_config_root);
     const https_server = await ssl_utils.create_https_server(ssl_cert_info, true, request_handler);
@@ -788,9 +788,9 @@ async function start_https_server(https_port, server_type, request_handler, nsfs
 /**
  * start_http_server starts the non-secure http server by type
  * @param {number} http_port
- * @param {('S3'|'IAM'|'STS'|'METRICS')} server_type
- * @param {Object} request_handler
-*/
+ * @param {('S3'|'IAM'|'STS'|'METRICS'|'FORK_HEALTH')} server_type
+ * @param {Object} request_handler
+ */
 async function start_http_server(http_port, server_type, request_handler) {
     const http_server = http.createServer(request_handler);
     if (http_port > 0) {
@@ -804,11 +804,11 @@ async function start_http_server(http_port, server_type, request_handler) {
  * Listen server for http/https ports
  * @param {number} port
  * @param {http.Server} server
- * @param {('S3'|'IAM'|'STS'|'METRICS')} server_type
-*/
+ * @param {('S3'|'IAM'|'STS'|'METRICS'|'FORK_HEALTH')} server_type
+ */
 function listen_port(port, server, server_type) {
     return new Promise((resolve, reject) => {
-        if (server_type !== 'METRICS') {
+        if (server_type !== 'METRICS' && server_type !== 'FORK_HEALTH') {
             setup_endpoint_server(server);
         }
         server.listen(port, err => {
diff --git a/src/util/ssl_utils.js b/src/util/ssl_utils.js
index 944be4e5d7..5a9f85af09 100644
--- a/src/util/ssl_utils.js
+++ b/src/util/ssl_utils.js
@@ -47,7 +47,8 @@ const certs = {
     EXTERNAL_DB: new CertInfo(config.EXTERNAL_DB_SERVICE_CERT_PATH),
     STS: new CertInfo(config.STS_SERVICE_CERT_PATH),
     IAM: new CertInfo(config.IAM_SERVICE_CERT_PATH),
-    METRICS: new CertInfo(config.S3_SERVICE_CERT_PATH) // metric server will use the S3 cert.
+    METRICS: new CertInfo(config.S3_SERVICE_CERT_PATH), // metric server will use the S3 cert.
+    FORK_HEALTH: new CertInfo(config.S3_SERVICE_CERT_PATH) // fork health server will use the S3 cert.
 };
 
 function generate_ssl_certificate() {
@@ -66,7 +67,7 @@ function verify_ssl_certificate(certificate) {
 // Get SSL certificate (load once then serve from cache)
 async function get_ssl_cert_info(service, nsfs_config_root) {
     let cert_info;
-    if ((service === 'S3' || service === 'METRICS') && nsfs_config_root) {
+    if ((service === 'S3' || service === 'METRICS' || service === 'FORK_HEALTH') && nsfs_config_root) {
         const nsfs_ssl_cert_dir = path.join(nsfs_config_root, 'certificates/');
         cert_info = new CertInfo(nsfs_ssl_cert_dir);
     } else {