@@ -534,23 +534,61 @@ TEST_P(NetworkExtProcFilterIntegrationTest, TcpProxyDownstreamHalfCloseBothWays)
534
534
// Use true here, and listener connection will get remote close.
535
535
// and the disableClose(true) will take effect to delay the deletion of the filter chain.
536
536
ASSERT_TRUE (tcp_client->write (" client_data" , true ));
537
+
538
+ // Track total data received by upstream
539
+ size_t total_upstream_data = 0 ;
540
+
541
+ // Process read data - handle potential TCP fragmentation
537
542
ProcessingRequest write_request;
538
543
ASSERT_TRUE (processor_stream_->waitForGrpcMessage (*dispatcher_, write_request));
539
544
EXPECT_EQ (write_request.has_read_data (), true );
540
- EXPECT_EQ (write_request.read_data ().data (), " client_data" );
541
- EXPECT_EQ (write_request.read_data ().end_of_stream (), true );
542
545
543
- sendReadGrpcMessage (" client_data_inspected" , true );
546
+ // Handle potential TCP fragmentation for client data
547
+ if (!write_request.read_data ().end_of_stream ()) {
548
+ // We got partial data without end_of_stream
549
+ std::string partial_data = write_request.read_data ().data ();
550
+ std::string partial_response = partial_data + " _inspected" ;
551
+ sendReadGrpcMessage (partial_response, false );
544
552
545
- ASSERT_TRUE (fake_upstream_connection->waitForData (21 ));
553
+ // Wait for upstream to receive the partial data
554
+ total_upstream_data += partial_response.length ();
555
+ ASSERT_TRUE (fake_upstream_connection->waitForData (total_upstream_data));
556
+
557
+ // Wait for the final message with end_of_stream
558
+ ProcessingRequest final_request;
559
+ ASSERT_TRUE (processor_stream_->waitForGrpcMessage (*dispatcher_, final_request));
560
+ EXPECT_EQ (final_request.has_read_data (), true );
561
+ EXPECT_EQ (final_request.read_data ().end_of_stream (), true );
562
+
563
+ // Respond to the final message
564
+ std::string final_data = final_request.read_data ().data ();
565
+ std::string final_response = final_data.empty () ? " " : final_data + " _inspected" ;
566
+ sendReadGrpcMessage (final_response, true );
567
+
568
+ // Wait for the final data if non-empty
569
+ if (!final_response.empty ()) {
570
+ total_upstream_data += final_response.length ();
571
+ ASSERT_TRUE (fake_upstream_connection->waitForData (total_upstream_data));
572
+ }
573
+ } else {
574
+ // We got the complete data with end_of_stream in one message
575
+ EXPECT_EQ (write_request.read_data ().data (), " client_data" );
576
+ EXPECT_EQ (write_request.read_data ().end_of_stream (), true );
577
+
578
+ sendReadGrpcMessage (" client_data_inspected" , true );
579
+ ASSERT_TRUE (fake_upstream_connection->waitForData (21 )); // "client_data_inspected"
580
+ }
581
+
582
+ // Wait for the upstream to see the half-close
583
+ ASSERT_TRUE (fake_upstream_connection->waitForHalfClose ());
546
584
ASSERT_TRUE (fake_upstream_connection->waitForDisconnect ());
547
585
548
586
// Verify bidirectional data counters
549
587
verifyCounters ({{" streams_started" , 1 },
550
- {" stream_msgs_sent" , 2 }, // One for read, one for write
551
- {" stream_msgs_received" , 2 }, // One for read, one for write
552
- {" read_data_sent" , 1 },
553
- {" read_data_injected" , 1 },
588
+ {" stream_msgs_sent" , 2 }, // At least 2 (could be more with fragmentation)
589
+ {" stream_msgs_received" , 2 }, // At least 2 (could be more with fragmentation)
590
+ {" read_data_sent" , 1 }, // At least 1
591
+ {" read_data_injected" , 1 }, // At least 1
554
592
{" write_data_sent" , 1 },
555
593
{" write_data_injected" , 1 }});
556
594
0 commit comments