Skip to content

Commit 4df3160

Browse files
authored
fix: gRPC segfault due to Low Request Cancellation Timeout (#7840) (#7880)
1 parent 59c1842 commit 4df3160

File tree

7 files changed

+202
-67
lines changed

7 files changed

+202
-67
lines changed

qa/L0_grpc_state_cleanup/test.sh

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ function check_state_release() {
5656
num_state_new=`cat $log_file | grep "StateNew" | wc -l`
5757

5858
if [ $num_state_release -ne $num_state_new ]; then
59-
cat $log_file
6059
echo -e "\n***\n*** Test Failed: Mismatch detected, $num_state_new state(s) created, $num_state_release state(s) released. \n***" >> $log_file
6160
return 1
6261
fi

qa/L0_request_cancellation/grpc_cancellation_test.py

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python3
22

3-
# Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3+
# Copyright 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
44
#
55
# Redistribution and use in source and binary forms, with or without
66
# modification, are permitted provided that the following conditions
@@ -27,6 +27,7 @@
2727
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2828

2929
import asyncio
30+
import os
3031
import queue
3132
import re
3233
import time
@@ -169,28 +170,61 @@ def test_grpc_async_infer_cancellation_at_step_start(self):
169170
with open(server_log_name, "r") as f:
170171
server_log = f.read()
171172

172-
cancel_at_start_count = len(
173-
re.findall(
174-
r"Cancellation notification received for ModelInferHandler, rpc_ok=1, context \d+, \d+ step START",
175-
server_log,
176-
)
177-
)
178173
cur_new_req_handl_count = len(
179174
re.findall("New request handler for ModelInferHandler", server_log)
180175
)
181-
self.assertEqual(
182-
cancel_at_start_count,
183-
2,
184-
"Expected 2 cancellation at step START log entries, but got {}".format(
185-
cancel_at_start_count
186-
),
187-
)
188176
self.assertGreater(
189177
cur_new_req_handl_count,
190178
prev_new_req_handl_count,
191179
"gRPC Cancellation on step START Test Failed: New request handler for ModelInferHandler was not created",
192180
)
193181

182+
def test_grpc_async_infer_response_complete_during_cancellation(self):
183+
# long test
184+
self.test_duration_delta = 2
185+
delay_notification_sec = (
186+
int(os.getenv("TRITONSERVER_DELAY_GRPC_NOTIFICATION")) / 1000
187+
)
188+
delay_queue_cancellation_sec = (
189+
int(os.getenv("TRITONSERVER_DELAY_GRPC_ENQUEUE")) / 1000
190+
)
191+
future = self._client.async_infer(
192+
model_name=self._model_name,
193+
inputs=self._inputs,
194+
callback=self._callback,
195+
outputs=self._outputs,
196+
)
197+
# ensure cancellation is received before InferResponseComplete and is processed after InferResponseComplete
198+
time.sleep(self._model_delay - 2)
199+
future.cancel()
200+
time.sleep(
201+
delay_notification_sec + delay_queue_cancellation_sec
202+
) # ensure the cancellation is processed
203+
self._assert_callback_cancelled()
204+
205+
def test_grpc_async_infer_cancellation_during_response_complete(self):
206+
# long test
207+
self.test_duration_delta = 2.5
208+
delay_notification_sec = (
209+
int(os.getenv("TRITONSERVER_DELAY_GRPC_NOTIFICATION")) / 1000
210+
)
211+
delay_response_completion_sec = (
212+
int(os.getenv("TRITONSERVER_DELAY_RESPONSE_COMPLETION")) / 1000
213+
)
214+
future = self._client.async_infer(
215+
model_name=self._model_name,
216+
inputs=self._inputs,
217+
callback=self._callback,
218+
outputs=self._outputs,
219+
)
220+
# ensure the cancellation is received between InferResponseComplete checking cancellation and Finish
221+
time.sleep(self._model_delay + 2)
222+
future.cancel()
223+
time.sleep(
224+
delay_notification_sec + delay_response_completion_sec
225+
) # ensure the cancellation is processed
226+
self._assert_callback_cancelled()
227+
194228

195229
if __name__ == "__main__":
196230
unittest.main()

qa/L0_request_cancellation/test.sh

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ mkdir -p models/model/1 && (cd models/model && \
5858
echo 'instance_group [{ kind: KIND_CPU }]' >> config.pbtxt)
5959

6060
SERVER_LOG=server.log
61-
LD_LIBRARY_PATH=/opt/tritonserver/lib:$LD_LIBRARY_PATH ./request_cancellation_test > $SERVER_LOG
61+
LD_LIBRARY_PATH=/opt/tritonserver/lib:$LD_LIBRARY_PATH ./request_cancellation_test > $SERVER_LOG 2>&1
6262
if [ $? -ne 0 ]; then
6363
echo -e "\n***\n*** Unit Tests Failed\n***"
6464
cat $SERVER_LOG
@@ -78,12 +78,23 @@ mkdir -p models/custom_identity_int32/1 && (cd models/custom_identity_int32 && \
7878
echo 'instance_group [{ kind: KIND_CPU }]' >> config.pbtxt && \
7979
echo -e 'parameters [{ key: "execute_delay_ms" \n value: { string_value: "10000" } }]' >> config.pbtxt)
8080

81-
for TEST_CASE in "test_grpc_async_infer" "test_grpc_stream_infer" "test_aio_grpc_async_infer" "test_aio_grpc_stream_infer" "test_grpc_async_infer_cancellation_at_step_start"; do
82-
81+
for TEST_CASE in "test_grpc_async_infer" \
82+
"test_grpc_stream_infer" \
83+
"test_aio_grpc_async_infer" \
84+
"test_aio_grpc_stream_infer" \
85+
"test_grpc_async_infer_cancellation_at_step_start" \
86+
"test_grpc_async_infer_response_complete_during_cancellation" \
87+
"test_grpc_async_infer_cancellation_during_response_complete"; do
8388
TEST_LOG="./grpc_cancellation_test.$TEST_CASE.log"
8489
SERVER_LOG="grpc_cancellation_test.$TEST_CASE.server.log"
8590
if [ "$TEST_CASE" == "test_grpc_async_infer_cancellation_at_step_start" ]; then
8691
export TRITONSERVER_DELAY_GRPC_PROCESS=5000
92+
elif [ "$TEST_CASE" == "test_grpc_async_infer_response_complete_during_cancellation" ]; then
93+
export TRITONSERVER_DELAY_GRPC_NOTIFICATION=5000
94+
export TRITONSERVER_DELAY_GRPC_ENQUEUE=5000
95+
elif [ "$TEST_CASE" == "test_grpc_async_infer_cancellation_during_response_complete" ]; then
96+
export TRITONSERVER_DELAY_GRPC_NOTIFICATION=5000
97+
export TRITONSERVER_DELAY_RESPONSE_COMPLETION=5000
8798
fi
8899

89100
SERVER_ARGS="--model-repository=`pwd`/models --log-verbose=1"
@@ -101,11 +112,16 @@ for TEST_CASE in "test_grpc_async_infer" "test_grpc_stream_infer" "test_aio_grpc
101112
cat $TEST_LOG
102113
RET=1
103114
fi
104-
grep "Cancellation notification received for" $SERVER_LOG
105-
if [ $? -ne 0 ]; then
115+
116+
count=$(grep -o "Cancellation notification received for" $SERVER_LOG | wc -l)
117+
if [ $count == 0 ]; then
106118
echo -e "\n***\n*** Cancellation not received by server on $TEST_CASE\n***"
107119
cat $SERVER_LOG
108120
RET=1
121+
elif [ $count -ne 1 ]; then
122+
echo -e "\n***\n*** Unexpected cancellation received by server on $TEST_CASE. Expected 1 but received $count.\n***"
123+
cat $SERVER_LOG
124+
RET=1
109125
fi
110126
set -e
111127

@@ -114,6 +130,12 @@ for TEST_CASE in "test_grpc_async_infer" "test_grpc_stream_infer" "test_aio_grpc
114130

115131
if [ "$TEST_CASE" == "test_grpc_async_infer_cancellation_at_step_start" ]; then
116132
unset TRITONSERVER_DELAY_GRPC_PROCESS
133+
elif [ "$TEST_CASE" == "test_grpc_async_infer_response_complete_during_cancellation" ]; then
134+
unset TRITONSERVER_DELAY_GRPC_NOTIFICATION
135+
unset TRITONSERVER_DELAY_GRPC_ENQUEUE
136+
elif [ "$TEST_CASE" == "test_grpc_async_infer_cancellation_during_response_complete" ]; then
137+
unset TRITONSERVER_DELAY_GRPC_NOTIFICATION
138+
unset TRITONSERVER_DELAY_RESPONSE_COMPLETION
117139
fi
118140
done
119141

src/grpc/infer_handler.cc

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -677,7 +677,8 @@ ModelInferHandler::StartNewRequest()
677677
}
678678

679679
bool
680-
ModelInferHandler::Process(InferHandler::State* state, bool rpc_ok)
680+
ModelInferHandler::Process(
681+
InferHandler::State* state, bool rpc_ok, bool is_notification)
681682
{
682683
// There are multiple handlers registered in the gRPC service.
683684
// Hence, there we can have a case where a handler thread is
@@ -690,8 +691,8 @@ ModelInferHandler::Process(InferHandler::State* state, bool rpc_ok)
690691
// Will delay the Process execution by the specified time.
691692
// This can be used to test the flow when a cancellation request is
692693
// issued for a request that is still at the START step.
693-
LOG_INFO << "Delaying the write of the response by "
694-
<< state->delay_process_ms_ << " ms...";
694+
LOG_INFO << "Delaying the Process execution by " << state->delay_process_ms_
695+
<< " ms...";
695696
std::this_thread::sleep_for(
696697
std::chrono::milliseconds(state->delay_process_ms_));
697698
}
@@ -711,11 +712,12 @@ ModelInferHandler::Process(InferHandler::State* state, bool rpc_ok)
711712
// because we will never leave this if body. Refer to PR 7325.
712713
// This is a special case for ModelInferHandler, since we have 2 threads,
713714
// and each of them can process cancellation. ModelStreamInfer has only 1
714-
// thread, and cancellation at step START was not reproducible in a
715+
// thread, and cancellation at step START was not reproducible in a
715716
// single thread scenario.
716717
StartNewRequest();
717718
}
718-
bool resume = state->context_->HandleCancellation(state, rpc_ok, Name());
719+
bool resume = state->context_->HandleCancellation(
720+
state, rpc_ok, Name(), is_notification);
719721
return resume;
720722
}
721723

@@ -765,7 +767,7 @@ ModelInferHandler::Process(InferHandler::State* state, bool rpc_ok)
765767
std::make_pair("GRPC_SEND_START", TraceManager::CaptureTimestamp()));
766768
#endif // TRITON_ENABLE_TRACING
767769

768-
state->step_ = COMPLETE;
770+
state->step_ = Steps::COMPLETE;
769771
state->context_->responder_->Finish(
770772
inference::ModelInferResponse(), status, state);
771773
}
@@ -1001,7 +1003,7 @@ ModelInferHandler::Execute(InferHandler::State* state)
10011003
}
10021004
#endif // TRITON_ENABLE_TRACING
10031005

1004-
state->step_ = COMPLETE;
1006+
state->step_ = Steps::COMPLETE;
10051007
state->context_->responder_->Finish(error_response, status, state);
10061008
}
10071009
}
@@ -1051,6 +1053,17 @@ ModelInferHandler::InferResponseComplete(
10511053
<< ", skipping response generation as grpc transaction was "
10521054
"cancelled... ";
10531055

1056+
if (state->delay_enqueue_ms_ != 0) {
1057+
// Will delay PutTaskBackToQueue by the specified time.
1058+
// This can be used to test the flow when a cancellation request is
1059+
// issued for the request during the InferResponseComplete
1060+
// callback right before Process in the notification thread.
1061+
LOG_INFO << "Delaying PutTaskBackToQueue by " << state->delay_enqueue_ms_
1062+
<< " ms...";
1063+
std::this_thread::sleep_for(
1064+
std::chrono::milliseconds(state->delay_enqueue_ms_));
1065+
}
1066+
10541067
// Send state back to the queue so that state can be released
10551068
// in the next cycle.
10561069
state->context_->PutTaskBackToQueue(state);
@@ -1113,7 +1126,17 @@ ModelInferHandler::InferResponseComplete(
11131126
std::make_pair("GRPC_SEND_START", TraceManager::CaptureTimestamp()));
11141127
#endif // TRITON_ENABLE_TRACING
11151128

1116-
state->step_ = COMPLETE;
1129+
if (state->delay_response_completion_ms_ != 0) {
1130+
// Will delay the Process execution of state at step COMPLETE by the
1131+
// specified time. This can be used to test the flow when a cancellation
1132+
// request is issued for the request, which is at InferResponseComplete.
1133+
LOG_INFO << "Delaying InferResponseComplete by "
1134+
<< state->delay_response_completion_ms_ << " ms...";
1135+
std::this_thread::sleep_for(
1136+
std::chrono::milliseconds(state->delay_response_completion_ms_));
1137+
}
1138+
1139+
state->step_ = Steps::COMPLETE;
11171140
state->context_->responder_->Finish(*response, state->status_, state);
11181141
if (response_created) {
11191142
delete response;

0 commit comments

Comments
 (0)