Commit 2ce3377

mc-nv and indrajit96 authored
feat: Add GRPC error codes to GRPC streaming if enabled by user. (#7499) (#7555)
Co-authored-by: Indrajit Bhosale <iamindrajitb@gmail.com>
1 parent a716d6d commit 2ce3377

File tree

12 files changed
+440 -20 lines changed

Dockerfile.QA

Lines changed: 6 additions & 0 deletions

@@ -267,6 +267,12 @@ RUN cp -r qa/L0_decoupled/models qa/L0_decoupled/python_models/ && \
     cp /workspace/tritonbuild/python/examples/decoupled/square_config.pbtxt \
        qa/L0_decoupled/python_models/square_int32/.

+RUN mkdir -p qa/L0_decoupled_grpc_error && \
+    cp -r qa/L0_decoupled/. qa/L0_decoupled_grpc_error
+
+RUN mkdir -p qa/L0_grpc_error_state_cleanup && \
+    cp -r qa/L0_grpc_state_cleanup/. qa/L0_grpc_error_state_cleanup
+
 RUN mkdir -p qa/L0_repoagent_checksum/models/identity_int32/1 && \
     cp tritonbuild/identity/install/backends/identity/libtriton_identity.so \
        qa/L0_repoagent_checksum/models/identity_int32/1/.

docs/customization_guide/inference_protocols.md

Lines changed: 10 additions & 0 deletions

@@ -115,6 +115,16 @@ These options can be used to configure the KeepAlive settings:

 For client-side documentation, see [Client-Side GRPC KeepAlive](https://github.com/triton-inference-server/client/blob/main/README.md#grpc-keepalive).

+#### GRPC Status Codes
+
+Triton implements GRPC error handling for streaming requests when a specific flag is enabled through headers. Upon encountering an error, Triton returns the appropriate GRPC error code and subsequently closes the stream.
+
+* `triton_grpc_error` : The header value needs to be set to true while starting the stream.
+
+GRPC status codes can be used for better visibility and monitoring. For more details, see [gRPC Status Codes](https://grpc.io/docs/guides/status-codes/)
+
+For client-side documentation, see [Client-Side GRPC Status Codes](https://github.com/triton-inference-server/client/tree/main#GRPC-Status-Codes)
+
 ### Limit Endpoint Access (BETA)

 Triton users may want to restrict access to protocols or APIs that are
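
Illustrative only, not part of this commit: a minimal client-side sketch of the documented behavior, assuming a Triton server reachable at localhost:8001 that serves the execute_grpc_error model added by this change. It opens a stream with the triton_grpc_error header set to "true" and checks whether a response arrives as an InferenceServerException carrying a gRPC status code (for example StatusCode.INTERNAL), mirroring the new lifecycle tests below.

    # Minimal sketch (not in this commit): stream with GRPC error codes enabled.
    from functools import partial
    from queue import Queue

    import numpy as np
    import tritonclient.grpc as grpcclient
    from tritonclient.utils import InferenceServerException, np_to_triton_dtype

    responses = Queue()

    def callback(queue, result, error):
        # When the server reports an error, it is delivered here and the stream closes.
        queue.put(error if error is not None else result)

    client = grpcclient.InferenceServerClient("localhost:8001")  # assumed address
    # The header must be supplied when the stream is started, as documented above.
    client.start_stream(
        callback=partial(callback, responses), headers={"triton_grpc_error": "true"}
    )

    data = np.random.randn(2, 2).astype(np.float32)
    inputs = [grpcclient.InferInput("IN", data.shape, np_to_triton_dtype(data.dtype))]
    inputs[0].set_data_from_numpy(data)
    client.async_stream_infer(model_name="execute_grpc_error", inputs=inputs)

    result = responses.get()
    if isinstance(result, InferenceServerException):
        # e.g. StatusCode.INTERNAL when the model returns an error
        print("stream closed with GRPC status:", result.status())
    else:
        print("OUT:", result.as_numpy("OUT"))
    client.stop_stream()

Without the header, the same stream stays open after a model error, which is what test_triton_grpc_error_error_off below verifies.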

qa/L0_backend_python/lifecycle/lifecycle_test.py

Lines changed: 130 additions & 0 deletions

@@ -35,6 +35,7 @@
 sys.path.append("../../common")

 import queue
+import threading
 import time
 import unittest
 from functools import partial
@@ -241,6 +242,135 @@ def test_infer_pymodel_error(self):
             initial_metrics_value,
         )

+    # Test grpc stream behavior when triton_grpc_error is set to true.
+    # Expected to close stream and return GRPC error when model returns error.
+    def test_triton_grpc_error_error_on(self):
+        model_name = "execute_grpc_error"
+        shape = [2, 2]
+        number_of_requests = 2
+        user_data = UserData()
+        triton_client = grpcclient.InferenceServerClient(f"{_tritonserver_ipaddr}:8001")
+        metadata = {"triton_grpc_error": "true"}
+        triton_client.start_stream(
+            callback=partial(callback, user_data), headers=metadata
+        )
+        stream_end = False
+        for i in range(number_of_requests):
+            input_data = np.random.randn(*shape).astype(np.float32)
+            inputs = [
+                grpcclient.InferInput(
+                    "IN", input_data.shape, np_to_triton_dtype(input_data.dtype)
+                )
+            ]
+            inputs[0].set_data_from_numpy(input_data)
+            try:
+                triton_client.async_stream_infer(model_name=model_name, inputs=inputs)
+                result = user_data._completed_requests.get()
+                if type(result) == InferenceServerException:
+                    # execute_grpc_error intentionally returns error with StatusCode.INTERNAL status on 2nd request
+                    self.assertEqual(str(result.status()), "StatusCode.INTERNAL")
+                    stream_end = True
+                else:
+                    # Stream is not killed
+                    output_data = result.as_numpy("OUT")
+                    self.assertIsNotNone(output_data, "error: expected 'OUT'")
+            except Exception as e:
+                if stream_end == True:
+                    # We expect the stream to have closed
+                    self.assertTrue(
+                        True,
+                        "This should always pass as cancellation should succeed",
+                    )
+                else:
+                    self.assertFalse(
+                        True, "Unexpected Stream killed without Error from CORE"
+                    )
+
+    # Test grpc stream behavior when triton_grpc_error is set to true in multiple open streams.
+    # Expected to close stream and return GRPC error when model returns error.
+    def test_triton_grpc_error_multithreaded(self):
+        thread1 = threading.Thread(target=self.test_triton_grpc_error_error_on)
+        thread2 = threading.Thread(target=self.test_triton_grpc_error_error_on)
+        # Start the threads
+        thread1.start()
+        thread2.start()
+        # Wait for both threads to finish
+        thread1.join()
+        thread2.join()
+
+    # Test grpc stream behavior when triton_grpc_error is set to true and subsequent stream is cancelled.
+    # Expected cancellation is successful.
+    def test_triton_grpc_error_cancel(self):
+        model_name = "execute_grpc_error"
+        shape = [2, 2]
+        number_of_requests = 1
+        user_data = UserData()
+        triton_server_url = "localhost:8001"  # Replace with your Triton server address
+        stream_end = False
+        triton_client = grpcclient.InferenceServerClient(triton_server_url)
+
+        metadata = {"triton_grpc_error": "true"}
+
+        triton_client.start_stream(
+            callback=partial(callback, user_data), headers=metadata
+        )
+
+        for i in range(number_of_requests):
+            input_data = np.random.randn(*shape).astype(np.float32)
+            inputs = [
+                grpcclient.InferInput(
+                    "IN", input_data.shape, np_to_triton_dtype(input_data.dtype)
+                )
+            ]
+            inputs[0].set_data_from_numpy(input_data)
+            try:
+                triton_client.async_stream_infer(model_name=model_name, inputs=inputs)
+                result = user_data._completed_requests.get()
+                if type(result) == InferenceServerException:
+                    stream_end = True
+                if i == 0:
+                    triton_client.stop_stream(cancel_requests=True)
+            except Exception as e:
+                if stream_end == True:
+                    # We expect the stream to have closed
+                    self.assertTrue(
+                        True,
+                        "This should always pass as cancellation should succeed",
+                    )
+                else:
+                    self.assertFalse(
+                        True, "Unexpected Stream killed without Error from CORE"
+                    )
+        self.assertTrue(
+            True,
+            "This should always pass as cancellation should succeed without any exception",
+        )
+
+    # Test grpc stream behavior when triton_grpc_error is set to false
+    # and subsequent stream is NOT closed when error is reported from CORE
+    def test_triton_grpc_error_error_off(self):
+        model_name = "execute_grpc_error"
+        shape = [2, 2]
+        number_of_requests = 4
+        response_counter = 0
+        user_data = UserData()
+        triton_client = grpcclient.InferenceServerClient(f"{_tritonserver_ipaddr}:8001")
+        triton_client.start_stream(callback=partial(callback, user_data))
+        for i in range(number_of_requests):
+            input_data = np.random.randn(*shape).astype(np.float32)
+            inputs = [
+                grpcclient.InferInput(
+                    "IN", input_data.shape, np_to_triton_dtype(input_data.dtype)
+                )
+            ]
+            inputs[0].set_data_from_numpy(input_data)
+            triton_client.async_stream_infer(model_name=model_name, inputs=inputs)
+            _ = user_data._completed_requests.get()
+            response_counter += 1
+        # we expect response_counter == number_of_requests,
+        # which indicates that after the first reported grpc error stream did NOT close and mode != triton_grpc_error
+        self.assertEqual(response_counter, number_of_requests)
+

 if __name__ == "__main__":
     unittest.main()

qa/L0_backend_python/lifecycle/test.sh

Lines changed: 8 additions & 0 deletions

@@ -52,6 +52,14 @@ cp ../../python_models/execute_error/config.pbtxt ./models/execute_error/
     sed -i "s/^max_batch_size:.*/max_batch_size: 8/" config.pbtxt && \
     echo "dynamic_batching { preferred_batch_size: [8], max_queue_delay_microseconds: 12000000 }" >> config.pbtxt)

+mkdir -p models/execute_grpc_error/1/
+cp ../../python_models/execute_grpc_error/model.py ./models/execute_grpc_error/1/
+cp ../../python_models/execute_grpc_error/config.pbtxt ./models/execute_grpc_error/
+(cd models/execute_grpc_error && \
+    sed -i "s/^name:.*/name: \"execute_grpc_error\"/" config.pbtxt && \
+    sed -i "s/^max_batch_size:.*/max_batch_size: 8/" config.pbtxt && \
+    echo "dynamic_batching { preferred_batch_size: [8], max_queue_delay_microseconds: 1200000 }" >> config.pbtxt)
+
 mkdir -p models/execute_return_error/1/
 cp ../../python_models/execute_return_error/model.py ./models/execute_return_error/1/
 cp ../../python_models/execute_return_error/config.pbtxt ./models/execute_return_error/
qa/L0_decoupled/decoupled_test.py

Lines changed: 14 additions & 2 deletions

@@ -116,7 +116,13 @@ def _stream_infer_with_params(
             url="localhost:8001", verbose=True
         ) as triton_client:
             # Establish stream
-            triton_client.start_stream(callback=partial(callback, user_data))
+            if "TRITONSERVER_GRPC_STATUS_FLAG" in os.environ:
+                metadata = {"triton_grpc_error": "true"}
+                triton_client.start_stream(
+                    callback=partial(callback, user_data), headers=metadata
+                )
+            else:
+                triton_client.start_stream(callback=partial(callback, user_data))
             # Send specified many requests in parallel
             for i in range(request_count):
                 time.sleep((request_delay / 1000))
@@ -175,7 +181,13 @@ def _stream_infer(
             url="localhost:8001", verbose=True
         ) as triton_client:
             # Establish stream
-            triton_client.start_stream(callback=partial(callback, user_data))
+            if "TRITONSERVER_GRPC_STATUS_FLAG" in os.environ:
+                metadata = {"triton_grpc_error": "true"}
+                triton_client.start_stream(
+                    callback=partial(callback, user_data), headers=metadata
+                )
+            else:
+                triton_client.start_stream(callback=partial(callback, user_data))
             # Send specified many requests in parallel
             for i in range(request_count):
                 time.sleep((request_delay / 1000))

qa/L0_decoupled/test.sh

Lines changed: 1 addition & 1 deletion

@@ -176,4 +176,4 @@ else
   echo -e "\n***\n*** Test Failed\n***"
 fi

-exit $RET
+exit $RET

qa/L0_grpc_state_cleanup/cleanup_test.py

Lines changed: 33 additions & 9 deletions

@@ -161,9 +161,17 @@ def _stream_infer_with_params(
             url="localhost:8001", verbose=True
         ) as triton_client:
             # Establish stream
-            triton_client.start_stream(
-                callback=partial(callback, user_data), stream_timeout=stream_timeout
-            )
+            if "TRITONSERVER_GRPC_STATUS_FLAG" in os.environ:
+                metadata = {"triton_grpc_error": "true"}
+                triton_client.start_stream(
+                    callback=partial(callback, user_data),
+                    stream_timeout=stream_timeout,
+                    headers=metadata,
+                )
+            else:
+                triton_client.start_stream(
+                    callback=partial(callback, user_data), stream_timeout=stream_timeout
+                )
             # Send specified many requests in parallel
             for i in range(request_count):
                 time.sleep((request_delay / 1000))
@@ -229,9 +237,17 @@ def _stream_infer(
             url="localhost:8001", verbose=True
         ) as triton_client:
             # Establish stream
-            triton_client.start_stream(
-                callback=partial(callback, user_data), stream_timeout=stream_timeout
-            )
+            if "TRITONSERVER_GRPC_STATUS_FLAG" in os.environ:
+                metadata = {"triton_grpc_error": "true"}
+                triton_client.start_stream(
+                    callback=partial(callback, user_data),
+                    stream_timeout=stream_timeout,
+                    headers=metadata,
+                )
+            else:
+                triton_client.start_stream(
+                    callback=partial(callback, user_data), stream_timeout=stream_timeout
+                )
             # Send specified many requests in parallel
             for i in range(request_count):
                 time.sleep((request_delay / 1000))
@@ -608,9 +624,17 @@ def test_non_decoupled_streaming_multi_response(self):
             url="localhost:8001", verbose=True
         ) as client:
             # Establish stream
-            client.start_stream(
-                callback=partial(callback, user_data), stream_timeout=16
-            )
+            if "TRITONSERVER_GRPC_STATUS_FLAG" in os.environ:
+                metadata = {"triton_grpc_error": "true"}
+                client.start_stream(
+                    callback=partial(callback, user_data),
+                    stream_timeout=16,
+                    headers=metadata,
+                )
+            else:
+                client.start_stream(
+                    callback=partial(callback, user_data), stream_timeout=16
+                )
             # Send a request
             client.async_stream_infer(
                 model_name=self.repeat_non_decoupled_model_name,
qa/python_models/execute_grpc_error/config.pbtxt

Lines changed: 51 additions & 0 deletions

@@ -0,0 +1,51 @@
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#  * Redistributions of source code must retain the above copyright
+#    notice, this list of conditions and the following disclaimer.
+#  * Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+#  * Neither the name of NVIDIA CORPORATION nor the names of its
+#    contributors may be used to endorse or promote products derived
+#    from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+backend: "python"
+max_batch_size: 64
+
+input [
+  {
+    name: "IN"
+    data_type: TYPE_FP32
+    dims: [ -1 ]
+  }
+]
+
+output [
+  {
+    name: "OUT"
+    data_type: TYPE_FP32
+    dims: [ -1 ]
+  }
+]
+
+instance_group [
+  {
+    count: 1
+    kind : KIND_CPU
+  }
+]
qa/python_models/execute_grpc_error/model.py

Lines changed: 52 additions & 0 deletions

@@ -0,0 +1,52 @@
+# Copyright 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#  * Redistributions of source code must retain the above copyright
+#    notice, this list of conditions and the following disclaimer.
+#  * Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+#  * Neither the name of NVIDIA CORPORATION nor the names of its
+#    contributors may be used to endorse or promote products derived
+#    from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import triton_python_backend_utils as pb_utils
+
+
+class TritonPythonModel:
+    def __init__(self):
+        # Maintain total inference count, so as to return error on 2nd request, all of this to simulate model failure
+        self.inf_count = 1
+
+    def execute(self, requests):
+        """This function is called on inference request."""
+        responses = []
+
+        # Generate the error for the second request
+        for request in requests:
+            input_tensor = pb_utils.get_input_tensor_by_name(request, "IN")
+            out_tensor = pb_utils.Tensor("OUT", input_tensor.as_numpy())
+            if self.inf_count % 2:
+                # Every odd request is success
+                responses.append(pb_utils.InferenceResponse([out_tensor]))
+            else:
+                # Every even request is failure
+                error = pb_utils.TritonError("An error occurred during execution")
+                responses.append(pb_utils.InferenceResponse([out_tensor], error))
+            self.inf_count += 1
+
+        return responses
