Commit 6c6df11

test: Add tests cancelling BLS decoupled request in Python backend (#8097)
1 parent cbf4f41 commit 6c6df11

File tree: 10 files changed (+824, −2 lines changed)

qa/L0_backend_python/decoupled/decoupled_test.py

Lines changed: 194 additions & 1 deletion
@@ -1,6 +1,6 @@
 #!/usr/bin/env python3

-# Copyright 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# Copyright 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
 # modification, are permitted provided that the following conditions
@@ -46,6 +46,34 @@
 _tritonserver_ipaddr = os.environ.get("TRITONSERVER_IPADDR", "localhost")


+def prepare_decoupled_bls_cancel_inputs(input_value, max_sum_value, ignore_cancel):
+    input_data = np.array([input_value], dtype=np.int32)
+    max_sum_data = np.array([max_sum_value], dtype=np.int32)
+    ignore_cancel_data = np.array([ignore_cancel], dtype=np.bool_)
+    inputs = [
+        grpcclient.InferInput(
+            "INPUT",
+            input_data.shape,
+            np_to_triton_dtype(input_data.dtype),
+        ),
+        grpcclient.InferInput(
+            "MAX_SUM",
+            max_sum_data.shape,
+            np_to_triton_dtype(max_sum_data.dtype),
+        ),
+        grpcclient.InferInput(
+            "IGNORE_CANCEL",
+            ignore_cancel_data.shape,
+            np_to_triton_dtype(ignore_cancel_data.dtype),
+        ),
+    ]
+    inputs[0].set_data_from_numpy(input_data)
+    inputs[1].set_data_from_numpy(max_sum_data)
+    inputs[2].set_data_from_numpy(ignore_cancel_data)
+
+    return inputs
+
+
 class UserData:
     def __init__(self):
         self._completed_requests = queue.Queue()
@@ -324,6 +352,171 @@ def test_decoupled_execute_cancel(self):
         self.assertIn("[execute_cancel] Request not cancelled at 1.0 s", log_text)
         self.assertIn("[execute_cancel] Request cancelled at ", log_text)

+    def test_decoupled_bls_cancel(self):
+        model_names = ["decoupled_bls_cancel", "decoupled_bls_async_cancel"]
+        input_value = 1
+        max_sum_value = 10
+        ignore_cancel = False
+        user_data = UserData()
+        for model_name in model_names:
+            with self._shm_leak_detector.Probe() as shm_probe:
+                with grpcclient.InferenceServerClient(
+                    f"{_tritonserver_ipaddr}:8001"
+                ) as client:
+                    client.start_stream(callback=partial(callback, user_data))
+                    inputs = prepare_decoupled_bls_cancel_inputs(
+                        input_value=input_value,
+                        max_sum_value=max_sum_value,
+                        ignore_cancel=ignore_cancel,
+                    )
+                    client.async_stream_infer(model_name, inputs)
+
+                    # Check the results of the decoupled model using BLS
+                    def check_result(result):
+                        # Make sure the result is not an exception
+                        self.assertIsNot(type(result), InferenceServerException)
+                        is_cancelled = result.as_numpy("IS_CANCELLED")
+                        self.assertTrue(
+                            is_cancelled[0],
+                            "error: expected the request to be cancelled",
+                        )
+
+                        max_sum_data = np.array([max_sum_value], dtype=np.int32)
+                        sum_data = result.as_numpy("SUM")
+                        self.assertIsNotNone(sum_data, "error: expected 'SUM'")
+                        self.assertTrue(
+                            np.array_equal(sum_data, max_sum_data),
+                            "error: expected output {} to match input {}".format(
+                                sum_data, max_sum_data
+                            ),
+                        )
+
+                    result = user_data._completed_requests.get()
+                    check_result(result)
+
+    def test_decoupled_bls_ignore_cancel(self):
+        model_names = ["decoupled_bls_cancel", "decoupled_bls_async_cancel"]
+        input_value = 1
+        max_sum_value = 10
+        ignore_cancel = True
+        user_data = UserData()
+        for model_name in model_names:
+            with self._shm_leak_detector.Probe() as shm_probe:
+                with grpcclient.InferenceServerClient(
+                    f"{_tritonserver_ipaddr}:8001"
+                ) as client:
+                    client.start_stream(callback=partial(callback, user_data))
+                    inputs = prepare_decoupled_bls_cancel_inputs(
+                        input_value=input_value,
+                        max_sum_value=max_sum_value,
+                        ignore_cancel=ignore_cancel,
+                    )
+                    client.async_stream_infer(model_name, inputs)
+
+                    # Check the results of the decoupled model using BLS
+                    def check_result(result):
+                        # Make sure the result is not an exception
+                        self.assertIsNot(type(result), InferenceServerException)
+                        is_cancelled = result.as_numpy("IS_CANCELLED")
+                        self.assertFalse(
+                            is_cancelled[0],
+                            "error: expected the request not to be cancelled",
+                        )
+
+                        max_sum_data = np.array([max_sum_value], dtype=np.int32)
+                        sum_data = result.as_numpy("SUM")
+                        self.assertIsNotNone(sum_data, "error: expected 'SUM'")
+                        self.assertTrue(
+                            sum_data > max_sum_data,
+                            "error: expected sum_data {} to be greater than max_sum_data {}".format(
+                                sum_data, max_sum_data
+                            ),
+                        )
+
+                    result = user_data._completed_requests.get()
+                    check_result(result)
+
+    def test_decoupled_bls_cancel_after_cancellation(self):
+        model_name = "decoupled_bls_cancel_after_complete"
+        input_value = 1
+        max_sum_value = 10
+        ignore_cancel = False
+        user_data = UserData()
+        with self._shm_leak_detector.Probe() as shm_probe:
+            with grpcclient.InferenceServerClient(
+                f"{_tritonserver_ipaddr}:8001"
+            ) as client:
+                client.start_stream(callback=partial(callback, user_data))
+                inputs = prepare_decoupled_bls_cancel_inputs(
+                    input_value=input_value,
+                    max_sum_value=max_sum_value,
+                    ignore_cancel=ignore_cancel,
+                )
+                client.async_stream_infer(model_name, inputs)
+
+                # Check the results of the decoupled model using BLS
+                def check_result(result):
+                    # Make sure the result is not an exception
+                    self.assertIsNot(type(result), InferenceServerException)
+                    is_cancelled = result.as_numpy("IS_CANCELLED")
+                    self.assertTrue(
+                        is_cancelled[0], "error: expected the request to be cancelled"
+                    )
+
+                    max_sum_data = np.array([max_sum_value], dtype=np.int32)
+                    sum_data = result.as_numpy("SUM")
+                    self.assertIsNotNone(sum_data, "error: expected 'SUM'")
+                    self.assertTrue(
+                        np.array_equal(sum_data, max_sum_data),
+                        "error: expected output {} to match input {}".format(
+                            sum_data, max_sum_data
+                        ),
+                    )
+
+                result = user_data._completed_requests.get()
+                check_result(result)
+
+    def test_decoupled_bls_cancel_after_completion(self):
+        model_name = "decoupled_bls_cancel_after_complete"
+        input_value = 1
+        max_sum_value = 25
+        ignore_cancel = False
+        user_data = UserData()
+        with self._shm_leak_detector.Probe() as shm_probe:
+            with grpcclient.InferenceServerClient(
+                f"{_tritonserver_ipaddr}:8001"
+            ) as client:
+                client.start_stream(callback=partial(callback, user_data))
+                inputs = prepare_decoupled_bls_cancel_inputs(
+                    input_value=input_value,
+                    max_sum_value=max_sum_value,
+                    ignore_cancel=ignore_cancel,
+                )
+                client.async_stream_infer(model_name, inputs)
+
+                # Check the results of the decoupled model using BLS
+                def check_result(result):
+                    # Make sure the result is not an exception
+                    self.assertIsNot(type(result), InferenceServerException)
+                    is_cancelled = result.as_numpy("IS_CANCELLED")
+                    self.assertFalse(
+                        is_cancelled[0],
+                        "error: expected the request not to be cancelled",
+                    )
+
+                    max_sum_data = np.array([max_sum_value], dtype=np.int32)
+                    sum_data = result.as_numpy("SUM")
+                    self.assertIsNotNone(sum_data, "error: expected 'SUM'")
+                    self.assertTrue(
+                        sum_data < max_sum_data,
+                        "error: expected sum_data {} to be less than max_sum_data {}".format(
+                            sum_data, max_sum_data
+                        ),
+                    )
+
+                result = user_data._completed_requests.get()
+                check_result(result)
+
     def test_decoupled_raise_exception(self):
         # The decoupled_raise_exception model raises an exception for the request.
         # This test case is making sure that repeated exceptions are properly handled.
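
Note: the new tests reuse the `callback` helper and the `UserData` queue already defined near the top of decoupled_test.py; only the first lines of `UserData` appear in the diff context above. As a minimal sketch, assuming the standard streaming-callback pattern used throughout these QA tests (this sketch is not part of the diff):

import queue


class UserData:
    def __init__(self):
        # Each completed stream response (or error) is queued here by the callback.
        self._completed_requests = queue.Queue()


def callback(user_data, result, error):
    # gRPC streaming callback: store the error if one occurred, otherwise the result,
    # so a test can later pop it with user_data._completed_requests.get().
    if error:
        user_data._completed_requests.put(error)
    else:
        user_data._completed_requests.put(result)

In the tests it is wired up via client.start_stream(callback=partial(callback, user_data)), and each test pops the completed result from user_data._completed_requests before checking it.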
Lines changed: 93 additions & 0 deletions
@@ -0,0 +1,93 @@
+# Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#  * Redistributions of source code must retain the above copyright
+#    notice, this list of conditions and the following disclaimer.
+#  * Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+#  * Neither the name of NVIDIA CORPORATION nor the names of its
+#    contributors may be used to endorse or promote products derived
+#    from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+import asyncio
+
+import numpy as np
+import triton_python_backend_utils as pb_utils
+
+
+class TritonPythonModel:
+    """
+    This model sends a decoupled BLS inference request to the
+    'response_sender_until_cancelled' model and sums up its responses.
+    Once MAX_SUM is reached, the model calls the response iterator's
+    cancel() method to cancel the response stream.
+    If IGNORE_CANCEL is set to True, the 'response_sender_until_cancelled' model
+    will not honor the request cancellation and keeps sending output to this model.
+    The total number of responses should not reach MAX_RESPONSE_COUNT.
+    """
+
+    async def execute(self, requests):
+        max_sum = (
+            pb_utils.get_input_tensor_by_name(requests[0], "MAX_SUM").as_numpy().flat[0]
+        )
+        input = pb_utils.get_input_tensor_by_name(requests[0], "INPUT")
+        ignore_cancel = pb_utils.get_input_tensor_by_name(requests[0], "IGNORE_CANCEL")
+        delay = pb_utils.Tensor("DELAY", np.array([50], dtype=np.int32))
+        max_response_count = pb_utils.Tensor(
+            "MAX_RESPONSE_COUNT", np.array([20], dtype=np.int32)
+        )
+
+        infer_request = pb_utils.InferenceRequest(
+            model_name="response_sender_until_cancelled",
+            inputs=[input, max_response_count, delay, ignore_cancel],
+            requested_output_names=["OUTPUT"],
+        )
+
+        response_stream = await infer_request.async_exec(decoupled=True)
+
+        is_cancelled = False
+        error = None
+        response_sum = 0
+        for infer_response in response_stream:
+            if infer_response.has_error():
+                if infer_response.error().code() == pb_utils.TritonError.CANCELLED:
+                    is_cancelled = True
+                else:
+                    error = infer_response.error()
+                break
+
+            out = pb_utils.get_output_tensor_by_name(
+                infer_response, "OUTPUT"
+            ).as_numpy()[0]
+
+            response_sum += out
+            if response_sum >= max_sum:
+                response_stream.cancel()
+
+        responses = [
+            pb_utils.InferenceResponse(
+                output_tensors=[
+                    pb_utils.Tensor("SUM", np.array([response_sum], dtype=np.int32)),
+                    pb_utils.Tensor(
+                        "IS_CANCELLED", np.array([is_cancelled], dtype=np.bool_)
+                    ),
+                ],
+                error=error,
+            )
+        ]
+
+        return responses
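
The BLS caller above streams from a 'response_sender_until_cancelled' model that is added elsewhere in this commit but not shown in this excerpt. Purely as a hypothetical sketch (behavior inferred from the inputs the caller passes: INPUT, MAX_RESPONSE_COUNT, DELAY, IGNORE_CANCEL), such a decoupled model might look roughly like this:

import time

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    """Hypothetical sketch of a 'response_sender_until_cancelled'-style model.

    Streams OUTPUT responses until the caller's cancellation is observed
    (unless IGNORE_CANCEL is true) or MAX_RESPONSE_COUNT responses were sent.
    """

    def execute(self, requests):
        for request in requests:
            out = pb_utils.get_input_tensor_by_name(request, "INPUT").as_numpy()
            max_count = int(
                pb_utils.get_input_tensor_by_name(request, "MAX_RESPONSE_COUNT").as_numpy()[0]
            )
            delay_ms = int(
                pb_utils.get_input_tensor_by_name(request, "DELAY").as_numpy()[0]
            )
            ignore_cancel = bool(
                pb_utils.get_input_tensor_by_name(request, "IGNORE_CANCEL").as_numpy()[0]
            )

            sender = request.get_response_sender()
            for _ in range(max_count):
                if not ignore_cancel and sender.is_cancelled():
                    # Report the cancellation back to the BLS caller as a
                    # CANCELLED error response and close the stream.
                    sender.send(
                        pb_utils.InferenceResponse(
                            error=pb_utils.TritonError(
                                "request cancelled", pb_utils.TritonError.CANCELLED
                            )
                        ),
                        flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL,
                    )
                    break
                sender.send(
                    pb_utils.InferenceResponse(
                        output_tensors=[pb_utils.Tensor("OUTPUT", out)]
                    )
                )
                time.sleep(delay_ms / 1000.0)
            else:
                # Loop ran to completion without cancellation: close the stream.
                sender.send(flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
        return None

The actual model in the commit may differ; the sketch only illustrates how response_sender.is_cancelled() and the CANCELLED error code connect back to the caller's response_stream.cancel() and the IS_CANCELLED output checked by the tests.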
Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
+# Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#  * Redistributions of source code must retain the above copyright
+#    notice, this list of conditions and the following disclaimer.
+#  * Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+#  * Neither the name of NVIDIA CORPORATION nor the names of its
+#    contributors may be used to endorse or promote products derived
+#    from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+name: "decoupled_bls_async_cancel"
+backend: "python"
+
+input [
+  {
+    name: "INPUT"
+    data_type: TYPE_INT32
+    dims: [ 1 ]
+  },
+  {
+    name: "MAX_SUM"
+    data_type: TYPE_INT32
+    dims: [ 1 ]
+  },
+  {
+    name: "IGNORE_CANCEL"
+    data_type: TYPE_BOOL
+    dims: [ 1 ]
+  }
+]
+output [
+  {
+    name: "SUM"
+    data_type: TYPE_INT32
+    dims: [ 1 ]
+  },
+  {
+    name: "IS_CANCELLED"
+    data_type: TYPE_BOOL
+    dims: [ 1 ]
+  }
+]
+
+instance_group [
+  {
+    count: 1
+    kind: KIND_CPU
+  }
+]
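
Because this caller model is itself non-decoupled (a single SUM / IS_CANCELLED response per request), it can also be exercised with a plain one-shot inference outside the streaming test harness above. A hypothetical standalone invocation matching this config, assuming a local Triton server with the model loaded on the default gRPC port:

import numpy as np
import tritonclient.grpc as grpcclient
from tritonclient.utils import np_to_triton_dtype

with grpcclient.InferenceServerClient("localhost:8001") as client:
    tensors = {
        "INPUT": np.array([1], dtype=np.int32),
        "MAX_SUM": np.array([10], dtype=np.int32),
        "IGNORE_CANCEL": np.array([False], dtype=np.bool_),
    }
    inputs = []
    for name, data in tensors.items():
        infer_input = grpcclient.InferInput(name, data.shape, np_to_triton_dtype(data.dtype))
        infer_input.set_data_from_numpy(data)
        inputs.append(infer_input)

    result = client.infer("decoupled_bls_async_cancel", inputs)
    print("SUM:", result.as_numpy("SUM"), "IS_CANCELLED:", result.as_numpy("IS_CANCELLED"))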
