@@ -25,6 +25,7 @@ VideoDemuxerFlowUnit::~VideoDemuxerFlowUnit() = default;
25
25
modelbox::Status VideoDemuxerFlowUnit::Open (
26
26
const std::shared_ptr<modelbox::Configuration> &opts) {
27
27
key_frame_only_ = opts->GetBool (" key_frame_only" , false );
28
+ queue_size_ = opts->GetUint64 (" queue_size" , queue_size_);
28
29
return modelbox::STATUS_OK;
29
30
}
30
31
modelbox::Status VideoDemuxerFlowUnit::Close () { return modelbox::STATUS_OK; }
@@ -62,15 +63,16 @@ modelbox::Status VideoDemuxerFlowUnit::Reconnect(
62
63
63
64
modelbox::Status VideoDemuxerFlowUnit::Process (
64
65
std::shared_ptr<modelbox::DataContext> data_ctx) {
65
- auto video_demuxer = std::static_pointer_cast<FfmpegVideoDemuxer >(
66
+ auto demuxer_worker = std::static_pointer_cast<DemuxerWorker >(
66
67
data_ctx->GetPrivate (DEMUXER_CTX));
67
68
modelbox::Status demux_status = modelbox::STATUS_FAULT;
68
69
std::shared_ptr<AVPacket> pkt;
69
- if (video_demuxer != nullptr ) {
70
- demux_status = video_demuxer-> Demux (pkt);
70
+ if (demuxer_worker != nullptr ) {
71
+ demux_status = demuxer_worker-> ReadPacket (pkt);
71
72
}
72
73
73
74
if (demux_status == modelbox::STATUS_OK) {
75
+ auto video_demuxer = demuxer_worker->GetDemuxer ();
74
76
auto ret = WriteData (data_ctx, pkt, video_demuxer);
75
77
if (!ret) {
76
78
return ret;
@@ -86,8 +88,9 @@ modelbox::Status VideoDemuxerFlowUnit::Process(
86
88
87
89
void VideoDemuxerFlowUnit::WriteEnd (
88
90
std::shared_ptr<modelbox::DataContext> &data_ctx) {
89
- auto video_demuxer = std::static_pointer_cast<FfmpegVideoDemuxer >(
91
+ auto demuxer_worker = std::static_pointer_cast<DemuxerWorker >(
90
92
data_ctx->GetPrivate (DEMUXER_CTX));
93
+ auto video_demuxer = demuxer_worker->GetDemuxer ();
91
94
auto video_packet_output = data_ctx->Output (VIDEO_PACKET_OUTPUT);
92
95
video_packet_output->Build ({1 });
93
96
auto end_packet = video_packet_output->At (0 );
@@ -287,7 +290,16 @@ modelbox::Status VideoDemuxerFlowUnit::InitDemuxer(
287
290
std::static_pointer_cast<std::string>(meta->GetMeta (SOURCE_URL));
288
291
*uri_meta = *source_url;
289
292
290
- data_ctx->SetPrivate (DEMUXER_CTX, video_demuxer);
293
+ auto is_rtsp = (source_url->find (" rtsp://" ) == 0 );
294
+ auto demuxer_worker =
295
+ std::make_shared<DemuxerWorker>(is_rtsp, queue_size_, video_demuxer);
296
+ ret = demuxer_worker->Init ();
297
+ if (ret != modelbox::STATUS_OK) {
298
+ MBLOG_ERROR << " init demuxer failed, ret " << ret;
299
+ return ret;
300
+ }
301
+
302
+ data_ctx->SetPrivate (DEMUXER_CTX, demuxer_worker);
291
303
data_ctx->SetPrivate (SOURCE_URL, source_url);
292
304
293
305
UpdateStatsInfo (data_ctx, video_demuxer);
@@ -322,3 +334,106 @@ MODELBOX_DRIVER_FLOWUNIT(desc) {
322
334
desc.Desc .SetDescription (FLOWUNIT_DESC);
323
335
desc.Desc .SetVersion (" 1.0.0" );
324
336
}
337
+
338
+ DemuxerWorker::DemuxerWorker (bool is_async, size_t cache_size,
339
+ std::shared_ptr<FfmpegVideoDemuxer> demuxer)
340
+ : is_async_(is_async),
341
+ cache_size_(cache_size),
342
+ demuxer_(std::move(demuxer)) {
343
+ if (cache_size_ < 2 ) {
344
+ cache_size_ = 2 ;
345
+ }
346
+ }
347
+
348
+ DemuxerWorker::~DemuxerWorker () {
349
+ if (demux_thread_ != nullptr ) {
350
+ demux_thread_running_ = false ;
351
+ demux_thread_->join ();
352
+ }
353
+ }
354
+
355
+ modelbox::Status DemuxerWorker::Init () {
356
+ if (!is_async_) {
357
+ return modelbox::STATUS_OK;
358
+ }
359
+
360
+ demux_thread_running_ = true ;
361
+ demux_thread_ = std::make_shared<std::thread>([this ]() {
362
+ while (IsRunning ()) {
363
+ Process ();
364
+ }
365
+ });
366
+
367
+ return modelbox::STATUS_OK;
368
+ }
369
+
370
+ std::shared_ptr<FfmpegVideoDemuxer> DemuxerWorker::GetDemuxer () const {
371
+ return demuxer_;
372
+ }
373
+
374
+ size_t DemuxerWorker::GetDropCount () const { return packet_drop_count_; }
375
+
376
+ modelbox::Status DemuxerWorker::ReadPacket (
377
+ std::shared_ptr<AVPacket> &av_packet) {
378
+ if (!is_async_) {
379
+ return demuxer_->Demux (av_packet);
380
+ }
381
+
382
+ PopCache (av_packet);
383
+ if (av_packet == nullptr ) {
384
+ return last_demux_status_;
385
+ }
386
+
387
+ return modelbox::STATUS_OK;
388
+ }
389
+
390
+ bool DemuxerWorker::IsRunning () const { return demux_thread_running_; }
391
+
392
+ void DemuxerWorker::Process () {
393
+ std::shared_ptr<AVPacket> av_packet;
394
+ last_demux_status_ = demuxer_->Demux (av_packet);
395
+ if (last_demux_status_ != modelbox::STATUS_OK) {
396
+ demux_thread_running_ = false ;
397
+ av_packet = nullptr ;
398
+ }
399
+
400
+ PushCache (av_packet);
401
+ }
402
+
403
+ void DemuxerWorker::PushCache (const std::shared_ptr<AVPacket> &av_packet) {
404
+ std::unique_lock<std::mutex> lock (packet_cache_lock_);
405
+ while (packet_cache_.size () >= cache_size_) {
406
+ ++packet_drop_count_;
407
+ // drop packet, cache_size >= 2
408
+ auto iter = packet_cache_.begin ();
409
+ auto first_iter = iter;
410
+ auto second_iter = ++iter;
411
+ if (IsKeyFrame (*first_iter) && !IsKeyFrame (*second_iter)) {
412
+ // we need drop the frame rely on key frame first to avoid invalid picture
413
+ packet_cache_.erase (second_iter);
414
+ continue ;
415
+ }
416
+
417
+ // no more frame rely on front frame, just drop front
418
+ packet_cache_.pop_front ();
419
+ }
420
+
421
+ packet_cache_.push_back (av_packet);
422
+ packet_cache_not_empty_.notify_all ();
423
+ }
424
+
425
+ void DemuxerWorker::PopCache (std::shared_ptr<AVPacket> &av_packet) {
426
+ std::unique_lock<std::mutex> lock (packet_cache_lock_);
427
+ packet_cache_not_empty_.wait (lock, [&]() { return !packet_cache_.empty (); });
428
+ av_packet = packet_cache_.front ();
429
+ if (av_packet == nullptr ) {
430
+ // stream end, keep nullptr in cache
431
+ return ;
432
+ }
433
+
434
+ packet_cache_.pop_front ();
435
+ }
436
+
437
+ bool DemuxerWorker::IsKeyFrame (const std::shared_ptr<AVPacket> &av_packet) {
438
+ return (av_packet->flags & AV_PKT_FLAG_KEY) != 0 ;
439
+ }
0 commit comments