Skip to content

Commit 9f1fad2

Browse files
authored
Promptly respond to timeout requests under reject policy (#333)
* Promptly respond to timeout requests under reject policy * Remove wrapping CV * Revert "Remove wrapping CV" This reverts commit b682018. * Add wait timeout to cv * Fix typo * Use different loop * Use CV wait for and avoid a separate thread
1 parent fed4bc1 commit 9f1fad2

File tree

4 files changed

+116
-25
lines changed

4 files changed

+116
-25
lines changed

src/dynamic_batch_scheduler.cc

Lines changed: 56 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2018-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
1+
// Copyright 2018-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
//
33
// Redistribution and use in source and binary forms, with or without
44
// modification, are permitted provided that the following conditions
@@ -67,6 +67,20 @@ FinishSkippedRequests(
6767
}
6868
}
6969

70+
void
71+
FinishRejectedCancelledRequests(
72+
std::vector<std::deque<std::unique_ptr<InferenceRequest>>>&&
73+
rejected_requests,
74+
std::vector<std::deque<std::unique_ptr<InferenceRequest>>>&&
75+
cancelled_requests)
76+
{
77+
const static Status rejected_status =
78+
Status(Status::Code::UNAVAILABLE, "Request timeout expired");
79+
const static Status cancelled_status = Status(Status::Code::CANCELLED);
80+
FinishSkippedRequests(std::move(rejected_requests), rejected_status);
81+
FinishSkippedRequests(std::move(cancelled_requests), cancelled_status);
82+
}
83+
7084
DynamicBatchScheduler::DynamicBatchScheduler(
7185
TritonModel* model, TritonModelInstance* model_instance,
7286
const bool dynamic_batching_enabled, const int32_t max_batch_size,
@@ -317,10 +331,6 @@ DynamicBatchScheduler::BatcherThread(const int nice)
317331
}
318332
}
319333

320-
auto wait_for_slots = [this]() {
321-
return model_->Server()->GetRateLimiter()->PayloadSlotAvailable(
322-
model_, model_instance_, queue_.SupportPrefetching());
323-
};
324334
const uint64_t default_wait_microseconds = 500 * 1000;
325335

326336
while (!scheduler_thread_exit_.load()) {
@@ -359,18 +369,7 @@ DynamicBatchScheduler::BatcherThread(const int nice)
359369
continue;
360370
}
361371

362-
{
363-
// The wait_for_slots conditional can be blocking till the slots
364-
// are available for execution. Need to explicitly release the
365-
// outer lock to allow Enqueue threads above to make progress.
366-
lock.unlock();
367-
// Use slot lock to wait for the slot availability.
368-
std::mutex slot_mu;
369-
std::unique_lock<std::mutex> slot_lock(slot_mu);
370-
cv_.wait(slot_lock, wait_for_slots);
371-
// Recapture the outer most lock to keep making progress.
372-
lock.lock();
373-
}
372+
WaitForPayloadSlotAvailable(&lock, default_wait_microseconds);
374373

375374
{
376375
std::lock_guard<std::mutex> exec_lock(
@@ -444,17 +443,52 @@ DynamicBatchScheduler::BatcherThread(const int nice)
444443
}
445444

446445
// Finish rejected and cancelled requests if any
447-
const static Status rejected_status =
448-
Status(Status::Code::UNAVAILABLE, "Request timeout expired");
449-
const static Status cancelled_status = Status(Status::Code::CANCELLED);
450-
FinishSkippedRequests(std::move(rejected_requests), rejected_status);
451-
FinishSkippedRequests(std::move(cancelled_requests), cancelled_status);
446+
FinishRejectedCancelledRequests(
447+
std::move(rejected_requests), std::move(cancelled_requests));
452448
} // end runner loop
453449

454450
LOG_VERBOSE(1) << "Stopping dynamic-batcher thread for " << model_name_
455451
<< "...";
456452
}
457453

454+
void
455+
DynamicBatchScheduler::WaitForPayloadSlotAvailable(
456+
std::unique_lock<std::mutex>* lock, uint64_t wait_microseconds)
457+
{
458+
// The wait_for_slots conditional can be blocking till the slots are available
459+
// for execution. Need to explicitly release the 'mu_' lock to allow the
460+
// Enqueue threads above to make progress.
461+
lock->unlock();
462+
463+
const std::chrono::microseconds wait_timeout(wait_microseconds);
464+
std::mutex slot_mu;
465+
std::unique_lock<std::mutex> slot_lock(slot_mu);
466+
bool slot_available = false;
467+
468+
while (!slot_available) {
469+
slot_available = cv_.wait_for(slot_lock, wait_timeout, [this]() {
470+
return model_->Server()->GetRateLimiter()->PayloadSlotAvailable(
471+
model_, model_instance_, queue_.SupportPrefetching(),
472+
true /* force_non_blocking */);
473+
});
474+
if (!slot_available) {
475+
// Reject and release timeout requests from queue.
476+
std::vector<std::deque<std::unique_ptr<InferenceRequest>>>
477+
rejected_requests, cancelled_requests;
478+
{
479+
std::lock_guard<std::mutex> lock(mu_);
480+
queue_.RejectTimeoutRequests();
481+
queue_.ReleaseSkippedRequests(&rejected_requests, &cancelled_requests);
482+
}
483+
FinishRejectedCancelledRequests(
484+
std::move(rejected_requests), std::move(cancelled_requests));
485+
}
486+
}
487+
488+
// Recapture the lock.
489+
lock->lock();
490+
}
491+
458492
uint64_t
459493
DynamicBatchScheduler::GetDynamicBatch()
460494
{

src/dynamic_batch_scheduler.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2018-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
1+
// Copyright 2018-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
//
33
// Redistribution and use in source and binary forms, with or without
44
// modification, are permitted provided that the following conditions
@@ -109,6 +109,15 @@ class DynamicBatchScheduler : public Scheduler {
109109
std::unique_ptr<InferenceResponse>& cached_response);
110110
void FinalizeResponses();
111111

112+
// Block until a payload slot is available on the rate limiter. The 'lock'
113+
// should be acquired when calling this function. The 'lock' will be released
114+
// when waiting for payload slot and re-acquired before this function returns.
115+
// For queued requests under policy REJECT, they will be rejected if timed-out
116+
// while waiting for a slot. The timeout will be checked every
117+
// 'wait_microseconds'. The 'wait_microseconds' should be non-zero.
118+
void WaitForPayloadSlotAvailable(
119+
std::unique_lock<std::mutex>* lock, uint64_t wait_microseconds);
120+
112121
// Custom batching function calls
113122
// Returns whether custom batching is enabled.
114123
bool CustomBatchEnabled() const;

src/scheduler_utils.cc

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
1+
// Copyright 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
//
33
// Redistribution and use in source and binary forms, with or without
44
// modification, are permitted provided that the following conditions
@@ -237,6 +237,33 @@ PriorityQueue::PolicyQueue::ApplyPolicy(
237237
return ((idx - queue_.size()) < delayed_queue_.size());
238238
}
239239

240+
size_t
241+
PriorityQueue::PolicyQueue::RejectTimeoutRequests()
242+
{
243+
if (timeout_action_ != inference::ModelQueuePolicy::REJECT) {
244+
return 0;
245+
}
246+
247+
size_t rejected_count = 0;
248+
uint64_t now_nanoseconds =
249+
std::chrono::duration_cast<std::chrono::nanoseconds>(
250+
std::chrono::steady_clock::now().time_since_epoch())
251+
.count();
252+
size_t idx = 0;
253+
while (idx < queue_.size()) {
254+
if (timeout_timestamp_ns_[idx] != 0 &&
255+
now_nanoseconds > timeout_timestamp_ns_[idx]) {
256+
rejected_count++;
257+
rejected_queue_.emplace_back(std::move(queue_[idx]));
258+
queue_.erase(queue_.begin() + idx);
259+
timeout_timestamp_ns_.erase(timeout_timestamp_ns_.begin() + idx);
260+
} else {
261+
idx++;
262+
}
263+
}
264+
return rejected_count;
265+
}
266+
240267
void
241268
PriorityQueue::PolicyQueue::ReleaseRejectedQueue(
242269
std::deque<std::unique_ptr<InferenceRequest>>* requests)
@@ -358,6 +385,18 @@ PriorityQueue::Dequeue(std::unique_ptr<InferenceRequest>* request)
358385
return Status(Status::Code::UNAVAILABLE, "dequeue on empty queue");
359386
}
360387

388+
void
389+
PriorityQueue::RejectTimeoutRequests()
390+
{
391+
for (auto it = queues_.begin(); it != queues_.end(); it++) {
392+
size_t rejected_count = it->second.RejectTimeoutRequests();
393+
size_ -= rejected_count;
394+
if (rejected_count > 0 && it->first == pending_cursor_.curr_it_->first) {
395+
pending_cursor_.valid_ = false;
396+
}
397+
}
398+
}
399+
361400
void
362401
PriorityQueue::ReleaseSkippedRequests(
363402
std::vector<std::deque<std::unique_ptr<InferenceRequest>>>*

src/scheduler_utils.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
1+
// Copyright 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
//
33
// Redistribution and use in source and binary forms, with or without
44
// modification, are permitted provided that the following conditions
@@ -84,6 +84,11 @@ class PriorityQueue {
8484
// Dequeue the request at the front of the queue.
8585
Status Dequeue(std::unique_ptr<InferenceRequest>* request);
8686

87+
// Reject timed-out requests from 'queues_', if queue policy set to reject.
88+
// The cursor will be marked as invalid, if a request from the queue pointed
89+
// to by the cursor is rejected.
90+
void RejectTimeoutRequests();
91+
8792
// Retrieve the requests that are either rejected or cancelled.
8893
void ReleaseSkippedRequests(
8994
std::vector<std::deque<std::unique_ptr<InferenceRequest>>>*
@@ -203,6 +208,10 @@ class PriorityQueue {
203208
size_t idx, size_t* rejected_count, size_t* rejected_batch_size,
204209
size_t* cancelled_count, size_t* cancelled_batch_size);
205210

211+
// Move timed-out requests from 'queue_' to 'rejected_queue_', if
212+
// 'timeout_action_' is to reject. Return the number of requests rejected.
213+
size_t RejectTimeoutRequests();
214+
206215
// Return the rejected requests held by the queue.
207216
void ReleaseRejectedQueue(
208217
std::deque<std::unique_ptr<InferenceRequest>>* requests);

0 commit comments

Comments
 (0)