15
15
#include "ipc_common.h"
16
16
17
17
#define INET_ADDR "127.0.0.1"
18
- #define MSG_SIZE 1024
18
+ #define MSG_SIZE 1024 * 8
19
19
20
20
// consumer's response message
21
21
#define CONSUMER_MSG \
@@ -33,6 +33,10 @@ Generally communication between the producer and the consumer looks like:
33
33
- Producer creates a socket
34
34
- Producer connects to the consumer
35
35
- Consumer connects at IP 127.0.0.1 and a port to the producer
36
+ - Producer sends the IPC handle size to the consumer
37
+ - Consumer receives the IPC handle size from the producer
38
+ - Consumer sends the confirmation (IPC handle size) to the producer
39
+ - Producer receives the confirmation (IPC handle size) from the consumer
36
40
- Producer sends the IPC handle to the consumer
37
41
- Consumer receives the IPC handle from the producer
38
42
- Consumer opens the IPC handle received from the producer
@@ -127,29 +131,36 @@ int run_consumer(int port, umf_memory_provider_ops_t *provider_ops,
127
131
return -1 ;
128
132
}
129
133
130
- // get the size of the IPC handle
131
- size_t IPC_handle_size ;
132
- umf_result = umfMemoryProviderGetIPCHandleSize (provider , & IPC_handle_size );
133
- if (umf_result != UMF_RESULT_SUCCESS ) {
134
- fprintf (stderr ,
135
- "[consumer] ERROR: getting size of the IPC handle failed\n" );
134
+ producer_socket = consumer_connect (port );
135
+ if (producer_socket < 0 ) {
136
136
goto err_umfMemoryProviderDestroy ;
137
137
}
138
138
139
139
// allocate the zeroed receive buffer
140
- char * recv_buffer = calloc (1 , IPC_handle_size );
140
+ char * recv_buffer = calloc (1 , MSG_SIZE );
141
141
if (!recv_buffer ) {
142
142
fprintf (stderr , "[consumer] ERROR: out of memory\n" );
143
143
goto err_umfMemoryProviderDestroy ;
144
144
}
145
145
146
- producer_socket = consumer_connect (port );
147
- if (producer_socket < 0 ) {
148
- goto err_umfMemoryProviderDestroy ;
146
+ // get the size of the IPC handle from the producer
147
+ size_t IPC_handle_size ;
148
+ ssize_t recv_len = recv (producer_socket , recv_buffer , MSG_SIZE , 0 );
149
+ if (recv_len < 0 ) {
150
+ fprintf (stderr , "[consumer] ERROR: recv() failed\n" );
151
+ goto err_close_producer_socket ;
149
152
}
153
+ IPC_handle_size = * (size_t * )recv_buffer ;
154
+ fprintf (stderr , "[consumer] Got the size of the IPC handle: %zu\n" ,
155
+ IPC_handle_size );
156
+
157
+ // send confirmation to the producer (IPC handle size)
158
+ send (producer_socket , & IPC_handle_size , sizeof (IPC_handle_size ), 0 );
159
+ fprintf (stderr ,
160
+ "[consumer] Send the confirmation (IPC handle size) to producer\n" );
150
161
151
- // receive a producer's message
152
- ssize_t recv_len = recv (producer_socket , recv_buffer , IPC_handle_size , 0 );
162
+ // receive IPC handle from the producer
163
+ recv_len = recv (producer_socket , recv_buffer , MSG_SIZE , 0 );
153
164
if (recv_len < 0 ) {
154
165
fprintf (stderr , "[consumer] ERROR: recv() failed\n" );
155
166
goto err_close_producer_socket ;
@@ -388,6 +399,44 @@ int run_producer(int port, umf_memory_provider_ops_t *provider_ops,
388
399
goto err_PutIPCHandle ;
389
400
}
390
401
402
+ // send the IPC_handle_size to the consumer
403
+ ssize_t len =
404
+ send (producer_socket , & IPC_handle_size , sizeof (IPC_handle_size ), 0 );
405
+ if (len < 0 ) {
406
+ fprintf (stderr , "[producer] ERROR: unable to send the message\n" );
407
+ goto err_close_producer_socket ;
408
+ }
409
+
410
+ fprintf (stderr ,
411
+ "[producer] Sent the size of the IPC handle (%zu) to the consumer "
412
+ "(sent %zu bytes)\n" ,
413
+ IPC_handle_size , len );
414
+
415
+ // zero the consumer_message buffer
416
+ memset (consumer_message , 0 , sizeof (consumer_message ));
417
+
418
+ // receive the consumer's confirmation - IPC handle size
419
+ len = recv (producer_socket , consumer_message , sizeof (consumer_message ), 0 );
420
+ if (len < 0 ) {
421
+ fprintf (stderr , "[producer] ERROR: error while receiving the "
422
+ "confirmation from the consumer\n" );
423
+ goto err_close_producer_socket ;
424
+ }
425
+
426
+ size_t conf_IPC_handle_size = * (size_t * )consumer_message ;
427
+ if (conf_IPC_handle_size == IPC_handle_size ) {
428
+ fprintf (stderr ,
429
+ "[producer] Received the correct confirmation (%zu) from the "
430
+ "consumer (%zu bytes)\n" ,
431
+ conf_IPC_handle_size , len );
432
+ } else {
433
+ fprintf (stderr ,
434
+ "[producer] Received an INCORRECT confirmation (%zu) from the "
435
+ "consumer (%zu bytes)\n" ,
436
+ conf_IPC_handle_size , len );
437
+ goto err_close_producer_socket ;
438
+ }
439
+
391
440
// send the IPC_handle of IPC_handle_size to the consumer
392
441
if (send (producer_socket , IPC_handle , IPC_handle_size , 0 ) < 0 ) {
393
442
fprintf (stderr , "[producer] ERROR: unable to send the message\n" );
0 commit comments