Skip to content

Commit 9ab9e07

Browse files
Fix crash on subjects (#697)
* try to catch issue * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Update test_subjects.cpp * Update test_subjects.cpp * Update test_subjects.cpp * fix * fix * fix * speedup * fix --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent b212dbc commit 9ab9e07

File tree

2 files changed

+39
-21
lines changed

2 files changed

+39
-21
lines changed

src/rpp/rpp/subjects/details/subject_state.hpp

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
#include <rpp/utils/utils.hpp>
2121

2222
#include <algorithm>
23-
#include <deque>
23+
#include <list>
2424
#include <memory>
2525
#include <mutex>
2626
#include <variant>
@@ -67,7 +67,7 @@ namespace rpp::subjects::details
6767
};
6868

6969
using observer = std::shared_ptr<rpp::details::observers::observer_vtable<Type>>;
70-
using observers = std::deque<observer>;
70+
using observers = std::list<observer>;
7171
using shared_observers = std::shared_ptr<observers>;
7272
using state_t = std::variant<shared_observers, std::exception_ptr, completed, disposed>;
7373

@@ -112,22 +112,21 @@ namespace rpp::subjects::details
112112
void on_next(const Type& v)
113113
{
114114
std::unique_lock observers_lock{m_mutex};
115+
process_state_unsafe(m_state, [&](shared_observers observers) {
116+
if (!observers)
117+
return;
115118

116-
if (!std::holds_alternative<shared_observers>(m_state))
117-
return;
119+
auto itr = observers->cbegin();
120+
const auto size = observers->size();
118121

119-
// we are getting copy of curent deque and obtaining CURRENT begin/end of in case of some new observer would be added during on_next call
120-
const auto observers = std::get<shared_observers>(m_state);
121-
if (!observers)
122-
return;
122+
observers_lock.unlock();
123123

124-
const auto begin = observers->cbegin();
125-
const auto end = observers->cend();
126-
127-
observers_lock.unlock();
128-
129-
std::lock_guard lock{m_serialized_mutex};
130-
std::for_each(begin, end, [&](const observer& obs) { obs->on_next(v); });
124+
std::lock_guard lock{m_serialized_mutex};
125+
for (size_t i = 0; i < size; ++i)
126+
{
127+
(*(itr++))->on_next(v);
128+
}
129+
});
131130
}
132131

133132
void on_error(const std::exception_ptr& err)
@@ -171,19 +170,18 @@ namespace rpp::subjects::details
171170
return subs;
172171
}
173172

174-
static void process_state_unsafe(const state_t& state, const auto&... actions)
173+
static auto process_state_unsafe(const state_t& state, const auto&... actions)
175174
{
176-
std::visit(rpp::utils::overloaded{actions..., rpp::utils::empty_function_any_t{}}, state);
175+
return std::visit(rpp::utils::overloaded{actions..., rpp::utils::empty_function_any_t{}}, state);
177176
}
178177

179178
shared_observers exchange_observers_under_lock_if_there(state_t&& new_val)
180179
{
181180
std::lock_guard lock{m_mutex};
182181

183-
if (!std::holds_alternative<shared_observers>(m_state))
184-
return {};
185-
186-
return std::get<shared_observers>(std::exchange(m_state, std::move(new_val)));
182+
return process_state_unsafe(m_state, [&](shared_observers observers) {
183+
m_state = std::move(new_val);
184+
return observers; }, [](auto) { return shared_observers{}; });
187185
}
188186

189187
private:

src/tests/rpp/test_subjects.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,26 @@ TEST_CASE("subject can be modified from on_next call")
169169
}
170170
}
171171

172+
TEST_CASE("subject handles addition from inside on_next properly")
173+
{
174+
rpp::subjects::publish_subject<int> subject{};
175+
176+
SUBCASE("subscribe inside on_next")
177+
{
178+
int value = {};
179+
subject.get_observable().subscribe([&subject, &value](int v) {
180+
for (int i = 0; i < 100; ++i)
181+
subject.get_observable().subscribe([](int) {});
182+
value = v;
183+
});
184+
185+
for (int i = 0; i < 100; ++i)
186+
subject.get_observer().on_next(i);
187+
188+
REQUIRE(value == 99);
189+
}
190+
}
191+
172192
TEST_CASE("publish subject caches error/completed")
173193
{
174194
auto mock = mock_observer_strategy<int>{};

0 commit comments

Comments
 (0)