Skip to content
This repository was archived by the owner on Jan 16, 2024. It is now read-only.

GstreamerMediaPlayer: Fix callback function use-after-free #1867

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,6 @@ class BaseStreamSource : public SourceInterface {
/// Number of times reading data has been attempted since data was last successfully read.
guint m_sourceRetryCount;

/// Function to invoke on the worker thread thread when more data is needed.
const std::function<gboolean()> m_handleNeedDataFunction;

/// Function to invoke on the worker thread thread when there is enough data.
const std::function<gboolean()> m_handleEnoughDataFunction;

/// ID of the handler installed to receive need data signals.
guint m_needDataHandlerId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class MediaPlayer
void setDecoder(GstElement* decoder) override;
GstElement* getDecoder() const override;
GstElement* getPipeline() const override;
guint queueCallback(const std::function<gboolean()>* callback) override;
guint queueCallback(std::function<gboolean()>&& callback) override;
guint attachSource(GSource* source) override;
gboolean removeSource(guint tag) override;
/// @}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class PipelineInterface {
* @param callback The callback to queue.
* @return The ID of the queued callback (for calling @c g_source_remove).
*/
virtual guint queueCallback(const std::function<gboolean()>* callback) = 0;
virtual guint queueCallback(std::function<gboolean()>&& callback) = 0;

/**
* Attach the source to the worker thread.
Expand Down
6 changes: 2 additions & 4 deletions MediaPlayer/GStreamerMediaPlayer/src/BaseStreamSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ BaseStreamSource::BaseStreamSource(PipelineInterface* pipeline, const std::strin
m_sourceId{0},
m_hasReadData{false},
m_sourceRetryCount{0},
m_handleNeedDataFunction{[this]() { return handleNeedData(); }},
m_handleEnoughDataFunction{[this]() { return handleEnoughData(); }},
m_needDataHandlerId{0},
m_enoughDataHandlerId{0},
m_seekDataHandlerId{0},
Expand Down Expand Up @@ -311,7 +309,7 @@ void BaseStreamSource::onNeedData(GstElement* pipeline, guint size, gpointer poi
ACSDK_DEBUG9(LX("m_needDataCallbackId already set"));
return;
}
source->m_needDataCallbackId = source->m_pipeline->queueCallback(&source->m_handleNeedDataFunction);
source->m_needDataCallbackId = source->m_pipeline->queueCallback([source]() { return source->handleNeedData(); });
}

gboolean BaseStreamSource::handleNeedData() {
Expand All @@ -330,7 +328,7 @@ void BaseStreamSource::onEnoughData(GstElement* pipeline, gpointer pointer) {
ACSDK_DEBUG9(LX("m_enoughDataCallbackId already set"));
return;
}
source->m_enoughDataCallbackId = source->m_pipeline->queueCallback(&source->m_handleEnoughDataFunction);
source->m_enoughDataCallbackId = source->m_pipeline->queueCallback([source]() { return source->handleEnoughData(); });
}

gboolean BaseStreamSource::handleEnoughData() {
Expand Down
157 changes: 80 additions & 77 deletions MediaPlayer/GStreamerMediaPlayer/src/MediaPlayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,13 @@ MediaPlayer::SourceId MediaPlayer::setSource(
const avsCommon::utils::AudioFormat* audioFormat,
const SourceConfig& config) {
ACSDK_DEBUG9(LX("setSourceCalled").d("name", RequiresShutdown::name()).d("sourceType", "AttachmentReader"));
std::promise<MediaPlayer::SourceId> promise;
auto future = promise.get_future();
std::function<gboolean()> callback = [this, &reader, &promise, &config, audioFormat]() {
handleSetAttachmentReaderSource(std::move(reader), config, &promise, audioFormat);
auto promise = std::make_shared<std::promise<MediaPlayer::SourceId>>();
auto future = promise->get_future();
std::function<gboolean()> callback = [this, &reader, promise, &config, audioFormat]() {
handleSetAttachmentReaderSource(std::move(reader), config, promise.get(), audioFormat);
return false;
};
if (queueCallback(&callback) != UNQUEUED_CALLBACK) {
if (queueCallback(std::move(callback)) != UNQUEUED_CALLBACK) {
auto sourceId = future.get();
// Assume that the Attachment is fully buffered - not ideal, revisit if needed. Should be fine for file streams
// and resources.
Expand All @@ -223,13 +223,13 @@ MediaPlayer::SourceId MediaPlayer::setSource(
avsCommon::utils::MediaType format) {
ACSDK_DEBUG9(
LX("setSourceCalled").d("name", RequiresShutdown::name()).d("sourceType", "istream").d("format", format));
std::promise<MediaPlayer::SourceId> promise;
auto future = promise.get_future();
std::function<gboolean()> callback = [this, &stream, repeat, &config, &promise]() {
handleSetIStreamSource(stream, repeat, config, &promise);
auto promise = std::make_shared<std::promise<MediaPlayer::SourceId>>();
auto future = promise->get_future();
std::function<gboolean()> callback = [this, &stream, repeat, &config, promise]() {
handleSetIStreamSource(stream, repeat, config, promise.get());
return false;
};
if (queueCallback(&callback) != UNQUEUED_CALLBACK) {
if (queueCallback(std::move(callback)) != UNQUEUED_CALLBACK) {
auto sourceId = future.get();
// Assume that the Attachment is fully buffered - not ideal, revisit if needed. Should be fine for file streams
// and resources.
Expand All @@ -246,13 +246,13 @@ MediaPlayer::SourceId MediaPlayer::setSource(
bool repeat,
const PlaybackContext& playbackContext) {
ACSDK_DEBUG9(LX("setSourceForUrlCalled").d("name", RequiresShutdown::name()).sensitive("url", url));
std::promise<MediaPlayer::SourceId> promise;
auto future = promise.get_future();
std::function<gboolean()> callback = [this, url, offset, &config, &promise, repeat]() {
handleSetUrlSource(url, offset, config, &promise, repeat);
auto promise = std::make_shared<std::promise<MediaPlayer::SourceId>>();
auto future = promise->get_future();
std::function<gboolean()> callback = [this, url, offset, &config, promise, repeat]() {
handleSetUrlSource(url, offset, config, promise.get(), repeat);
return false;
};
if (queueCallback(&callback) != UNQUEUED_CALLBACK) {
if (queueCallback(std::move(callback)) != UNQUEUED_CALLBACK) {
return future.get();
}
return ERROR_SOURCE_ID;
Expand Down Expand Up @@ -281,71 +281,71 @@ bool MediaPlayer::play(MediaPlayer::SourceId id) {

m_source->preprocess();

std::promise<bool> promise;
auto future = promise.get_future();
std::function<gboolean()> callback = [this, id, &promise]() {
handlePlay(id, &promise);
auto promise = std::make_shared<std::promise<bool>>();
auto future = promise->get_future();
std::function<gboolean()> callback = [this, id, promise]() {
handlePlay(id, promise.get());
return false;
};

if (queueCallback(&callback) != UNQUEUED_CALLBACK) {
if (queueCallback(std::move(callback)) != UNQUEUED_CALLBACK) {
return future.get();
}
return false;
}

bool MediaPlayer::stop(MediaPlayer::SourceId id) {
ACSDK_DEBUG9(LX("stopCalled").d("name", RequiresShutdown::name()));
std::promise<bool> promise;
auto future = promise.get_future();
std::function<gboolean()> callback = [this, id, &promise]() {
handleStop(id, &promise);
auto promise = std::make_shared<std::promise<bool>>();
auto future = promise->get_future();
std::function<gboolean()> callback = [this, id, promise]() {
handleStop(id, promise.get());
return false;
};
if (queueCallback(&callback) != UNQUEUED_CALLBACK) {
if (queueCallback(std::move(callback)) != UNQUEUED_CALLBACK) {
return future.get();
}
return false;
}

bool MediaPlayer::pause(MediaPlayer::SourceId id) {
ACSDK_DEBUG9(LX("pausedCalled").d("name", RequiresShutdown::name()));
std::promise<bool> promise;
auto future = promise.get_future();
std::function<gboolean()> callback = [this, id, &promise]() {
handlePause(id, &promise);
auto promise = std::make_shared<std::promise<bool>>();
auto future = promise->get_future();
std::function<gboolean()> callback = [this, id, promise]() {
handlePause(id, promise.get());
return false;
};
if (queueCallback(&callback) != UNQUEUED_CALLBACK) {
if (queueCallback(std::move(callback)) != UNQUEUED_CALLBACK) {
return future.get();
}
return false;
}

bool MediaPlayer::resume(MediaPlayer::SourceId id) {
ACSDK_DEBUG9(LX("resumeCalled").d("name", RequiresShutdown::name()));
std::promise<bool> promise;
auto future = promise.get_future();
std::function<gboolean()> callback = [this, id, &promise]() {
handleResume(id, &promise);
auto promise = std::make_shared<std::promise<bool>>();
auto future = promise->get_future();
std::function<gboolean()> callback = [this, id, promise]() {
handleResume(id, promise.get());
return false;
};
if (queueCallback(&callback) != UNQUEUED_CALLBACK) {
if (queueCallback(std::move(callback)) != UNQUEUED_CALLBACK) {
return future.get();
}
return false;
}

std::chrono::milliseconds MediaPlayer::getOffset(MediaPlayer::SourceId id) {
ACSDK_DEBUG9(LX("getOffsetCalled").d("name", RequiresShutdown::name()));
std::promise<std::chrono::milliseconds> promise;
auto future = promise.get_future();
std::function<gboolean()> callback = [this, id, &promise]() {
handleGetOffset(id, &promise);
auto promise = std::make_shared<std::promise<std::chrono::milliseconds>>();
auto future = promise->get_future();
std::function<gboolean()> callback = [this, id, promise]() {
handleGetOffset(id, promise.get());
return false;
};

if (queueCallback(&callback) != UNQUEUED_CALLBACK) {
if (queueCallback(std::move(callback)) != UNQUEUED_CALLBACK) {
return future.get();
}
return MEDIA_PLAYER_INVALID_OFFSET;
Expand All @@ -363,14 +363,14 @@ void MediaPlayer::addObserver(std::shared_ptr<MediaPlayerObserverInterface> obse
}

ACSDK_DEBUG9(LX("addObserverCalled").d("name", RequiresShutdown::name()));
std::promise<void> promise;
auto future = promise.get_future();
std::function<gboolean()> callback = [this, &promise, &observer]() {
handleAddObserver(&promise, observer);
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
std::function<gboolean()> callback = [this, promise, &observer]() {
handleAddObserver(promise.get(), observer);
return false;
};

if (queueCallback(&callback) != UNQUEUED_CALLBACK) {
if (queueCallback(std::move(callback)) != UNQUEUED_CALLBACK) {
future.wait();
}
}
Expand All @@ -382,27 +382,27 @@ void MediaPlayer::removeObserver(std::shared_ptr<MediaPlayerObserverInterface> o
}

ACSDK_DEBUG9(LX("removeObserverCalled").d("name", RequiresShutdown::name()));
std::promise<void> promise;
auto future = promise.get_future();
std::function<gboolean()> callback = [this, &promise, &observer]() {
handleRemoveObserver(&promise, observer);
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
std::function<gboolean()> callback = [this, promise, &observer]() {
handleRemoveObserver(promise.get(), observer);
return false;
};

if (queueCallback(&callback) != UNQUEUED_CALLBACK) {
if (queueCallback(std::move(callback)) != UNQUEUED_CALLBACK) {
future.wait();
}
}

bool MediaPlayer::setVolume(int8_t volume) {
ACSDK_DEBUG9(LX("setVolumeCalled").d("name", RequiresShutdown::name()));
std::promise<bool> promise;
auto future = promise.get_future();
std::function<gboolean()> callback = [this, &promise, volume]() {
handleSetVolume(&promise, volume);
auto promise = std::make_shared<std::promise<bool>>();
auto future = promise->get_future();
std::function<gboolean()> callback = [this, promise, volume]() {
handleSetVolume(promise.get(), volume);
return false;
};
if (queueCallback(&callback) != UNQUEUED_CALLBACK) {
if (queueCallback(std::move(callback)) != UNQUEUED_CALLBACK) {
return future.get();
}
return false;
Expand Down Expand Up @@ -455,13 +455,13 @@ void MediaPlayer::handleSetVolume(std::promise<bool>* promise, int8_t volume) {

bool MediaPlayer::setMute(bool mute) {
ACSDK_DEBUG9(LX("setMuteCalled").d("name", RequiresShutdown::name()));
std::promise<bool> promise;
auto future = promise.get_future();
std::function<gboolean()> callback = [this, &promise, mute]() {
handleSetMute(&promise, mute);
auto promise = std::make_shared<std::promise<bool>>();
auto future = promise->get_future();
std::function<gboolean()> callback = [this, promise, mute]() {
handleSetMute(promise.get(), mute);
return false;
};
if (queueCallback(&callback) != UNQUEUED_CALLBACK) {
if (queueCallback(std::move(callback)) != UNQUEUED_CALLBACK) {
return future.get();
}
return false;
Expand All @@ -483,13 +483,13 @@ void MediaPlayer::handleSetMute(std::promise<bool>* promise, bool mute) {

bool MediaPlayer::getSpeakerSettings(SpeakerInterface::SpeakerSettings* settings) {
ACSDK_DEBUG9(LX("getSpeakerSettingsCalled").d("name", RequiresShutdown::name()));
std::promise<bool> promise;
auto future = promise.get_future();
std::function<gboolean()> callback = [this, &promise, settings]() {
handleGetSpeakerSettings(&promise, settings);
auto promise = std::make_shared<std::promise<bool>>();
auto future = promise->get_future();
std::function<gboolean()> callback = [this, promise, settings]() {
handleGetSpeakerSettings(promise.get(), settings);
return false;
};
if (queueCallback(&callback) != UNQUEUED_CALLBACK) {
if (queueCallback(std::move(callback)) != UNQUEUED_CALLBACK) {
return future.get();
}
return false;
Expand Down Expand Up @@ -924,13 +924,16 @@ bool MediaPlayer::seek() {
return seekSuccessful;
}

guint MediaPlayer::queueCallback(const std::function<gboolean()>* callback) {
guint MediaPlayer::queueCallback(std::function<gboolean()>&& callback) {
if (isShutdown()) {
return UNQUEUED_CALLBACK;
}
auto source = g_idle_source_new();
g_source_set_callback(
source, reinterpret_cast<GSourceFunc>(&onCallback), const_cast<std::function<gboolean()>*>(callback), nullptr);
source, reinterpret_cast<GSourceFunc>(&onCallback), new std::function<gboolean()>(std::move(callback)),
[](gpointer data) {
delete reinterpret_cast<std::function<gboolean()>*>(data);
});
auto sourceId = g_source_attach(source, m_workerContext);
g_source_unref(source);
return sourceId;
Expand Down Expand Up @@ -996,13 +999,13 @@ gboolean MediaPlayer::onCallback(const std::function<gboolean()>* callback) {
void MediaPlayer::onPadAdded(GstElement* decoder, GstPad* pad, gpointer pointer) {
auto mediaPlayer = static_cast<MediaPlayer*>(pointer);
ACSDK_DEBUG9(LX("onPadAddedCalled").d("name", mediaPlayer->name()));
std::promise<void> promise;
auto future = promise.get_future();
std::function<gboolean()> callback = [mediaPlayer, &promise, decoder, pad]() {
mediaPlayer->handlePadAdded(&promise, decoder, pad);
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
std::function<gboolean()> callback = [mediaPlayer, promise, decoder, pad]() {
mediaPlayer->handlePadAdded(promise.get(), decoder, pad);
return false;
};
if (mediaPlayer->queueCallback(&callback) != UNQUEUED_CALLBACK) {
if (mediaPlayer->queueCallback(std::move(callback)) != UNQUEUED_CALLBACK) {
future.wait();
}
}
Expand Down Expand Up @@ -1932,9 +1935,9 @@ void MediaPlayer::setEqualizerBandLevels(EqualizerBandLevelMap bandLevelMap) {
if (!m_equalizerEnabled) {
return;
}
std::promise<void> promise;
auto future = promise.get_future();
std::function<gboolean()> callback = [this, &promise, bandLevelMap]() {
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
std::function<gboolean()> callback = [this, promise, bandLevelMap]() {
auto it = bandLevelMap.find(EqualizerBand::BASS);
if (bandLevelMap.end() != it) {
g_object_set(
Expand All @@ -1959,10 +1962,10 @@ void MediaPlayer::setEqualizerBandLevels(EqualizerBandLevelMap bandLevelMap) {
static_cast<gdouble>(clampEqualizerLevel(it->second)),
NULL);
}
promise.set_value();
promise->set_value();
return false;
};
if (queueCallback(&callback) != UNQUEUED_CALLBACK) {
if (queueCallback(std::move(callback)) != UNQUEUED_CALLBACK) {
future.get();
}
}
Expand Down