Skip to content

Commit 9dece0e

Browse files
authored
Bucket Notifications (#8337)
Bucket Notifications - phase 1 Signed-off-by: Amit Prinz Setter <alphaprinz@gmail.com>
1 parent 9ad101e commit 9dece0e

29 files changed

+821
-39
lines changed

config.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,12 @@ config.PERSISTENT_BUCKET_LOG_DIR = process.env.GUARANTEED_LOGS_PATH;
674674
config.PERSISTENT_BUCKET_LOG_NS = 'bucket_logging';
675675
config.BUCKET_LOG_CONCURRENCY = 10;
676676

677+
////////////////////////////////
678+
// NOTIFICATIONS //
679+
////////////////////////////////
680+
config.NOTIFICATION_LOG_NS = 'notification_logging';
681+
config.NOTIFICATION_LOG_DIR = process.env.NOTIFICATION_LOG_DIR;
682+
677683
///////////////////////////
678684
// KEY ROTATOR //
679685
///////////////////////////

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
"nan": "2.20.0",
109109
"ncp": "2.0.0",
110110
"node-addon-api": "8.1.0",
111+
"node-rdkafka": "3.0.1",
111112
"performance-now": "2.1.0",
112113
"pg": "8.13.0",
113114
"ping": "0.4.4",

src/api/bucket_api.js

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,59 @@ module.exports = {
300300
}
301301
},
302302

303+
get_bucket_notification: {
304+
method: 'GET',
305+
params: {
306+
type: 'object',
307+
required: [
308+
'name'
309+
],
310+
properties: {
311+
name: { $ref: 'common_api#/definitions/bucket_name' },
312+
}
313+
},
314+
reply: {
315+
type: 'object',
316+
required: [
317+
'notifications'
318+
],
319+
properties: {
320+
notifications: {
321+
type: 'array',
322+
items: {
323+
$ref: 'common_api#/definitions/bucket_notification'
324+
}
325+
}
326+
}
327+
},
328+
auth: {
329+
system: ['admin', 'user']
330+
}
331+
},
332+
333+
put_bucket_notification: {
334+
method: 'PUT',
335+
params: {
336+
type: 'object',
337+
required: [
338+
'notifications',
339+
'name',
340+
],
341+
properties: {
342+
name: { $ref: 'common_api#/definitions/bucket_name' },
343+
notifications: {
344+
type: 'array',
345+
items: {
346+
$ref: 'common_api#/definitions/bucket_notification'
347+
}
348+
}
349+
}
350+
},
351+
auth: {
352+
system: ['admin', 'user'],
353+
}
354+
},
355+
303356
read_bucket_sdk_info: {
304357
method: 'GET',
305358
params: {
@@ -1130,6 +1183,12 @@ module.exports = {
11301183
bucket_info: {
11311184
$ref: '#/definitions/bucket_info'
11321185
},
1186+
notifications: {
1187+
type: 'array',
1188+
items: {
1189+
$ref: 'common_api#/definitions/bucket_notification'
1190+
}
1191+
}
11331192
}
11341193
},
11351194

src/api/common_api.js

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1377,6 +1377,46 @@ module.exports = {
13771377
type: 'string',
13781378
},
13791379
}
1380+
},
1381+
bucket_notification: {
1382+
type: 'object',
1383+
required: ['Id', 'Connect'],
1384+
properties: {
1385+
Id: {
1386+
type: 'string'
1387+
},
1388+
Connect: {
1389+
type: 'string'
1390+
},
1391+
Events: {
1392+
type: 'array',
1393+
items: {
1394+
type: 'string',
1395+
enum: [
1396+
's3:TestEvent',
1397+
's3:ObjectCreated:*',
1398+
's3:ObjectCreated:Put',
1399+
's3:ObjectCreated:Post',
1400+
's3:ObjectCreated:Copy',
1401+
's3:ObjectCreated:CompleteMultipartUpload',
1402+
's3:ObjectRemoved:*',
1403+
's3:ObjectRemoved:Delete',
1404+
's3:ObjectRemoved:DeleteMarkerCreated',
1405+
's3:ObjectRestore:*',
1406+
's3:ObjectRestore:Post',
1407+
's3:ObjectRestore:Completed',
1408+
's3:ObjectRestore:Delete',
1409+
's3:ObjectTagging:*',
1410+
's3:ObjectTagging:Put',
1411+
's3:ObjectTagging:Delete',
1412+
/*We plan to support LifecycleExpiration
1413+
's3:LifecycleExpiration:*',
1414+
's3:LifecycleExpiration:Delete',
1415+
's3:LifecycleExpiration:DeleteMarkerCreated',*/
1416+
],
1417+
}
1418+
}
1419+
}
13801420
}
13811421
}
13821422
};

src/api/object_api.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ module.exports = {
176176
content_type: { type: 'string' },
177177
content_encoding: { type: 'string' },
178178
size: { type: 'integer' },
179+
seq: { type: 'integer' },
179180
}
180181
},
181182
auth: { system: ['admin', 'user'] }
@@ -653,6 +654,7 @@ module.exports = {
653654
deleted_delete_marker: { type: 'boolean' },
654655
created_version_id: { type: 'string' },
655656
created_delete_marker: { type: 'boolean' },
657+
seq: { type: 'integer' },
656658
}
657659
},
658660
auth: { system: ['admin', 'user'] }
@@ -690,6 +692,7 @@ module.exports = {
690692
deleted_delete_marker: { type: 'boolean' },
691693
created_version_id: { type: 'string' },
692694
created_delete_marker: { type: 'boolean' },
695+
seq: { type: 'integer' },
693696
err_code: {
694697
type: 'string',
695698
enum: ['AccessDenied', 'InternalError']

src/cmd/manage_nsfs.js

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ const { throw_cli_error, get_bucket_owner_account_by_name,
2727
is_name_update, is_access_key_update } = require('../manage_nsfs/manage_nsfs_cli_utils');
2828
const manage_nsfs_validations = require('../manage_nsfs/manage_nsfs_validations');
2929
const nc_mkm = require('../manage_nsfs/nc_master_key_manager').get_instance();
30+
const notifications_util = require('../util/notifications_util');
3031

3132
let config_fs;
3233

@@ -70,6 +71,8 @@ async function main(argv = minimist(process.argv.slice(2))) {
7071
await noobaa_cli_diagnose.manage_diagnose_operations(action, user_input, config_fs);
7172
} else if (type === TYPES.UPGRADE) {
7273
await noobaa_cli_upgrade.manage_upgrade_operations(action, user_input, config_fs);
74+
} else if (type === TYPES.NOTIFICATION) {
75+
await notification_management();
7376
} else {
7477
throw_cli_error(ManageCLIError.InvalidType);
7578
}
@@ -100,8 +103,9 @@ async function fetch_bucket_data(action, user_input) {
100103
should_create_underlying_storage: action === ACTIONS.ADD ? false : undefined,
101104
new_name: user_input.new_name === undefined ? undefined : String(user_input.new_name),
102105
fs_backend: user_input.fs_backend === undefined ? config.NSFS_NC_STORAGE_BACKEND : String(user_input.fs_backend),
103-
force_md5_etag: user_input.force_md5_etag === undefined || user_input.force_md5_etag === '' ? user_input.force_md5_etag : get_boolean_or_string_value(user_input.force_md5_etag)
104-
};
106+
force_md5_etag: user_input.force_md5_etag === undefined || user_input.force_md5_etag === '' ? user_input.force_md5_etag : get_boolean_or_string_value(user_input.force_md5_etag),
107+
notifications: user_input.notifications
108+
};
105109

106110
if (user_input.bucket_policy !== undefined) {
107111
if (typeof user_input.bucket_policy === 'string') {
@@ -196,14 +200,22 @@ async function get_bucket_status(data) {
196200
* @param {Object} data
197201
* @returns { Promise<{ code: typeof ManageCLIResponse.BucketUpdated, detail: Object }>}
198202
*/
199-
async function update_bucket(data) {
203+
async function update_bucket(data, user_input) {
200204
const cur_name = data.name;
201205
const new_name = data.new_name;
202206
const name_update = is_name_update(data);
203207
const cli_bucket_flags_to_remove = ['new_name'];
204208
data = _.omit(data, cli_bucket_flags_to_remove);
205209

206210
let parsed_bucket_data;
211+
212+
if (user_input.notifications) {
213+
//notifications are tested before they can be updated
214+
const test_notif_err = await notifications_util.test_notifications(data);
215+
if (test_notif_err) {
216+
throw_cli_error(ManageCLIError.InvalidArgument, "Failed to update notifications", test_notif_err);
217+
}
218+
}
207219
if (name_update) {
208220
parsed_bucket_data = await config_fs.create_bucket_config_file({ ...data, name: new_name });
209221
await config_fs.delete_bucket_config_file(cur_name);
@@ -271,7 +283,7 @@ async function bucket_management(action, user_input) {
271283
} else if (action === ACTIONS.STATUS) {
272284
response = await get_bucket_status(data);
273285
} else if (action === ACTIONS.UPDATE) {
274-
response = await update_bucket(data);
286+
response = await update_bucket(data, user_input);
275287
} else if (action === ACTIONS.DELETE) {
276288
const force = get_boolean_or_string_value(user_input.force);
277289
response = await delete_bucket(data, force);
@@ -706,5 +718,9 @@ async function logging_management() {
706718
await manage_nsfs_logging.export_bucket_logging(config_fs);
707719
}
708720

721+
async function notification_management() {
722+
new notifications_util.Notificator({fs_context: config_fs.fs_context}).process_notification_files();
723+
}
724+
709725
exports.main = main;
710726
if (require.main === module) main();

src/cmd/nsfs.js

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ function print_usage() {
121121
let nsfs_config_root;
122122

123123
class NsfsObjectSDK extends ObjectSDK {
124-
constructor(fs_root, fs_config, account, versioning, config_root) {
124+
constructor(fs_root, fs_config, account, versioning, config_root, nsfs_system) {
125125
// const rpc_client_hooks = new_rpc_client_hooks();
126126
// rpc_client_hooks.account.read_account_by_access_key = async ({ access_key }) => {
127127
// if (access_key) {
@@ -152,6 +152,7 @@ class NsfsObjectSDK extends ObjectSDK {
152152
this.nsfs_account = account;
153153
this.nsfs_versioning = versioning;
154154
this.nsfs_namespaces = {};
155+
this.nsfs_system = nsfs_system;
155156
if (!config_root) {
156157
this._get_bucket_namespace = bucket_name => this._simple_get_single_bucket_namespace(bucket_name);
157158
this.load_requesting_account = auth_req => this._simple_load_requesting_account(auth_req);
@@ -239,6 +240,7 @@ class NsfsAccountSDK extends AccountSDK {
239240
}
240241
}
241242

243+
/* eslint-disable max-statements */
242244
async function main(argv = minimist(process.argv.slice(2))) {
243245
try {
244246
config.DB_TYPE = 'none';
@@ -318,15 +320,16 @@ async function main(argv = minimist(process.argv.slice(2))) {
318320
nsfs_config_root,
319321
});
320322

323+
let system_data;
321324
if (!simple_mode) {
322325
// Do not move this function - we need to create/update RPM changes before starting the endpoint
323326
const config_fs = new ConfigFS(nsfs_config_root);
324-
const system_data = await config_fs.get_system_config_file({ silent_if_missing: true });
327+
system_data = await config_fs.get_system_config_file({ silent_if_missing: true });
325328
if (system_data && system_data[os.hostname()]) {
326329
const nc_upgrade_manager = new NCUpgradeManager(config_fs);
327330
await nc_upgrade_manager.update_rpm_upgrade();
328331
} else {
329-
await config_fs.init_nc_system();
332+
system_data = await config_fs.init_nc_system();
330333
}
331334
}
332335

@@ -340,7 +343,7 @@ async function main(argv = minimist(process.argv.slice(2))) {
340343
forks,
341344
nsfs_config_root,
342345
init_request_sdk: (req, res) => {
343-
req.object_sdk = new NsfsObjectSDK(fs_root, fs_config, account, versioning, nsfs_config_root);
346+
req.object_sdk = new NsfsObjectSDK(fs_root, fs_config, account, versioning, nsfs_config_root, system_data);
344347
req.account_sdk = new NsfsAccountSDK(fs_root, fs_config, account, nsfs_config_root);
345348
}
346349
});

src/endpoint/endpoint.js

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ dbg.log0('endpoint: replacing old umask: ', old_umask.toString(8), 'with new uma
6565
* sts_sdk?: StsSDK;
6666
* virtual_hosts?: readonly string[];
6767
* bucket_logger?: PersistentLogger;
68+
* notification_logger?: PersistentLogger;
6869
* }} EndpointRequest
6970
*/
7071

@@ -100,6 +101,7 @@ async function create_https_server(ssl_cert_info, honorCipherOrder, endpoint_han
100101
/* eslint-disable max-statements */
101102
async function main(options = {}) {
102103
let bucket_logger;
104+
let notification_logger;
103105
try {
104106
// setting process title needed for letting GPFS to identify the noobaa endpoint processes see issue #8039.
105107
if (config.ENDPOINT_PROCESS_TITLE) {
@@ -137,6 +139,12 @@ async function main(options = {}) {
137139
poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL,
138140
});
139141

142+
notification_logger = config.NOTIFICATION_LOG_DIR &&
143+
new PersistentLogger(config.NOTIFICATION_LOG_DIR, node_name + '_' + config.NOTIFICATION_LOG_NS, {
144+
locking: 'SHARED',
145+
poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL,
146+
});
147+
140148
process.on('warning', e => dbg.warn(e.stack));
141149

142150
let internal_rpc_client;
@@ -174,7 +182,8 @@ async function main(options = {}) {
174182
init_request_sdk = create_init_request_sdk(rpc, internal_rpc_client, object_io);
175183
}
176184

177-
const endpoint_request_handler = create_endpoint_handler(init_request_sdk, virtual_hosts, /*is_sts?*/ false, bucket_logger);
185+
const endpoint_request_handler = create_endpoint_handler(init_request_sdk, virtual_hosts, /*is_sts?*/ false,
186+
bucket_logger, notification_logger);
178187
const endpoint_request_handler_sts = create_endpoint_handler(init_request_sdk, virtual_hosts, /*is_sts?*/ true);
179188

180189
const ssl_cert_info = await ssl_utils.get_ssl_cert_info('S3', options.nsfs_config_root);
@@ -266,7 +275,7 @@ async function main(options = {}) {
266275
* @param {readonly string[]} virtual_hosts
267276
* @returns {EndpointHandler}
268277
*/
269-
function create_endpoint_handler(init_request_sdk, virtual_hosts, sts, logger) {
278+
function create_endpoint_handler(init_request_sdk, virtual_hosts, sts, logger, notification_logger) {
270279
const blob_rest_handler = process.env.ENDPOINT_BLOB_ENABLED === 'true' ? blob_rest : unavailable_handler;
271280
const lambda_rest_handler = config.DB_TYPE === 'mongodb' ? lambda_rest : unavailable_handler;
272281

@@ -276,6 +285,7 @@ function create_endpoint_handler(init_request_sdk, virtual_hosts, sts, logger) {
276285
endpoint_utils.prepare_rest_request(req);
277286
req.virtual_hosts = virtual_hosts;
278287
if (logger) req.bucket_logger = logger;
288+
if (notification_logger) req.notification_logger = notification_logger;
279289
init_request_sdk(req, res);
280290
if (req.url.startsWith('/2015-03-31/functions')) {
281291
return lambda_rest_handler(req, res);

src/endpoint/s3/ops/s3_delete_object.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ async function delete_object(req, res) {
2626
} else if (del_res.created_delete_marker) {
2727
res.setHeader('x-amz-version-id', del_res.created_version_id);
2828
res.setHeader('x-amz-delete-marker', 'true');
29+
req.s3_event_method = 'DeleteMarkerCreated';
2930
}
31+
res.seq = del_res.seq;
3032
}
3133

3234
module.exports = {

src/endpoint/s3/ops/s3_get_bucket_notification.js

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,23 @@
55
* http://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETnotification.html
66
*/
77
async function get_bucket_notification(req) {
8-
await req.object_sdk.read_bucket({ name: req.params.bucket });
9-
return {
10-
NotificationConfiguration: ''
11-
};
8+
9+
const result = await req.object_sdk.get_bucket_notification({
10+
bucket_name: req.params.bucket,
11+
});
12+
13+
14+
const reply = result && result.length > 0 ?
15+
{
16+
//return result inside TopicConfiguration tag
17+
NotificationConfiguration: {
18+
TopicConfiguration: result
19+
}
20+
} :
21+
//if there's no notification, reuturn empty NotificationConfiguration tag
22+
{ NotificationConfiguration: {} };
23+
24+
return reply;
1225
}
1326

1427
module.exports = {

0 commit comments

Comments
 (0)