Skip to content

Commit 8290f92

Browse files
author
Kirk Shoop
committed
fix window_with_time_or_count
reported in #277
1 parent 4ab756b commit 8290f92

File tree

3 files changed

+109
-2
lines changed

3 files changed

+109
-2
lines changed

Rx/v2/src/rxcpp/operators/rx-window_time_count.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,10 @@ struct window_with_time_or_count
135135

136136
void on_next(T v) const {
137137
auto localState = state;
138-
auto work = [v, localState](const rxsc::schedulable&){
138+
auto work = [v, localState](const rxsc::schedulable& self){
139139
localState->subj.get_subscriber().on_next(v);
140140
if (++localState->cursor == localState->count) {
141-
release_window(localState->subj_id, localState->worker.now(), localState);
141+
release_window(localState->subj_id, localState->worker.now(), localState)(self);
142142
}
143143
};
144144
auto selectedWork = on_exception(

Rx/v2/test/operators/buffer.cpp

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1172,3 +1172,53 @@ SCENARIO("buffer with time or count, only time triggered", "[buffer_with_time_or
11721172
}
11731173
}
11741174
}
1175+
1176+
SCENARIO("buffer with time or count, only count triggered", "[buffer_with_time_or_count][operators]"){
1177+
GIVEN("1 hot observable of ints."){
1178+
auto sc = rxsc::make_test();
1179+
auto so = rx::synchronize_in_one_worker(sc);
1180+
auto w = sc.create_worker();
1181+
const rxsc::test::messages<int> on;
1182+
const rxsc::test::messages<std::vector<int>> v_on;
1183+
1184+
auto xs = sc.make_hot_observable({
1185+
on.next(205, 1),
1186+
on.next(305, 2),
1187+
on.next(505, 3),
1188+
on.next(605, 4),
1189+
on.next(610, 5),
1190+
on.completed(850)
1191+
});
1192+
WHEN("group ints on intervals"){
1193+
using namespace std::chrono;
1194+
1195+
auto res = w.start(
1196+
[&]() {
1197+
return xs
1198+
.buffer_with_time_or_count(milliseconds(370), 2, so)
1199+
// forget type to workaround lambda deduction bug on msvc 2013
1200+
.as_dynamic();
1201+
}
1202+
);
1203+
1204+
THEN("the output contains groups of ints"){
1205+
auto required = rxu::to_vector({
1206+
v_on.next(306, rxu::to_vector({ 1, 2 })),
1207+
v_on.next(606, rxu::to_vector({ 3, 4 })),
1208+
v_on.next(851, rxu::to_vector({ 5 })),
1209+
v_on.completed(851)
1210+
});
1211+
auto actual = res.get_observer().messages();
1212+
REQUIRE(required == actual);
1213+
}
1214+
1215+
THEN("there was one subscription and one unsubscription to the xs"){
1216+
auto required = rxu::to_vector({
1217+
on.subscribe(200, 850)
1218+
});
1219+
auto actual = xs.subscriptions();
1220+
REQUIRE(required == actual);
1221+
}
1222+
}
1223+
}
1224+
}

Rx/v2/test/operators/window.cpp

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#include "../test.h"
22

3+
#include <rxcpp/operators/rx-reduce.hpp>
4+
35
SCENARIO("window count, basic", "[window][operators]"){
46
GIVEN("1 hot observable of ints."){
57
auto sc = rxsc::make_test();
@@ -979,3 +981,58 @@ SCENARIO("window with time or count, only time triggered", "[window_with_time_or
979981
}
980982
}
981983
}
984+
985+
SCENARIO("window with time or count, only count triggered", "[window_with_time_or_count][operators]"){
986+
GIVEN("1 hot observable of ints."){
987+
auto sc = rxsc::make_test();
988+
auto so = rx::synchronize_in_one_worker(sc);
989+
auto w = sc.create_worker();
990+
const rxsc::test::messages<int> on;
991+
const rxsc::test::messages<rx::observable<int>> o_on;
992+
993+
auto xs = sc.make_hot_observable({
994+
on.next(205, 1),
995+
on.next(305, 2),
996+
on.next(505, 3),
997+
on.next(605, 4),
998+
on.next(610, 5),
999+
on.completed(850)
1000+
});
1001+
1002+
WHEN("group each int with the next 2 ints"){
1003+
using namespace std::chrono;
1004+
1005+
auto res = w.start(
1006+
[&]() {
1007+
return xs
1008+
.window_with_time_or_count(milliseconds(370), 2, so)
1009+
.map([](rx::observable<int> w){
1010+
return w.count();
1011+
})
1012+
.merge()
1013+
// forget type to workaround lambda deduction bug on msvc 2013
1014+
.as_dynamic();
1015+
}
1016+
);
1017+
1018+
THEN("the output contains merged groups of ints"){
1019+
auto required = rxu::to_vector({
1020+
on.next(306, 2),
1021+
on.next(606, 2),
1022+
on.next(851, 1),
1023+
on.completed(851)
1024+
});
1025+
auto actual = res.get_observer().messages();
1026+
REQUIRE(required == actual);
1027+
}
1028+
1029+
THEN("there was one subscription and one unsubscription to the observable"){
1030+
auto required = rxu::to_vector({
1031+
o_on.subscribe(200, 850)
1032+
});
1033+
auto actual = xs.subscriptions();
1034+
REQUIRE(required == actual);
1035+
}
1036+
}
1037+
}
1038+
}

0 commit comments

Comments
 (0)