Skip to content

Commit 02ed661

Browse files
authored
Add request timeout for sequence batcher (#180)
* [WIP] adding timeout for sequence batcher * Fix up
1 parent 6f32e50 commit 02ed661

File tree

2 files changed

+191
-90
lines changed

2 files changed

+191
-90
lines changed

src/sequence_batch_scheduler.cc

Lines changed: 173 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,19 @@
4040

4141
namespace triton { namespace core {
4242

43+
namespace {
44+
45+
// Returns the current std::chrono::steady_clock reading (time since the
// clock's epoch), converted to 'TimeUnit' and expressed as an integer
// tick count.
template <typename TimeUnit>
46+
inline uint64_t
47+
Now()
48+
{
49+
return std::chrono::duration_cast<TimeUnit>(
50+
std::chrono::steady_clock::now().time_since_epoch())
51+
.count();
52+
}
53+
54+
} // namespace
55+
4356
Status
4457
SequenceBatchScheduler::Create(
4558
TritonModel* model,
@@ -567,6 +580,7 @@ SequenceBatchScheduler::Enqueue(std::unique_ptr<InferenceRequest>& irequest)
567580
"inference requests");
568581
}
569582

583+
bool wake_reaper_thread = false;
570584
std::unique_lock<std::mutex> lock(mu_);
571585

572586
auto sb_itr = sequence_to_batcherseqslot_map_.find(correlation_id);
@@ -601,12 +615,8 @@ SequenceBatchScheduler::Enqueue(std::unique_ptr<InferenceRequest>& irequest)
601615
// max_sequence_idle_microseconds value is not exceeded for any
602616
// sequence, and if it is it will release the sequence slot (if any)
603617
// allocated to that sequence.
604-
{
605-
uint64_t now_us = std::chrono::duration_cast<std::chrono::microseconds>(
606-
std::chrono::steady_clock::now().time_since_epoch())
607-
.count();
608-
correlation_id_timestamps_[correlation_id] = now_us;
609-
}
618+
uint64_t now_us = Now<std::chrono::microseconds>();
619+
correlation_id_timestamps_[correlation_id] = now_us;
610620

611621
// If this request starts a new sequence but the correlation ID
612622
// already has an in-progress sequence then that previous sequence
@@ -634,7 +644,17 @@ SequenceBatchScheduler::Enqueue(std::unique_ptr<InferenceRequest>& irequest)
634644
LOG_VERBOSE(1) << "Enqueuing CORRID " << correlation_id
635645
<< " into existing backlog: " << irequest->ModelName();
636646

637-
bl_itr->second->emplace_back(std::move(irequest));
647+
auto& backlog = bl_itr->second;
648+
if (irequest->TimeoutMicroseconds() != 0) {
649+
backlog->expiration_timestamp_ = std::min(
650+
backlog->expiration_timestamp_,
651+
now_us + irequest->TimeoutMicroseconds());
652+
if (backlog->expiration_timestamp_ < timeout_timestamp_) {
653+
timeout_timestamp_ = backlog->expiration_timestamp_;
654+
wake_reaper_thread = true;
655+
}
656+
}
657+
backlog->queue_->emplace_back(std::move(irequest));
638658

639659
// If the sequence is ending then forget correlation ID
640660
// connection to this backlog queue. If another sequence starts
@@ -643,6 +663,12 @@ SequenceBatchScheduler::Enqueue(std::unique_ptr<InferenceRequest>& irequest)
643663
if (seq_end) {
644664
sequence_to_backlog_map_.erase(bl_itr);
645665
}
666+
667+
// Wake up the reaper so it picks up the latest timeout to wait for;
668+
// this shouldn't incur actual reaper work.
669+
if (wake_reaper_thread) {
670+
reaper_cv_.notify_all();
671+
}
646672
return Status::Success;
647673
}
648674
// This request does not have an assigned backlog or sequence
@@ -658,13 +684,25 @@ SequenceBatchScheduler::Enqueue(std::unique_ptr<InferenceRequest>& irequest)
658684
LOG_VERBOSE(1) << "Enqueuing CORRID " << correlation_id
659685
<< " into new backlog: " << irequest->ModelName();
660686

661-
auto backlog =
662-
std::make_shared<std::deque<std::unique_ptr<InferenceRequest>>>();
687+
auto backlog = std::make_shared<BacklogQueue>();
688+
if (irequest->TimeoutMicroseconds() != 0) {
689+
backlog->expiration_timestamp_ = now_us + irequest->TimeoutMicroseconds();
690+
if (backlog->expiration_timestamp_ < timeout_timestamp_) {
691+
timeout_timestamp_ = backlog->expiration_timestamp_;
692+
wake_reaper_thread = true;
693+
}
694+
}
663695
backlog_queues_.push_back(backlog);
664-
backlog->emplace_back(std::move(irequest));
696+
backlog->queue_->emplace_back(std::move(irequest));
665697
if (!seq_end) {
666698
sequence_to_backlog_map_[correlation_id] = std::move(backlog);
667699
}
700+
701+
// Wake up the reaper so it picks up the latest timeout to wait for;
702+
// this shouldn't incur actual reaper work.
703+
if (wake_reaper_thread) {
704+
reaper_cv_.notify_all();
705+
}
668706
return Status::Success;
669707
}
670708

@@ -689,7 +727,6 @@ SequenceBatchScheduler::Enqueue(std::unique_ptr<InferenceRequest>& irequest)
689727
<< irequest->ModelName();
690728

691729
batchers_[batcher_idx]->Enqueue(seq_slot, correlation_id, irequest);
692-
693730
return Status::Success;
694731
}
695732

@@ -703,7 +740,7 @@ SequenceBatchScheduler::ReleaseSequenceSlot(
703740
// If there is a backlogged sequence and it is requested, return it
704741
// so that it can use the newly available sequence slot.
705742
if (!backlog_queues_.empty()) {
706-
auto& backlog = backlog_queues_.front();
743+
auto& backlog = backlog_queues_.front()->queue_;
707744
*requests = std::move(*backlog);
708745
backlog_queues_.pop_front();
709746
if (!requests->empty()) { // should never be empty...
@@ -769,7 +806,7 @@ SequenceBatchScheduler::DelayScheduler(
769806
if (backlog_delay_cnt_ > 0) {
770807
size_t backlog_seen = 0;
771808
for (const auto& q : backlog_queues_) {
772-
backlog_seen += q->size();
809+
backlog_seen += q->queue_->size();
773810
}
774811

775812
if (backlog_seen < backlog_delay_cnt_) {
@@ -798,85 +835,140 @@ SequenceBatchScheduler::ReaperThread(const int nice)
798835

799836
const uint64_t backlog_idle_wait_microseconds = 50 * 1000;
800837

838+
uint64_t idle_timestamp =
839+
Now<std::chrono::microseconds>() + max_sequence_idle_microseconds_;
840+
timeout_timestamp_ = std::numeric_limits<uint64_t>::max();
841+
801842
while (!reaper_thread_exit_) {
802-
uint64_t wait_microseconds = max_sequence_idle_microseconds_;
803-
BatcherSequenceSlotMap force_end_sequences;
843+
uint64_t now_us = Now<std::chrono::microseconds>();
844+
845+
// Reap idle assigned sequence
846+
if (now_us >= idle_timestamp) {
847+
uint64_t wait_microseconds = max_sequence_idle_microseconds_;
848+
BatcherSequenceSlotMap force_end_sequences;
849+
{
850+
std::unique_lock<std::mutex> lock(mu_);
851+
for (auto cid_itr = correlation_id_timestamps_.cbegin();
852+
cid_itr != correlation_id_timestamps_.cend();) {
853+
int64_t remaining_microseconds =
854+
(int64_t)max_sequence_idle_microseconds_ -
855+
(now_us - cid_itr->second);
856+
if (remaining_microseconds > 0) {
857+
wait_microseconds = std::min(
858+
wait_microseconds, (uint64_t)remaining_microseconds + 1);
859+
++cid_itr;
860+
continue;
861+
}
804862

805-
{
806-
std::unique_lock<std::mutex> lock(mu_);
863+
const InferenceRequest::SequenceId& idle_correlation_id =
864+
cid_itr->first;
865+
LOG_VERBOSE(1) << "Reaper: CORRID " << idle_correlation_id
866+
<< ": max sequence idle exceeded";
807867

808-
uint64_t now_us = std::chrono::duration_cast<std::chrono::microseconds>(
809-
std::chrono::steady_clock::now().time_since_epoch())
810-
.count();
811-
812-
for (auto cid_itr = correlation_id_timestamps_.cbegin();
813-
cid_itr != correlation_id_timestamps_.cend();) {
814-
int64_t remaining_microseconds =
815-
(int64_t)max_sequence_idle_microseconds_ -
816-
(now_us - cid_itr->second);
817-
if (remaining_microseconds > 0) {
818-
wait_microseconds =
819-
std::min(wait_microseconds, (uint64_t)remaining_microseconds + 1);
820-
++cid_itr;
821-
continue;
822-
}
868+
auto idle_sb_itr =
869+
sequence_to_batcherseqslot_map_.find(idle_correlation_id);
823870

824-
const InferenceRequest::SequenceId& idle_correlation_id =
825-
cid_itr->first;
826-
LOG_VERBOSE(1) << "Reaper: CORRID " << idle_correlation_id
827-
<< ": max sequence idle exceeded";
828-
829-
auto idle_sb_itr =
830-
sequence_to_batcherseqslot_map_.find(idle_correlation_id);
831-
832-
// If the idle correlation ID has an assigned sequence slot,
833-
// then release that assignment so it becomes available for
834-
// another sequence. Release is done by enqueuing and must be
835-
// done outside the lock, so just collect needed info here.
836-
if (idle_sb_itr != sequence_to_batcherseqslot_map_.end()) {
837-
force_end_sequences[idle_correlation_id] = idle_sb_itr->second;
838-
839-
sequence_to_batcherseqslot_map_.erase(idle_correlation_id);
840-
cid_itr = correlation_id_timestamps_.erase(cid_itr);
841-
} else {
842-
// If the idle correlation ID is in the backlog, then just
843-
// need to increase the timeout so that we revisit it again in
844-
// the future to check if it is assigned to a sequence slot.
845-
auto idle_bl_itr = sequence_to_backlog_map_.find(idle_correlation_id);
846-
if (idle_bl_itr != sequence_to_backlog_map_.end()) {
847-
LOG_VERBOSE(1) << "Reaper: found idle CORRID "
848-
<< idle_correlation_id;
849-
wait_microseconds =
850-
std::min(wait_microseconds, backlog_idle_wait_microseconds);
851-
++cid_itr;
852-
} else {
853-
LOG_VERBOSE(1) << "Reaper: ignoring stale idle CORRID "
854-
<< idle_correlation_id;
871+
// If the idle correlation ID has an assigned sequence slot,
872+
// then release that assignment so it becomes available for
873+
// another sequence. Release is done by enqueuing and must be
874+
// done outside the lock, so just collect needed info here.
875+
if (idle_sb_itr != sequence_to_batcherseqslot_map_.end()) {
876+
force_end_sequences[idle_correlation_id] = idle_sb_itr->second;
877+
878+
sequence_to_batcherseqslot_map_.erase(idle_correlation_id);
855879
cid_itr = correlation_id_timestamps_.erase(cid_itr);
880+
} else {
881+
// If the idle correlation ID is in the backlog, then just
882+
// need to increase the timeout so that we revisit it again in
883+
// the future to check if it is assigned to a sequence slot.
884+
auto idle_bl_itr =
885+
sequence_to_backlog_map_.find(idle_correlation_id);
886+
if (idle_bl_itr != sequence_to_backlog_map_.end()) {
887+
LOG_VERBOSE(1)
888+
<< "Reaper: found idle CORRID " << idle_correlation_id;
889+
wait_microseconds =
890+
std::min(wait_microseconds, backlog_idle_wait_microseconds);
891+
++cid_itr;
892+
} else {
893+
LOG_VERBOSE(1) << "Reaper: ignoring stale idle CORRID "
894+
<< idle_correlation_id;
895+
cid_itr = correlation_id_timestamps_.erase(cid_itr);
896+
}
856897
}
857898
}
858899
}
900+
901+
// Enqueue force-ends outside of the lock.
902+
for (const auto& pr : force_end_sequences) {
903+
const InferenceRequest::SequenceId& idle_correlation_id = pr.first;
904+
const size_t batcher_idx = pr.second.batcher_idx_;
905+
const uint32_t seq_slot = pr.second.seq_slot_;
906+
907+
LOG_VERBOSE(1) << "Reaper: force-ending CORRID " << idle_correlation_id
908+
<< " in batcher " << batcher_idx << ", slot "
909+
<< seq_slot;
910+
911+
// A slot assignment is released by enqueuing a request with a
912+
// null request. The scheduler thread will interpret the null
913+
// request as meaning it should release the sequence slot but
914+
// otherwise do nothing with the request.
915+
std::unique_ptr<InferenceRequest> null_request;
916+
batchers_[batcher_idx]->Enqueue(
917+
seq_slot, idle_correlation_id, null_request);
918+
}
919+
920+
// Update timestamp for next idle check
921+
idle_timestamp = now_us + wait_microseconds;
859922
}
860923

861-
// Enqueue force-ends outside of the lock.
862-
for (const auto& pr : force_end_sequences) {
863-
const InferenceRequest::SequenceId& idle_correlation_id = pr.first;
864-
const size_t batcher_idx = pr.second.batcher_idx_;
865-
const uint32_t seq_slot = pr.second.seq_slot_;
866-
867-
LOG_VERBOSE(1) << "Reaper: force-ending CORRID " << idle_correlation_id
868-
<< " in batcher " << batcher_idx << ", slot " << seq_slot;
869-
870-
// A slot assignment is released by enqueuing a request with a
871-
// null request. The scheduler thread will interpret the null
872-
// request as meaning it should release the sequence slot but
873-
// otherwise do nothing with the request.
874-
std::unique_ptr<InferenceRequest> null_request;
875-
batchers_[batcher_idx]->Enqueue(
876-
seq_slot, idle_correlation_id, null_request);
924+
// Reap timed out backlog sequence
925+
if (now_us >= timeout_timestamp_) {
926+
timeout_timestamp_ = std::numeric_limits<uint64_t>::max();
927+
std::deque<std::shared_ptr<BacklogQueue>> expired_backlogs;
928+
{
929+
std::unique_lock<std::mutex> lock(mu_);
930+
// Remove expired backlog from 'backlog_queues_'
931+
auto it = backlog_queues_.begin();
932+
while (it != backlog_queues_.end()) {
933+
const auto queue_timestamp = (*it)->expiration_timestamp_;
934+
if (queue_timestamp > now_us) {
935+
timeout_timestamp_ = std::min(timeout_timestamp_, queue_timestamp);
936+
++it;
937+
} else {
938+
// The queue expired, clear the records and reject the requests
939+
// outside the lock
940+
const auto& correlation_id =
941+
(*it)->queue_->front()->CorrelationId();
942+
expired_backlogs.emplace_back(std::move(*it));
943+
944+
// Need to double check on 'sequence_to_backlog_map_', it may
945+
// be tracking a new sequence with the same ID which may not be
946+
// timing out.
947+
const auto& mit = sequence_to_backlog_map_.find(correlation_id);
948+
if ((mit != sequence_to_backlog_map_.end()) &&
949+
(mit->second->expiration_timestamp_ <= now_us)) {
950+
sequence_to_backlog_map_.erase(mit);
951+
}
952+
953+
it = backlog_queues_.erase(it);
954+
}
955+
}
956+
}
957+
958+
// Reject timeout requests
959+
static Status rejected_status = Status(
960+
Status::Code::UNAVAILABLE,
961+
"timeout of the corresponding sequence has been expired");
962+
for (auto& backlog : expired_backlogs) {
963+
for (auto& req : *backlog->queue_) {
964+
InferenceRequest::RespondIfError(req, rejected_status, true);
965+
}
966+
}
877967
}
878968

879-
// Wait until the next idle timeout needs to be checked
969+
const auto wait_microseconds =
970+
std::min(idle_timestamp, timeout_timestamp_) - now_us;
971+
// Wait until the next timeout needs to be checked
880972
if (wait_microseconds > 0) {
881973
std::unique_lock<std::mutex> lock(mu_);
882974
LOG_VERBOSE(2) << "Reaper: sleeping for " << wait_microseconds << "us...";
@@ -1330,10 +1422,7 @@ DirectSequenceBatch::BatcherThread(const int nice)
13301422
// batch, execute now if queuing delay is exceeded or the batch
13311423
// size is large enough. Otherwise create a timer to wakeup a
13321424
// thread to check again at the maximum allowed delay.
1333-
uint64_t now_ns =
1334-
std::chrono::duration_cast<std::chrono::nanoseconds>(
1335-
std::chrono::steady_clock::now().time_since_epoch())
1336-
.count();
1425+
uint64_t now_ns = Now<std::chrono::nanoseconds>();
13371426
uint64_t current_batch_delay_ns =
13381427
(now_ns - earliest_enqueue_time_ns);
13391428
if ((current_batch_delay_ns > pending_batch_delay_ns_) ||

src/sequence_batch_scheduler.h

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include <condition_variable>
3030
#include <deque>
3131
#include <future>
32+
#include <limits>
3233
#include <mutex>
3334
#include <queue>
3435
#include <thread>
@@ -146,6 +147,9 @@ class SequenceBatchScheduler : public Scheduler {
146147
std::unique_ptr<std::thread> reaper_thread_;
147148
std::condition_variable reaper_cv_;
148149
bool reaper_thread_exit_;
150+
// Shared between the enqueue thread and the reaper thread because
151+
// the timeout may be shortened by a new request
152+
uint64_t timeout_timestamp_;
149153

150154
// The SequenceBatchs being managed by this scheduler.
151155
std::vector<std::unique_ptr<SequenceBatch>> batchers_;
@@ -156,17 +160,25 @@ class SequenceBatchScheduler : public Scheduler {
156160
std::unordered_map<InferenceRequest::SequenceId, BatcherSequenceSlot>;
157161
BatcherSequenceSlotMap sequence_to_batcherseqslot_map_;
158162

163+
// The ordered backlog of sequences waiting for a free sequence slot.
164+
// The backlog queue keeps track of the closest expiration timestamp among
165+
// the requests of the corresponding sequence. The reaper thread will
166+
// sweep the queues on wake-up and clear all timed-out sequences.
167+
// See ReaperThread() for implementation details.
168+
struct BacklogQueue {
169+
// Default to max value so it is not possible to time out unless specified.
170+
uint64_t expiration_timestamp_{std::numeric_limits<uint64_t>::max()};
171+
// The requests held by this backlog entry. Default-initialized to a newly
// allocated, empty deque so a default-constructed BacklogQueue can be
// appended to right away.
std::shared_ptr<std::deque<std::unique_ptr<InferenceRequest>>> queue_{
172+
std::make_shared<std::deque<std::unique_ptr<InferenceRequest>>>()};
173+
};
174+
std::deque<std::shared_ptr<BacklogQueue>> backlog_queues_;
175+
159176
// Map from a request's correlation ID to the backlog queue
160177
// collecting requests for that correlation ID.
161178
using BacklogMap = std::unordered_map<
162-
InferenceRequest::SequenceId,
163-
std::shared_ptr<std::deque<std::unique_ptr<InferenceRequest>>>>;
179+
InferenceRequest::SequenceId, std::shared_ptr<BacklogQueue>>;
164180
BacklogMap sequence_to_backlog_map_;
165181

166-
// The ordered backlog of sequences waiting for a free sequenceslot.
167-
std::deque<std::shared_ptr<std::deque<std::unique_ptr<InferenceRequest>>>>
168-
backlog_queues_;
169-
170182
// The batcher/sequence-slot locations ready to accept a new
171183
// sequence. Ordered from lowest sequence-slot-number to highest so
172184
// that all batchers grow at the same rate and attempt to remain as

0 commit comments

Comments
 (0)