Commit c6c45ff

Add queue size (pending request count) metric per-model (#246)
1 parent fc2ada2 commit c6c45ff

22 files changed: +371 −52 lines

src/backend_model.cc

Lines changed: 1 addition & 4 deletions
@@ -687,9 +687,7 @@ TritonModel::SetConfiguredScheduler(
     RETURN_IF_ERROR(DynamicBatchScheduler::Create(
         this, nullptr, 0 /*nice*/, true /* dynamic_batching_enabled */,
         config_.max_batch_size(), enforce_equal_shape_tensors,
-        config_.dynamic_batching(),
-        config_.response_cache().enable() /* response_cache_enable */,
-        &scheduler));
+        config_.dynamic_batching(), &scheduler));
   } else {
     // Default scheduler. Use dynamic batch scheduler (with batching
     // disabled) as the default scheduler.
@@ -699,7 +697,6 @@ TritonModel::SetConfiguredScheduler(
         std::unordered_map<
             std::string, bool>() /* enforce_equal_shape_tensors */,
         false /* preserve_ordering */,
-        config_.response_cache().enable() /* response_cache_enable */,
         std::set<int32_t>() /* preferred_batch_sizes */,
         0 /* max_queue_delay_microseconds */, &scheduler));
   }
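
Both `Create` calls above drop the `response_cache_enable` argument because the scheduler can now ask its owning model whether caching is enabled (see the `dynamic_batch_scheduler.cc` hunks below). A minimal sketch of that pattern, with hypothetical `Owner`/`Worker` names that are illustrative only and not Triton classes:

#include <iostream>

// Hypothetical owner that exposes a config-derived flag.
class Owner {
 public:
  explicit Owner(bool cache_enabled) : cache_enabled_(cache_enabled) {}
  bool ResponseCacheEnabled() const { return cache_enabled_; }

 private:
  bool cache_enabled_;
};

// Instead of threading `bool response_cache_enable` through every
// constructor, the worker queries the owner it already points to.
class Worker {
 public:
  explicit Worker(const Owner* owner)
      : cache_enabled_(owner->ResponseCacheEnabled()) {}
  bool CacheEnabled() const { return cache_enabled_; }

 private:
  bool cache_enabled_;
};

int main() {
  Owner model(/*cache_enabled=*/true);
  Worker scheduler(&model);
  std::cout << "cache enabled: " << scheduler.CacheEnabled() << "\n";
  return 0;
}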

src/backend_model_instance.cc

Lines changed: 40 additions & 8 deletions
@@ -26,6 +26,8 @@

 #include "backend_model_instance.h"

+#include "status.h"
+
 #ifndef _WIN32
 #include <sys/resource.h>
 #include <sys/syscall.h>
@@ -185,7 +187,7 @@ TritonModelInstance::TritonModelInstance(
   // Let every metric reporter know if caching is enabled to correctly include
   // cache miss time into request duration on cache misses.
   const bool response_cache_enabled =
-      model_->Config().response_cache().enable() &&
+      model_->ResponseCacheEnabled() &&
       model_->Server()->ResponseCacheEnabled();
   MetricModelReporter::Create(
       model_->Name(), model_->Version(), id, response_cache_enabled,
@@ -534,25 +536,55 @@ TritonModelInstance::GenerateWarmupData()
   return Status::Success;
 }

-void
+Status
+TritonModelInstance::PrepareRequestsForExecution(
+    std::vector<std::unique_ptr<InferenceRequest>>& requests)
+{
+  for (auto& r : requests) {
+    // Load the input states for the inference request.
+    RETURN_IF_ERROR(r->LoadInputStates());
+    // Set request state to signify that the request is no longer pending.
+    RETURN_IF_ERROR(r->SetState(InferenceRequest::State::EXECUTING));
+  }
+
+  return Status::Success;
+}
+
+Status
+TritonModelInstance::PrepareRequestsOrRespond(
+    std::vector<std::unique_ptr<InferenceRequest>>& requests)
+{
+  auto status = PrepareRequestsForExecution(requests);
+  // If any errors occurred, respond with the error for each request.
+  if (!status.IsOk()) {
+    for (auto& r : requests) {
+      InferenceRequest::RespondIfError(r, status, true /* release_requests */);
+    }
+    // Log a single error for the batch of requests for better visibility.
+    LOG_STATUS_ERROR(status, "Requests failed pre-execution checks");
+  }
+
+  return status;
+}
+
+Status
 TritonModelInstance::Schedule(
-    std::vector<std::unique_ptr<InferenceRequest>>&& requests,
-    const std::function<void()>& OnCompletion)
+    std::vector<std::unique_ptr<InferenceRequest>>&& requests)
 {
+  // Prepare requests for execution; respond to the requests if any error occurs.
+  RETURN_IF_ERROR(PrepareRequestsOrRespond(requests));
+
   // Use a thread local vector to avoid needing to malloc each
   // time an inference is run.
   thread_local std::vector<TRITONBACKEND_Request*> triton_requests(1024);
   triton_requests.clear();
   for (auto& r : requests) {
-    // Load the input states for the inference request.
-    r->LoadInputStates();
     triton_requests.push_back(
         reinterpret_cast<TRITONBACKEND_Request*>(r.release()));
   }

   Execute(triton_requests);
-
-  OnCompletion();
+  return Status::Success;
 }

 Status
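
The new `PrepareRequestsForExecution` / `PrepareRequestsOrRespond` pair is what moves a request out of the pending state that the new queue-size metric counts: inputs are loaded, the state flips to EXECUTING, and on any failure every request in the batch is answered with the error before `Schedule` returns. A self-contained sketch of the same prepare-or-respond pattern; the `Status`, `State`, `Request`, and `RespondWithError` names below are stand-ins, not Triton's types:

#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Stand-in status type; Triton's Status is richer than this.
struct Status {
  bool ok;
  std::string msg;
  Status() : ok(true) {}
  Status(bool o, std::string m) : ok(o), msg(std::move(m)) {}
};

enum class State { PENDING, EXECUTING };

// Stand-in request: loading input state may fail.
struct Request {
  State state = State::PENDING;
  bool fail_load = false;
  Status LoadInputStates() {
    return fail_load ? Status(false, "failed to load input state") : Status();
  }
  Status SetState(State s) {
    state = s;
    return Status();
  }
};

// Stand-in for InferenceRequest::RespondIfError.
void RespondWithError(Request&, const Status& s) {
  std::cerr << "responding with error: " << s.msg << "\n";
}

// Prepare every request or, on the first failure, respond to all of them
// with that error so none are left dangling.
Status PrepareOrRespond(std::vector<std::unique_ptr<Request>>& requests) {
  Status status;
  for (auto& r : requests) {
    status = r->LoadInputStates();
    if (!status.ok) break;
    status = r->SetState(State::EXECUTING);  // no longer counted as pending
    if (!status.ok) break;
  }
  if (!status.ok) {
    for (auto& r : requests) RespondWithError(*r, status);
  }
  return status;
}

int main() {
  std::vector<std::unique_ptr<Request>> requests;
  requests.push_back(std::make_unique<Request>());
  requests.push_back(std::make_unique<Request>());
  std::cout << "prepared ok: " << PrepareOrRespond(requests).ok << "\n";  // 1
  return 0;
}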

src/backend_model_instance.h

Lines changed: 5 additions & 3 deletions
@@ -116,9 +116,7 @@ class TritonModelInstance {

   Status Initialize();
   Status WarmUp();
-  void Schedule(
-      std::vector<std::unique_ptr<InferenceRequest>>&& requests,
-      const std::function<void()>& OnCompletion);
+  Status Schedule(std::vector<std::unique_ptr<InferenceRequest>>&& requests);

   TritonModel* Model() const { return model_; }
   void* State() { return state_; }
@@ -220,6 +218,10 @@ class TritonModelInstance {
       const bool device_blocking);
   Status GenerateWarmupData();

+  Status PrepareRequestsForExecution(
+      std::vector<std::unique_ptr<InferenceRequest>>& requests);
+  Status PrepareRequestsOrRespond(
+      std::vector<std::unique_ptr<InferenceRequest>>& requests);
   void Execute(std::vector<TRITONBACKEND_Request*>& triton_requests);

   std::shared_ptr<TritonBackendThread> triton_backend_thread_;

src/constants.h

Lines changed: 4 additions & 1 deletion
@@ -79,6 +79,9 @@ constexpr char kMetricsLabelGpuUuid[] = "gpu_uuid";
 constexpr char kWarmupDataFolder[] = "warmup";
 constexpr char kInitialStateFolder[] = "initial_state";

+// Metric names
+constexpr char kPendingRequestMetric[] = "inf_pending_request_count";
+
 constexpr uint64_t NANOS_PER_SECOND = 1000000000;
 constexpr uint64_t NANOS_PER_MILLIS = 1000000;
 constexpr int MAX_GRPC_MESSAGE_SIZE = INT32_MAX;
@@ -90,7 +93,7 @@ constexpr size_t CUDA_IPC_STRUCT_SIZE = 64;
 // MetricModelReporter expects a device ID for GPUs, but we reuse this device
 // ID for other metrics as well such as for CPU and Response Cache metrics
 constexpr int METRIC_REPORTER_ID_CPU = -1;
-constexpr int METRIC_REPORTER_ID_RESPONSE_CACHE = -2;
+constexpr int METRIC_REPORTER_ID_UTILITY = -2;
 #endif

 // Note: This can be replaced with std::byte starting in c++17
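
The new `kPendingRequestMetric` constant names the per-model queue-size metric, and `METRIC_REPORTER_ID_RESPONSE_CACHE` becomes the more general `METRIC_REPORTER_ID_UTILITY` so one non-GPU reporter ID can serve cache and queue metrics alike. The reporter plumbing that exports the value is not part of this excerpt; as an illustration only, the intended gauge semantics (increment when a request is enqueued, decrement when it leaves the pending state) can be sketched with stand-in types like this:

#include <cstdint>
#include <iostream>
#include <map>
#include <mutex>
#include <string>

// Illustrative only: a per-model gauge with the increment/decrement semantics
// of "inf_pending_request_count". Triton's MetricModelReporter (not shown in
// this diff) is what actually exposes the value for scraping.
class PendingRequestGauge {
 public:
  void OnEnqueue(const std::string& model) {
    std::lock_guard<std::mutex> lk(mu_);
    ++pending_[model];  // request entered the queue: it is now "pending"
  }
  void OnExecute(const std::string& model) {
    std::lock_guard<std::mutex> lk(mu_);
    --pending_[model];  // request moved to EXECUTING: no longer pending
  }
  int64_t Value(const std::string& model) {
    std::lock_guard<std::mutex> lk(mu_);
    return pending_[model];
  }

 private:
  std::mutex mu_;
  std::map<std::string, int64_t> pending_;
};

int main() {
  PendingRequestGauge gauge;
  gauge.OnEnqueue("resnet50");
  gauge.OnEnqueue("resnet50");
  gauge.OnExecute("resnet50");
  std::cout << "pending for resnet50: " << gauge.Value("resnet50") << "\n";  // 1
  return 0;
}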

src/dynamic_batch_scheduler.cc

Lines changed: 9 additions & 19 deletions
@@ -59,7 +59,7 @@ DynamicBatchScheduler::DynamicBatchScheduler(
     TritonModel* model, TritonModelInstance* model_instance,
     const bool dynamic_batching_enabled, const int32_t max_batch_size,
     const std::unordered_map<std::string, bool>& enforce_equal_shape_tensors,
-    const bool preserve_ordering, const bool response_cache_enable,
+    const bool preserve_ordering,
     const std::set<int32_t>& preferred_batch_sizes,
     const uint64_t max_queue_delay_microseconds,
     const inference::ModelQueuePolicy& default_queue_policy,
@@ -79,16 +79,8 @@ DynamicBatchScheduler::DynamicBatchScheduler(
   rate_limiter_ = model_->Server()->GetRateLimiter();
   // Both the server and model config should specify
   // caching enabled for model to utilize response cache.
-  response_cache_enabled_ =
-      response_cache_enable && model_->Server()->ResponseCacheEnabled();
-#ifdef TRITON_ENABLE_METRICS
-  // Initialize metric reporter for cache statistics if cache enabled
-  if (response_cache_enabled_) {
-    MetricModelReporter::Create(
-        model_name_, model_->Version(), METRIC_REPORTER_ID_RESPONSE_CACHE,
-        response_cache_enabled_, model_->Config().metric_tags(), &reporter_);
-  }
-#endif  // TRITON_ENABLE_METRICS
+  response_cache_enabled_ = model_->ResponseCacheEnabled() &&
+                            model_->Server()->ResponseCacheEnabled();
   max_preferred_batch_size_ = 0;
   for (const auto size : preferred_batch_sizes_) {
     max_preferred_batch_size_ =
@@ -108,7 +100,7 @@ DynamicBatchScheduler::Create(
     TritonModel* model, TritonModelInstance* model_instance, const int nice,
     const bool dynamic_batching_enabled, const int32_t max_batch_size,
     const std::unordered_map<std::string, bool>& enforce_equal_shape_tensors,
-    const bool preserve_ordering, const bool response_cache_enable,
+    const bool preserve_ordering,
     const std::set<int32_t>& preferred_batch_sizes,
     const uint64_t max_queue_delay_microseconds,
     std::unique_ptr<Scheduler>* scheduler)
@@ -122,8 +114,7 @@ DynamicBatchScheduler::Create(

   return Create(
       model, model_instance, nice, dynamic_batching_enabled, max_batch_size,
-      enforce_equal_shape_tensors, batcher_config, response_cache_enable,
-      scheduler);
+      enforce_equal_shape_tensors, batcher_config, scheduler);
 }

 Status
@@ -132,7 +123,7 @@ DynamicBatchScheduler::Create(
     const bool dynamic_batching_enabled, const int32_t max_batch_size,
     const std::unordered_map<std::string, bool>& enforce_equal_shape_tensors,
     const inference::ModelDynamicBatching& batcher_config,
-    const bool response_cache_enable, std::unique_ptr<Scheduler>* scheduler)
+    std::unique_ptr<Scheduler>* scheduler)
 {
   std::set<int32_t> preferred_batch_sizes;
   for (const auto size : batcher_config.preferred_batch_size()) {
@@ -142,8 +133,7 @@ DynamicBatchScheduler::Create(
   DynamicBatchScheduler* dyna_sched = new DynamicBatchScheduler(
       model, model_instance, dynamic_batching_enabled, max_batch_size,
       enforce_equal_shape_tensors, batcher_config.preserve_ordering(),
-      response_cache_enable, preferred_batch_sizes,
-      batcher_config.max_queue_delay_microseconds(),
+      preferred_batch_sizes, batcher_config.max_queue_delay_microseconds(),
       batcher_config.default_queue_policy(), batcher_config.priority_levels(),
       batcher_config.priority_queue_policy());
   std::unique_ptr<DynamicBatchScheduler> sched(dyna_sched);
@@ -681,7 +671,7 @@ DynamicBatchScheduler::DelegateResponse(
       // Use model_ to update stats directly because request object can be
       // released by the backend before getting to this callback.
       model_->MutableStatsAggregator()->UpdateSuccessCacheMiss(
-          reporter_.get(), cache_miss_ns);
+          model_->MetricReporter().get(), cache_miss_ns);
 #endif  // TRITON_ENABLE_STATS
       if (!status.IsOk()) {
         LOG_ERROR << "Failed to insert key [" << key
@@ -736,7 +726,7 @@ DynamicBatchScheduler::CacheLookUp(
 #ifdef TRITON_ENABLE_STATS
     // Update model metrics/stats on cache hits
     // Backends will update metrics as normal on cache misses
-    request->ReportStatisticsCacheHit(reporter_.get());
+    request->ReportStatisticsCacheHit(model_->MetricReporter().get());
 #endif  // TRITON_ENABLE_STATS
   }
 }
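
With the scheduler's private `reporter_` gone (see the header diff below), cache statistics now flow through the reporter owned by the model via `model_->MetricReporter()`, so a single per-model reporter can also carry the new pending-request count. A minimal sketch of that ownership move; `Model`, `Reporter`, and `Scheduler` here are stand-ins, not Triton's classes:

#include <iostream>
#include <memory>
#include <string>

// Stand-in for a per-model metric reporter.
class Reporter {
 public:
  explicit Reporter(std::string model) : model_(std::move(model)) {}
  void Report(const std::string& what) {
    std::cout << model_ << ": " << what << "\n";
  }

 private:
  std::string model_;
};

// The model owns the reporter ...
class Model {
 public:
  explicit Model(const std::string& name)
      : reporter_(std::make_shared<Reporter>(name)) {}
  const std::shared_ptr<Reporter>& MetricReporter() const { return reporter_; }

 private:
  std::shared_ptr<Reporter> reporter_;
};

// ... and the scheduler borrows it instead of creating its own, so queue and
// cache metrics for one model end up on the same reporter.
class Scheduler {
 public:
  explicit Scheduler(Model* model) : model_(model) {}
  void OnCacheHit() { model_->MetricReporter()->Report("cache hit"); }

 private:
  Model* model_;
};

int main() {
  Model model("resnet50");
  Scheduler sched(&model);
  sched.OnCacheHit();
  return 0;
}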

src/dynamic_batch_scheduler.h

Lines changed: 3 additions & 8 deletions
@@ -55,7 +55,7 @@ class DynamicBatchScheduler : public Scheduler {
       TritonModel* model, TritonModelInstance* model_instance, const int nice,
       const bool dynamic_batching_enabled, const int32_t max_batch_size,
       const std::unordered_map<std::string, bool>& enforce_equal_shape_tensors,
-      const bool preserve_ordering, const bool response_cache_enable,
+      const bool preserve_ordering,
       const std::set<int32_t>& preferred_batch_sizes,
       const uint64_t max_queue_delay_microseconds,
       std::unique_ptr<Scheduler>* scheduler);
@@ -68,7 +68,7 @@ class DynamicBatchScheduler : public Scheduler {
       const bool dynamic_batching_enabled, const int32_t max_batch_size,
       const std::unordered_map<std::string, bool>& enforce_equal_shape_tensors,
       const inference::ModelDynamicBatching& batcher_config,
-      const bool response_cache_enable, std::unique_ptr<Scheduler>* scheduler);
+      std::unique_ptr<Scheduler>* scheduler);

   ~DynamicBatchScheduler();

@@ -88,14 +88,12 @@ class DynamicBatchScheduler : public Scheduler {
   // \see Scheduler::Stop()
   void Stop() override { stop_ = true; }

-  MetricModelReporter* MetricReporter() const { return reporter_.get(); }
-
  private:
   DynamicBatchScheduler(
       TritonModel* model, TritonModelInstance* model_instance,
       const bool dynamic_batching_enabled, const int32_t max_batch_size,
       const std::unordered_map<std::string, bool>& enforce_equal_shape_tensors,
-      const bool preserve_ordering, const bool response_cache_enable,
+      const bool preserve_ordering,
       const std::set<int32_t>& preferred_batch_sizes,
       const uint64_t max_queue_delay_microseconds,
       const inference::ModelQueuePolicy& default_queue_policy,
@@ -188,9 +186,6 @@ class DynamicBatchScheduler : public Scheduler {

   // Preserves the order in which responses are finalized
   std::mutex finalize_mtx_;
-
-  // Reporter for metrics, or nullptr if no metrics should be reported
-  std::shared_ptr<MetricModelReporter> reporter_;
 };

 }}  // namespace triton::core

src/ensemble_scheduler.cc

Lines changed: 4 additions & 1 deletion
@@ -222,7 +222,7 @@ struct TensorData {
 // scope after the step's callback is finished. The step's callback will
 // schedule new steps if available and the last step will finish the ensemble
 // request.
-// So we don't have to maintian the context in scheduler as the shared_ptr
+// So we don't have to maintain the context in scheduler as the shared_ptr
 // will destroy the context for us if there are no "in-flight" steps.
 class EnsembleContext {
  public:
@@ -1319,6 +1319,9 @@ EnsembleScheduler::Enqueue(std::unique_ptr<InferenceRequest>& request)
   // Add additional callback to keep track of in-flight count
   ++inflight_count_;
   request->AddInternalReleaseCallback([this]() { --inflight_count_; });
+  // Consider the top-level "ensemble" request executing once passed to a
+  // composing model. Composing model requests will track their own states.
+  RETURN_IF_ERROR(request->SetState(InferenceRequest::State::EXECUTING));
   std::shared_ptr<EnsembleContext> context(new EnsembleContext(
       metric_reporter_.get(), stats_aggregator_, is_, info_.get(), request,
       stream_));
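
For ensembles, the top-level request is marked EXECUTING as soon as it is handed to the composing models, so it stops counting as pending even though each composing request tracks its own state; the in-flight count is balanced by a release callback. A self-contained sketch of that callback pattern; the `Request` type below is a stand-in, not Triton's:

#include <atomic>
#include <functional>
#include <iostream>
#include <vector>

// Stand-in request that runs registered callbacks when it is released.
class Request {
 public:
  void AddInternalReleaseCallback(std::function<void()> cb) {
    release_callbacks_.push_back(std::move(cb));
  }
  void Release() {
    for (auto& cb : release_callbacks_) cb();
    release_callbacks_.clear();
  }

 private:
  std::vector<std::function<void()>> release_callbacks_;
};

int main() {
  std::atomic<int> inflight_count{0};

  Request request;
  ++inflight_count;  // request enters the ensemble scheduler
  request.AddInternalReleaseCallback([&]() { --inflight_count; });

  // ... the ensemble forwards work to composing models here ...

  request.Release();  // release callback restores the in-flight count
  std::cout << "in-flight after release: " << inflight_count.load() << "\n";  // 0
  return 0;
}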
