Skip to content

Commit 51b272c

Browse files
committed
Add an IPC shared memory test
Add an IPC shared memory test using only the OS memory provider API (not using the API from ipc.h). Signed-off-by: Lukasz Dorau <lukasz.dorau@intel.com>
1 parent 927a489 commit 51b272c

File tree

5 files changed

+622
-0
lines changed

5 files changed

+622
-0
lines changed

test/CMakeLists.txt

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,3 +240,35 @@ endif()
240240
if(UMF_ENABLE_POOL_TRACKING)
241241
add_umf_test(NAME ipc SRCS ipcAPI.cpp)
242242
endif()
243+
244+
if(LINUX)
245+
set(BASE_NAME ipc_os_prov)
246+
set(TEST_NAME umf-${BASE_NAME})
247+
248+
foreach(loop_var IN ITEMS "producer" "consumer")
249+
set(EXEC_NAME umf_test-${BASE_NAME}_${loop_var})
250+
add_umf_executable(
251+
NAME ${EXEC_NAME}
252+
SRCS ${BASE_NAME}_${loop_var}.c
253+
LIBS umf)
254+
255+
target_include_directories(
256+
${EXEC_NAME} PRIVATE ${UMF_CMAKE_SOURCE_DIR}/src/utils
257+
${UMF_CMAKE_SOURCE_DIR}/include)
258+
259+
target_link_directories(${EXEC_NAME} PRIVATE ${LIBHWLOC_LIBRARY_DIRS})
260+
endforeach(loop_var)
261+
262+
file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/${BASE_NAME}.sh
263+
DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
264+
265+
add_test(
266+
NAME ${TEST_NAME}
267+
COMMAND ${BASE_NAME}.sh
268+
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
269+
270+
set_tests_properties(${TEST_NAME} PROPERTIES LABELS "umf")
271+
else()
272+
message(
273+
STATUS "IPC shared memory test is supported on Linux only - skipping")
274+
endif()

test/ipc_os_prov.sh

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
#
2+
# Copyright (C) 2024 Intel Corporation
3+
#
4+
# Under the Apache License v2.0 with LLVM Exceptions. See LICENSE.TXT.
5+
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6+
#
7+
8+
#!/bin/bash
9+
10+
# port should be a number from the range <1024, 65535>
11+
PORT=$(( 1024 + ( $$ % ( 65535 - 1024 ))))
12+
13+
# The ipc_os_prov example requires using pidfd_getfd(2)
14+
# to obtain a duplicate of another process's file descriptor.
15+
# Permission to duplicate another process's file descriptor
16+
# is governed by a ptrace access mode PTRACE_MODE_ATTACH_REALCREDS check (see ptrace(2))
17+
# that can be changed using the /proc/sys/kernel/yama/ptrace_scope interface.
18+
PTRACE_SCOPE_FILE="/proc/sys/kernel/yama/ptrace_scope"
19+
VAL=0
20+
if [ -f $PTRACE_SCOPE_FILE ]; then
21+
PTRACE_SCOPE_VAL=$(cat $PTRACE_SCOPE_FILE)
22+
if [ $PTRACE_SCOPE_VAL -ne $VAL ]; then
23+
echo "Setting ptrace_scope to 0 (classic ptrace permissions) ..."
24+
echo "$ sudo bash -c \"echo $VAL > $PTRACE_SCOPE_FILE\""
25+
sudo bash -c "echo $VAL > $PTRACE_SCOPE_FILE"
26+
fi
27+
PTRACE_SCOPE_VAL=$(cat $PTRACE_SCOPE_FILE)
28+
if [ $PTRACE_SCOPE_VAL -ne $VAL ]; then
29+
echo "SKIP: setting ptrace_scope to 0 (classic ptrace permissions) FAILED - skipping the test"
30+
exit 0
31+
fi
32+
fi
33+
34+
UMF_LOG_VAL="level:debug;flush:debug;output:stderr;pid:yes"
35+
36+
echo "Starting ipc_os_prov CONSUMER on port $PORT ..."
37+
UMF_LOG=$UMF_LOG_VAL ./umf_test-ipc_os_prov_consumer $PORT &
38+
39+
echo "Waiting 1 sec ..."
40+
sleep 1
41+
42+
echo "Starting ipc_os_prov PRODUCER on port $PORT ..."
43+
UMF_LOG=$UMF_LOG_VAL ./umf_test-ipc_os_prov_producer $PORT

test/ipc_os_prov_consumer.c

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
/*
2+
* Copyright (C) 2024 Intel Corporation
3+
*
4+
* Under the Apache License v2.0 with LLVM Exceptions. See LICENSE.TXT.
5+
* SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6+
*/
7+
8+
#include <arpa/inet.h>
9+
#include <stdio.h>
10+
#include <stdlib.h>
11+
#include <string.h>
12+
#include <sys/socket.h>
13+
#include <unistd.h>
14+
15+
#include <umf/providers/provider_os_memory.h>
16+
17+
#define INET_ADDR "127.0.0.1"
18+
#define MSG_SIZE 256
19+
#define RECV_BUFF_SIZE 1024
20+
21+
// consumer's response message
22+
#define CONSUMER_MSG \
23+
"This is the consumer. I just wrote a new number directly into your " \
24+
"shared memory!"
25+
26+
/*
27+
Generally communication between the producer and the consumer looks like:
28+
- Consumer starts
29+
- Consumer creates a socket
30+
- Consumer listens for incoming connections
31+
- Producer starts
32+
- Producer's shared memory contains a number: 140722582213392
33+
- Producer gets the IPC handle
34+
- Producer creates a socket
35+
- Producer connects to the consumer
36+
- Consumer connects at IP 127.0.0.1 and port 47770 to the producer
37+
- Producer sents the IPC handle to the consumer (24 bytes)
38+
- Consumer receives the IPC handle from the producer (24 bytes)
39+
- Consumer opens the IPC handle received from the producer
40+
- Consumer reads the number from the producer's shared memory: 140722582213392
41+
- Consumer writes a new number directly to the producer's shared memory: 70361291106696
42+
- Consumer sents a response message to the producer
43+
- Consumer closes the IPC handle received from the producer
44+
- Producer receives the response from the consumer: "This is the consumer. I just wrote a new number directly into your shared memory!"
45+
- Producer verifies the consumer wrote the correct value (the old one / 2) to the producer's shared memory: 70361291106696
46+
- Producer puts the IPC handle
47+
- Consumer shuts down
48+
- Producer shuts down
49+
*/
50+
51+
int consumer_connect(int port) {
52+
struct sockaddr_in consumer_addr;
53+
struct sockaddr_in producer_addr;
54+
int producer_addr_len;
55+
int producer_socket = -1;
56+
int consumer_socket = -1;
57+
int ret = -1;
58+
59+
// create a socket
60+
consumer_socket = socket(AF_INET, SOCK_STREAM, 0);
61+
if (consumer_socket < 0) {
62+
fprintf(stderr, "[consumer] ERROR: creating socket failed\n");
63+
return -1;
64+
}
65+
66+
fprintf(stderr, "[consumer] Socket created\n");
67+
68+
// set the IP address and the port
69+
consumer_addr.sin_family = AF_INET;
70+
consumer_addr.sin_port = htons(port);
71+
consumer_addr.sin_addr.s_addr = inet_addr(INET_ADDR);
72+
73+
// bind to the IP address and the port
74+
if (bind(consumer_socket, (struct sockaddr *)&consumer_addr,
75+
sizeof(consumer_addr)) < 0) {
76+
fprintf(stderr, "[consumer] ERROR: cannot bind to the port\n");
77+
goto err_close_consumer_socket;
78+
}
79+
80+
fprintf(stderr, "[consumer] Binding done\n");
81+
82+
// listen for the producer
83+
if (listen(consumer_socket, 1) < 0) {
84+
fprintf(stderr, "[consumer] ERROR: listen() failed\n");
85+
goto err_close_consumer_socket;
86+
}
87+
88+
fprintf(stderr, "[consumer] Listening for incoming connections ...\n");
89+
90+
// accept an incoming connection
91+
producer_addr_len = sizeof(producer_addr);
92+
producer_socket = accept(consumer_socket, (struct sockaddr *)&producer_addr,
93+
(socklen_t *)&producer_addr_len);
94+
if (producer_socket < 0) {
95+
fprintf(stderr, "[consumer] ERROR: accept() failed\n");
96+
goto err_close_consumer_socket;
97+
}
98+
99+
fprintf(stderr, "[consumer] Producer connected at IP %s and port %i\n",
100+
inet_ntoa(producer_addr.sin_addr), ntohs(producer_addr.sin_port));
101+
102+
ret = producer_socket; // success
103+
104+
err_close_consumer_socket:
105+
close(consumer_socket);
106+
107+
return ret;
108+
}
109+
110+
int main(int argc, char *argv[]) {
111+
char consumer_message[MSG_SIZE];
112+
char recv_buffer[RECV_BUFF_SIZE];
113+
int producer_socket = -1;
114+
int ret = -1;
115+
116+
if (argc < 2) {
117+
fprintf(stderr, "usage: %s port\n", argv[0]);
118+
return -1;
119+
}
120+
121+
int port = atoi(argv[1]);
122+
123+
// zero the consumer_message buffer
124+
memset(consumer_message, 0, sizeof(consumer_message));
125+
126+
umf_memory_provider_handle_t OS_memory_provider = NULL;
127+
umf_os_memory_provider_params_t os_params;
128+
enum umf_result_t umf_result;
129+
130+
os_params = umfOsMemoryProviderParamsDefault();
131+
os_params.visibility = UMF_MEM_MAP_SHARED;
132+
133+
// create OS memory provider
134+
umf_result = umfMemoryProviderCreate(umfOsMemoryProviderOps(), &os_params,
135+
&OS_memory_provider);
136+
if (umf_result != UMF_RESULT_SUCCESS) {
137+
fprintf(stderr,
138+
"[consumer] ERROR: creating OS memory provider failed\n");
139+
return -1;
140+
}
141+
142+
// get the size of the IPC handle
143+
size_t IPC_handle_size;
144+
umf_result =
145+
umfMemoryProviderGetIPCHandleSize(OS_memory_provider, &IPC_handle_size);
146+
if (umf_result != UMF_RESULT_SUCCESS) {
147+
fprintf(stderr,
148+
"[consumer] ERROR: getting size of the IPC handle failed\n");
149+
goto err_umfMemoryProviderDestroy;
150+
}
151+
152+
producer_socket = consumer_connect(port);
153+
if (producer_socket < 0) {
154+
goto err_umfMemoryProviderDestroy;
155+
}
156+
157+
// zero the receive buffer
158+
memset(recv_buffer, 0, RECV_BUFF_SIZE);
159+
160+
// receive a producer's message
161+
ssize_t len = recv(producer_socket, recv_buffer, RECV_BUFF_SIZE, 0);
162+
if (len < 0) {
163+
fprintf(stderr, "[consumer] ERROR: recv() failed\n");
164+
goto err_close_producer_socket;
165+
}
166+
if (len != IPC_handle_size) {
167+
fprintf(stderr,
168+
"[consumer] ERROR: recv() received a wrong number of bytes "
169+
"(%zi != %zu expected)\n",
170+
len, IPC_handle_size);
171+
goto err_close_producer_socket;
172+
}
173+
174+
void *IPC_handle = recv_buffer;
175+
176+
fprintf(
177+
stderr,
178+
"[consumer] Received the IPC handle from the producer (%zi bytes)\n",
179+
len);
180+
181+
void *SHM_ptr;
182+
umf_result = umfMemoryProviderOpenIPCHandle(OS_memory_provider, IPC_handle,
183+
&SHM_ptr);
184+
if (umf_result == UMF_RESULT_ERROR_NOT_SUPPORTED) {
185+
fprintf(stderr,
186+
"[consumer] SKIP: opening the IPC handle is not supported\n");
187+
ret = 1; // SKIP
188+
189+
// write the SKIP response to the consumer_message buffer
190+
strcpy(consumer_message, "SKIP");
191+
192+
// send the SKIP response to the producer
193+
send(producer_socket, consumer_message, strlen(consumer_message) + 1,
194+
0);
195+
196+
goto err_close_producer_socket;
197+
}
198+
if (umf_result != UMF_RESULT_SUCCESS) {
199+
fprintf(stderr, "[consumer] ERROR: opening the IPC handle failed\n");
200+
goto err_close_producer_socket;
201+
}
202+
203+
fprintf(stderr,
204+
"[consumer] Opened the IPC handle received from the producer\n");
205+
206+
// read the current value from the shared memory
207+
unsigned long long SHM_number_1 = *(unsigned long long *)SHM_ptr;
208+
fprintf(
209+
stderr,
210+
"[consumer] Read the number from the producer's shared memory: %llu\n",
211+
SHM_number_1);
212+
213+
// calculate the new value
214+
unsigned long long SHM_number_2 = SHM_number_1 / 2;
215+
216+
// write the new number directly to the producer's shared memory
217+
*(unsigned long long *)SHM_ptr = SHM_number_2;
218+
fprintf(stderr,
219+
"[consumer] Wrote a new number directly to the producer's shared "
220+
"memory: %llu\n",
221+
SHM_number_2);
222+
223+
// write the response to the consumer_message buffer
224+
strcpy(consumer_message, CONSUMER_MSG);
225+
226+
// send response to the producer
227+
if (send(producer_socket, consumer_message, strlen(consumer_message) + 1,
228+
0) < 0) {
229+
fprintf(stderr, "[consumer] ERROR: send() failed\n");
230+
goto err_closeIPCHandle;
231+
}
232+
233+
fprintf(stderr, "[consumer] Sent a response message to the producer\n");
234+
235+
ret = 0; // SUCCESS
236+
237+
err_closeIPCHandle:
238+
// we do not know the exact size of the remote shared memory
239+
umf_result = umfMemoryProviderCloseIPCHandle(OS_memory_provider, SHM_ptr,
240+
sizeof(unsigned long long));
241+
if (umf_result != UMF_RESULT_SUCCESS) {
242+
fprintf(stderr, "[consumer] ERROR: closing the IPC handle failed\n");
243+
}
244+
245+
fprintf(stderr,
246+
"[consumer] Closed the IPC handle received from the producer\n");
247+
248+
err_close_producer_socket:
249+
close(producer_socket);
250+
251+
err_umfMemoryProviderDestroy:
252+
umfMemoryProviderDestroy(OS_memory_provider);
253+
254+
if (ret == 0) {
255+
fprintf(stderr, "[consumer] Shutting down (status OK) ...\n");
256+
} else if (ret == 1) {
257+
fprintf(stderr, "[consumer] Shutting down (status SKIP) ...\n");
258+
ret = 0;
259+
} else {
260+
fprintf(stderr, "[consumer] Shutting down (status ERROR) ...\n");
261+
}
262+
263+
return ret;
264+
}

0 commit comments

Comments
 (0)