Skip to content

open_ipc_handle should use shm from handle #779

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/ipc_ipcapi/ipc_ipcapi_shm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ UMF_LOG_VAL="level:debug;flush:debug;output:stderr;pid:yes"
rm -f /dev/shm/${SHM_NAME}

echo "Starting ipc_ipcapi_shm CONSUMER on port $PORT ..."
UMF_LOG=$UMF_LOG_VAL ./umf_example_ipc_ipcapi_consumer $PORT $SHM_NAME &
UMF_LOG=$UMF_LOG_VAL ./umf_example_ipc_ipcapi_consumer $PORT &

echo "Waiting 1 sec ..."
sleep 1
Expand Down
37 changes: 20 additions & 17 deletions src/provider/provider_os_memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -1146,7 +1146,10 @@ typedef struct os_ipc_data_t {
int fd;
size_t fd_offset;
size_t size;
char shm_name[NAME_MAX]; // optional - can be used or not (see below)
// shm_name is a Flexible Array Member because it is optional and its size
// varies on the Shared Memory object name
size_t shm_name_len;
char shm_name[];
} os_ipc_data_t;

static umf_result_t os_get_ipc_handle_size(void *provider, size_t *size) {
Expand All @@ -1160,13 +1163,8 @@ static umf_result_t os_get_ipc_handle_size(void *provider, size_t *size) {
return UMF_RESULT_ERROR_INVALID_ARGUMENT;
}

if (os_provider->shm_name[0]) {
// os_ipc_data_t->shm_name will be used
*size = sizeof(os_ipc_data_t);
} else {
// os_ipc_data_t->shm_name will NOT be used
*size = sizeof(os_ipc_data_t) - NAME_MAX;
}
// NOTE: +1 for '\0' at the end of the string
*size = sizeof(os_ipc_data_t) + strlen(os_provider->shm_name) + 1;

return UMF_RESULT_SUCCESS;
}
Expand Down Expand Up @@ -1195,9 +1193,10 @@ static umf_result_t os_get_ipc_handle(void *provider, const void *ptr,
os_ipc_data->pid = utils_getpid();
os_ipc_data->fd_offset = (size_t)value - 1;
os_ipc_data->size = size;
if (os_provider->shm_name[0]) {
strncpy(os_ipc_data->shm_name, os_provider->shm_name, NAME_MAX - 1);
os_ipc_data->shm_name[NAME_MAX - 1] = '\0';
os_ipc_data->shm_name_len = strlen(os_provider->shm_name);
if (os_ipc_data->shm_name_len > 0) {
strncpy(os_ipc_data->shm_name, os_provider->shm_name,
os_ipc_data->shm_name_len);
} else {
os_ipc_data->fd = os_provider->fd;
}
Expand All @@ -1222,8 +1221,12 @@ static umf_result_t os_put_ipc_handle(void *provider, void *providerIpcData) {
return UMF_RESULT_ERROR_INVALID_ARGUMENT;
}

if (os_provider->shm_name[0]) {
if (strncmp(os_ipc_data->shm_name, os_provider->shm_name, NAME_MAX)) {
size_t shm_name_len = strlen(os_provider->shm_name);
if (shm_name_len > 0) {
if (os_ipc_data->shm_name_len != shm_name_len) {
return UMF_RESULT_ERROR_INVALID_ARGUMENT;
} else if (strncmp(os_ipc_data->shm_name, os_provider->shm_name,
shm_name_len)) {
return UMF_RESULT_ERROR_INVALID_ARGUMENT;
}
} else {
Expand Down Expand Up @@ -1251,14 +1254,14 @@ static umf_result_t os_open_ipc_handle(void *provider, void *providerIpcData,
umf_result_t ret = UMF_RESULT_SUCCESS;
int fd;

if (os_provider->shm_name[0]) {
fd = utils_shm_open(os_provider->shm_name);
if (os_ipc_data->shm_name_len) {
fd = utils_shm_open(os_ipc_data->shm_name);
if (fd <= 0) {
LOG_PERR("opening a shared memory file (%s) failed",
os_provider->shm_name);
os_ipc_data->shm_name);
return UMF_RESULT_ERROR_UNKNOWN;
}
(void)utils_shm_unlink(os_provider->shm_name);
(void)utils_shm_unlink(os_ipc_data->shm_name);
} else {
umf_result_t umf_result =
utils_duplicate_fd(os_ipc_data->pid, os_ipc_data->fd, &fd);
Expand Down
75 changes: 62 additions & 13 deletions test/common/ipc_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include "ipc_common.h"

#define INET_ADDR "127.0.0.1"
#define MSG_SIZE 1024
#define MSG_SIZE 1024 * 8

// consumer's response message
#define CONSUMER_MSG \
Expand All @@ -33,6 +33,10 @@ Generally communication between the producer and the consumer looks like:
- Producer creates a socket
- Producer connects to the consumer
- Consumer connects at IP 127.0.0.1 and a port to the producer
- Producer sends the IPC handle size to the consumer
- Consumer receives the IPC handle size from the producer
- Consumer sends the confirmation (IPC handle size) to the producer
- Producer receives the confirmation (IPC handle size) from the consumer
- Producer sends the IPC handle to the consumer
- Consumer receives the IPC handle from the producer
- Consumer opens the IPC handle received from the producer
Expand Down Expand Up @@ -127,29 +131,36 @@ int run_consumer(int port, umf_memory_provider_ops_t *provider_ops,
return -1;
}

// get the size of the IPC handle
size_t IPC_handle_size;
umf_result = umfMemoryProviderGetIPCHandleSize(provider, &IPC_handle_size);
if (umf_result != UMF_RESULT_SUCCESS) {
fprintf(stderr,
"[consumer] ERROR: getting size of the IPC handle failed\n");
producer_socket = consumer_connect(port);
if (producer_socket < 0) {
goto err_umfMemoryProviderDestroy;
}

// allocate the zeroed receive buffer
char *recv_buffer = calloc(1, IPC_handle_size);
char *recv_buffer = calloc(1, MSG_SIZE);
if (!recv_buffer) {
fprintf(stderr, "[consumer] ERROR: out of memory\n");
goto err_umfMemoryProviderDestroy;
}

producer_socket = consumer_connect(port);
if (producer_socket < 0) {
goto err_umfMemoryProviderDestroy;
// get the size of the IPC handle from the producer
size_t IPC_handle_size;
ssize_t recv_len = recv(producer_socket, recv_buffer, MSG_SIZE, 0);
if (recv_len < 0) {
fprintf(stderr, "[consumer] ERROR: recv() failed\n");
goto err_close_producer_socket;
}
IPC_handle_size = *(size_t *)recv_buffer;
fprintf(stderr, "[consumer] Got the size of the IPC handle: %zu\n",
IPC_handle_size);

// send confirmation to the producer (IPC handle size)
send(producer_socket, &IPC_handle_size, sizeof(IPC_handle_size), 0);
fprintf(stderr,
"[consumer] Send the confirmation (IPC handle size) to producer\n");

// receive a producer's message
ssize_t recv_len = recv(producer_socket, recv_buffer, IPC_handle_size, 0);
// receive IPC handle from the producer
recv_len = recv(producer_socket, recv_buffer, MSG_SIZE, 0);
if (recv_len < 0) {
fprintf(stderr, "[consumer] ERROR: recv() failed\n");
goto err_close_producer_socket;
Expand Down Expand Up @@ -388,6 +399,44 @@ int run_producer(int port, umf_memory_provider_ops_t *provider_ops,
goto err_PutIPCHandle;
}

// send the IPC_handle_size to the consumer
ssize_t len =
send(producer_socket, &IPC_handle_size, sizeof(IPC_handle_size), 0);
if (len < 0) {
fprintf(stderr, "[producer] ERROR: unable to send the message\n");
goto err_close_producer_socket;
}

fprintf(stderr,
"[producer] Sent the size of the IPC handle (%zu) to the consumer "
"(sent %zu bytes)\n",
IPC_handle_size, len);

// zero the consumer_message buffer
memset(consumer_message, 0, sizeof(consumer_message));

// receive the consumer's confirmation - IPC handle size
len = recv(producer_socket, consumer_message, sizeof(consumer_message), 0);
if (len < 0) {
fprintf(stderr, "[producer] ERROR: error while receiving the "
"confirmation from the consumer\n");
goto err_close_producer_socket;
}

size_t conf_IPC_handle_size = *(size_t *)consumer_message;
if (conf_IPC_handle_size == IPC_handle_size) {
fprintf(stderr,
"[producer] Received the correct confirmation (%zu) from the "
"consumer (%zu bytes)\n",
conf_IPC_handle_size, len);
} else {
fprintf(stderr,
"[producer] Received an INCORRECT confirmation (%zu) from the "
"consumer (%zu bytes)\n",
conf_IPC_handle_size, len);
goto err_close_producer_socket;
}

// send the IPC_handle of IPC_handle_size to the consumer
if (send(producer_socket, IPC_handle, IPC_handle_size, 0) < 0) {
fprintf(stderr, "[producer] ERROR: unable to send the message\n");
Expand Down
2 changes: 1 addition & 1 deletion test/ipc_os_prov_shm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ UMF_LOG_VAL="level:debug;flush:debug;output:stderr;pid:yes"
rm -f /dev/shm/${SHM_NAME}

echo "Starting ipc_os_prov_shm CONSUMER on port $PORT ..."
UMF_LOG=$UMF_LOG_VAL ./umf_test-ipc_os_prov_consumer $PORT $SHM_NAME &
UMF_LOG=$UMF_LOG_VAL ./umf_test-ipc_os_prov_consumer $PORT &

echo "Waiting 1 sec ..."
sleep 1
Expand Down
Loading