Skip to content

Backports to 5.18.6 | NC #9101

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,8 @@ config.ENDPOINT_SSL_PORT = Number(process.env.ENDPOINT_SSL_PORT) || 6443;
// Remove the NSFS condition when NSFS starts to support STS.
config.ENDPOINT_SSL_STS_PORT = Number(process.env.ENDPOINT_SSL_STS_PORT) || (process.env.NC_NSFS_NO_DB_ENV === 'true' ? -1 : 7443);
config.ENDPOINT_SSL_IAM_PORT = Number(process.env.ENDPOINT_SSL_IAM_PORT) || -1;
// each fork will get a port in the range [ENDPOINT_FORK_PORT_BASE, ENDPOINT_FORK_PORT_BASE + number of forks - 1]
config.ENDPOINT_FORK_PORT_BASE = Number(process.env.ENDPOINT_FORK_PORT_BASE) || 6002;
config.ALLOW_HTTP = false;
config.ALLOW_HTTP_METRICS = true;
config.ALLOW_HTTPS_METRICS = true;
Expand All @@ -1015,6 +1017,8 @@ config.VIRTUAL_HOSTS = process.env.VIRTUAL_HOSTS || '';

config.NC_HEALTH_ENDPOINT_RETRY_COUNT = 3;
config.NC_HEALTH_ENDPOINT_RETRY_DELAY = 10;
config.NC_FORK_SERVER_TIMEOUT = 5; // 5 minutes
config.NC_FORK_SERVER_RETRIES = 10;


/** @type {'file' | 'executable'} */
Expand Down Expand Up @@ -1060,6 +1064,7 @@ config.NC_LIFECYCLE_BUCKET_BATCH_SIZE = 10000;
config.NC_LIFECYCLE_GPFS_ILM_ENABLED = true;
config.NC_LIFECYCLE_GPFS_ALLOW_SCAN_ON_REMOTE = true;
config.NC_GPFS_BIN_DIR = '/usr/lpp/mmfs/bin/';
config.NC_LIFECYCLE_GPFS_MMAPPLY_ILM_POLICY_CONCURRENCY = 1;

////////// GPFS //////////
config.GPFS_DOWN_DELAY = 1000;
Expand Down
63 changes: 54 additions & 9 deletions src/endpoint/endpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const { SemaphoreMonitor } = require('../server/bg_services/semaphore_monitor');
const prom_reporting = require('../server/analytic_services/prometheus_reporting');
const { PersistentLogger } = require('../util/persistent_logger');
const { get_notification_logger } = require('../util/notifications_util');
const { is_nc_environment } = require('../nc/nc_utils');
const NoobaaEvent = require('../manage_nsfs/manage_nsfs_events_utils').NoobaaEvent;
const cluster = /** @type {import('node:cluster').Cluster} */ (
/** @type {unknown} */ (require('node:cluster'))
Expand All @@ -57,7 +58,8 @@ const SERVICES_TYPES_ENUM = Object.freeze({
S3: 'S3',
STS: 'STS',
IAM: 'IAM',
METRICS: 'METRICS'
METRICS: 'METRICS',
FORK_HEALTH: 'FORK_HEALTH',
});

const new_umask = process.env.NOOBAA_ENDPOINT_UMASK || 0o000;
Expand Down Expand Up @@ -117,11 +119,11 @@ async function main(options = {}) {
const https_metrics_port = options.https_metrics_port || config.EP_METRICS_SERVER_SSL_PORT;
/**
* Please notice that we can run the main in 2 states:
* 1. Only the primary process runs the main (fork is 0 or undefined) - everything that
* 1. Only the primary process runs the main (fork is 0 or undefined) - everything that
* is implemented here would be run by this process.
* 2. A primary process with multiple forks (IMPORTANT) - if there is implementation that
* in only relevant to the primary process it should be implemented in
* fork_utils.start_workers because the primary process returns after start_workers
* 2. A primary process with multiple forks (IMPORTANT) - if there is implementation that
* in only relevant to the primary process it should be implemented in
* fork_utils.start_workers because the primary process returns after start_workers
* and the forks will continue executing the code lines in this function
* */
const is_workers_started_from_primary = await fork_utils.start_workers(http_metrics_port, https_metrics_port,
Expand Down Expand Up @@ -202,14 +204,29 @@ async function main(options = {}) {
{ ...options, https_port: https_port_s3, http_port: http_port_s3, virtual_hosts, bucket_logger, notification_logger });
await start_endpoint_server_and_cert(SERVICES_TYPES_ENUM.STS, init_request_sdk, { https_port: https_port_sts, virtual_hosts });
await start_endpoint_server_and_cert(SERVICES_TYPES_ENUM.IAM, init_request_sdk, { https_port: https_port_iam });

const is_nc = is_nc_environment();
    // fork health server currently runs only on non containerized environment
if (is_nc) {
// current process is the primary and only fork. start the fork server directly with the base port
if (cluster.isPrimary) {
await fork_message_request_handler({
nsfs_config_root: options.nsfs_config_root,
health_port: config.ENDPOINT_FORK_PORT_BASE
});
// current process is a worker so we listen to get the port from the primary process.
} else {
process.on('message', fork_message_request_handler);
//send a message to the primary process that we are ready to receive messages
process.send({ready_to_start_fork_server: true});
}
}

// START METRICS SERVER
if ((http_metrics_port > 0 || https_metrics_port > 0) && cluster.isPrimary) {
await prom_reporting.start_server(http_metrics_port, https_metrics_port, false, options.nsfs_config_root);
}

// TODO: currently NC NSFS deployments don't have internal_rpc_client nor db,
// TODO: currently NC NSFS deployments don't have internal_rpc_client nor db,
    // therefore namespace monitor won't be registered
if (internal_rpc_client && config.NAMESPACE_MONITOR_ENABLED) {
endpoint_stats_collector.instance().set_rpc_client(internal_rpc_client);
Expand Down Expand Up @@ -293,8 +310,6 @@ function create_endpoint_handler(server_type, init_request_sdk, { virtual_hosts,
return blob_rest_handler(req, res);
} else if (req.url.startsWith('/total_fork_count')) {
return fork_count_handler(req, res);
} else if (req.url.startsWith('/endpoint_fork_id')) {
return endpoint_fork_id_handler(req, res);
} else if (req.url.startsWith('/_/')) {
// internals non S3 requests
const api = req.url.slice('/_/'.length);
Expand Down Expand Up @@ -535,8 +550,38 @@ function unavailable_handler(req, res) {
res.end(reply);
}

/**
 * Request handler for the individual fork's health server.
 * Serves requests that return the worker id of this fork;
 * currently used by the health script to verify the fork is alive.
 * Any other path is answered with an internal API error.
 * @param {EndpointRequest} req
 * @param {import('http').ServerResponse} res
 */
function fork_main_handler(req, res) {
    endpoint_utils.set_noobaa_server_header(res);
    endpoint_utils.prepare_rest_request(req);
    // only the fork-id endpoint is supported on this server
    if (!req.url.startsWith('/endpoint_fork_id')) {
        return internal_api_error(req, res, `Unknown API call ${req.url}`);
    }
    return endpoint_fork_id_handler(req, res);
}

/**
 * Handles messages sent from the primary process.
 * The primary sends a message carrying the designated port, and this
 * handler starts the fork's HTTPS health server on that port.
 * @param {Object} msg - expected to carry `health_port` and `nsfs_config_root`
 */
async function fork_message_request_handler(msg) {
    const { health_port, nsfs_config_root } = msg;
    // bring up the per-fork health server on the port assigned by the primary
    await http_utils.start_https_server(health_port, SERVICES_TYPES_ENUM.FORK_HEALTH, fork_main_handler, nsfs_config_root);
}

exports.main = main;
exports.create_endpoint_handler = create_endpoint_handler;
exports.create_init_request_sdk = create_init_request_sdk;
exports.endpoint_fork_id_handler = endpoint_fork_id_handler;

if (require.main === module) main();
75 changes: 36 additions & 39 deletions src/manage_nsfs/health.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ process.env.AWS_SDK_JS_SUPPRESS_MAINTENANCE_MODE_MESSAGE = '1';
class NSFSHealth {
constructor(options) {
this.https_port = options.https_port;
this.fork_base_port = options.fork_base_port;
this.all_account_details = options.all_account_details;
this.all_bucket_details = options.all_bucket_details;
this.all_connection_details = options.all_connection_details;
Expand Down Expand Up @@ -241,10 +242,10 @@ class NSFSHealth {
return service_health;
}

async make_endpoint_health_request(url_path) {
async make_endpoint_health_request(url_path, port = this.https_port) {
const response = await make_https_request({
HOSTNAME,
port: this.https_port,
hostname: HOSTNAME,
port,
path: url_path,
method: 'GET',
rejectUnauthorized: false,
Expand All @@ -260,43 +261,37 @@ class NSFSHealth {
let url_path = '/total_fork_count';
const worker_ids = [];
let total_fork_count = 0;
let fork_count_response;
let response;
try {
const fork_count_response = await this.make_endpoint_health_request(url_path);
if (!fork_count_response) {
return {
response: fork_response_code.NOT_RUNNING,
total_fork_count: total_fork_count,
running_workers: worker_ids,
};
}
total_fork_count = fork_count_response.fork_count;
if (total_fork_count > 0) {
url_path = '/endpoint_fork_id';
await P.retry({
attempts: total_fork_count * 2,
delay_ms: 1,
func: async () => {
const fork_id_response = await this.make_endpoint_health_request(url_path);
if (fork_id_response.worker_id && !worker_ids.includes(fork_id_response.worker_id)) {
worker_ids.push(fork_id_response.worker_id);
}
if (worker_ids.length < total_fork_count) {
throw new Error('Number of running forks is less than the expected fork count.');
}
}
});
if (worker_ids.length === total_fork_count) {
response = fork_response_code.RUNNING;
} else {
response = fork_response_code.MISSING_FORKS;
}
} else {
response = fork_response_code.RUNNING;
}
fork_count_response = await this.make_endpoint_health_request(url_path);
} catch (err) {
dbg.log1('Error while pinging endpoint host :' + HOSTNAME + ', port ' + this.https_port, err);
response = fork_response_code.NOT_RUNNING;
dbg.log0('Error while pinging endpoint host :' + HOSTNAME, err);
}
if (!fork_count_response) {
return {
response: fork_response_code.NOT_RUNNING,
total_fork_count: total_fork_count,
running_workers: worker_ids,
};
}

total_fork_count = fork_count_response.fork_count;
url_path = '/endpoint_fork_id';
for (let i = 0; i < total_fork_count; i++) {
const port = this.fork_base_port + i;
try {
const fork_id_response = await this.make_endpoint_health_request(url_path, port);
worker_ids.push(fork_id_response.worker_id);
} catch (err) {
dbg.log0('Error while pinging fork :' + HOSTNAME + ', port ' + port, err);
}
}
if (worker_ids.length < total_fork_count) {
dbg.log0('Number of running forks is less than the expected fork count.');
response = fork_response_code.MISSING_FORKS;
} else {
response = fork_response_code.RUNNING;
}
return {
response: response,
Expand Down Expand Up @@ -637,6 +632,7 @@ class NSFSHealth {
async function get_health_status(argv, config_fs) {
try {
const https_port = Number(argv.https_port) || config.ENDPOINT_SSL_PORT;
const fork_base_port = Number(argv.fork_base_port) || config.ENDPOINT_FORK_PORT_BASE;
const deployment_type = argv.deployment_type || 'nc';
const all_account_details = get_boolean_or_string_value(argv.all_account_details);
const all_bucket_details = get_boolean_or_string_value(argv.all_bucket_details);
Expand All @@ -645,8 +641,9 @@ async function get_health_status(argv, config_fs) {
const lifecycle = get_boolean_or_string_value(argv.lifecycle);

if (deployment_type === 'nc') {
const health = new NSFSHealth({ https_port,
all_account_details, all_bucket_details, all_connection_details, notif_storage_threshold, lifecycle, config_fs });
const health = new NSFSHealth({ https_port, fork_base_port,
all_account_details, all_bucket_details, all_connection_details,
notif_storage_threshold, lifecycle, config_fs });
const health_status = await health.nc_nsfs_health();
write_stdout_response(ManageCLIResponse.HealthStatus, health_status);
} else {
Expand Down
Loading