@@ -171,12 +171,21 @@ public:
171
171
172
172
void stop() override
173
173
{
174
+ // Notify all threads to finish
174
175
finish_condition_.set_trigger_value(true);
175
- thread_pool_.stop();
176
176
177
- std::lock_guard<std::mutex> _(mtx_);
178
- // TODO: Cancel all pending requests
179
- processing_requests_.clear();
177
+ // Cancel all pending requests
178
+ {
179
+ std::lock_guard<std::mutex> _(mtx_);
180
+ for (auto& it : processing_requests_)
181
+ {
182
+ it.second->cancel();
183
+ }
184
+ processing_requests_.clear();
185
+ }
186
+
187
+ // Wait for all threads to finish
188
+ thread_pool_.stop();
180
189
}
181
190
182
191
private:
@@ -198,6 +207,7 @@ private:
198
207
virtual ~IInputFeedProcessor() = default;
199
208
virtual bool process_additional_request(
200
209
const RequestType& request) = 0;
210
+ virtual void cancel_input_feed() = 0;
201
211
};
202
212
203
213
//} Input feed helpers
@@ -308,6 +318,21 @@ $endif$
308
318
replier->send_reply(&reply, info);
309
319
}
310
320
321
+ void cancel()
322
+ {
323
+ // Cancel output feed
324
+ if (output_feed_cancellator_)
325
+ {
326
+ output_feed_cancellator_->cancel();
327
+ }
328
+
329
+ // Cancel input feeds
330
+ for (const auto& input_feed : input_feed_processors_)
331
+ {
332
+ input_feed->cancel_input_feed();
333
+ }
334
+ }
335
+
311
336
private:
312
337
313
338
std::shared_ptr<IOutputFeedCancellator> output_feed_cancellator_;
@@ -562,6 +587,13 @@ struct $op.name$_$param.name$_reader :
562
587
return false;
563
588
}
564
589
590
+ void cancel_input_feed() override
591
+ {
592
+ std::lock_guard<std::mutex> _(mtx_);
593
+ finished_ = true;
594
+ cv_.notify_all();
595
+ }
596
+
565
597
bool read(
566
598
$param.typecode.cppTypename$& value) override
567
599
{
0 commit comments