Skip to content

Commit c808bb5

Browse files
elelelKirk Shoop
authored andcommitted
Factor out commonalities between repeat and retry into a separate file (#363)
This reverts commit ad430c5. * Retry operator: remove old comment * Complete the error-reporting templates rollback * Rename retry/repeat common file * Fix filename in doxygen comment block
1 parent 0b098e3 commit c808bb5

File tree

3 files changed

+195
-261
lines changed

3 files changed

+195
-261
lines changed

Rx/v2/src/rxcpp/operators/rx-repeat.hpp

Lines changed: 18 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#define RXCPP_OPERATORS_RX_REPEAT_HPP
2626

2727
#include "../rx-includes.hpp"
28+
#include "rx-retry-repeat-common.hpp"
2829

2930
namespace rxcpp {
3031

@@ -44,147 +45,36 @@ using repeat_invalid_t = typename repeat_invalid<AN...>::type;
4445

4546
// Contain repeat variations in a namespace
4647
namespace repeat {
47-
// Structure to perform general repeat operations on state
48-
template <class ValuesType, class Subscriber, class T>
49-
struct state_type : public std::enable_shared_from_this<state_type<ValuesType, Subscriber, T>>,
50-
public ValuesType {
51-
52-
typedef Subscriber output_type;
53-
state_type(const ValuesType& i, const output_type& oarg)
54-
: ValuesType(i),
55-
source_lifetime(composite_subscription::empty()),
56-
out(oarg) {
48+
struct event_handlers {
49+
template <typename State>
50+
static inline void on_error(State& state, std::exception_ptr& e) {
51+
state->out.on_error(e);
5752
}
5853

59-
void do_subscribe() {
60-
auto state = this->shared_from_this();
61-
62-
state->out.remove(state->lifetime_token);
63-
state->source_lifetime.unsubscribe();
64-
65-
state->source_lifetime = composite_subscription();
66-
state->lifetime_token = state->out.add(state->source_lifetime);
67-
68-
state->source.subscribe(
69-
state->out,
70-
state->source_lifetime,
71-
// on_next
72-
[state](T t) {
73-
state->out.on_next(t);
74-
},
75-
// on_error
76-
[state](std::exception_ptr e) {
77-
state->out.on_error(e);
78-
},
79-
// on_completed
80-
[state]() {
81-
state->update();
82-
// Use specialized predicate for finite/infinte case
83-
if (state->completed_predicate()) {
84-
state->out.on_completed();
85-
} else {
86-
state->do_subscribe();
87-
}
88-
}
89-
);
90-
}
91-
92-
composite_subscription source_lifetime;
93-
output_type out;
94-
composite_subscription::weak_subscription lifetime_token;
95-
};
96-
97-
// Finite repeat case (explicitely limited with the number of times)
98-
template <class T, class Observable, class Count>
99-
struct finite : public operator_base<T> {
100-
typedef rxu::decay_t<Observable> source_type;
101-
typedef rxu::decay_t<Count> count_type;
102-
103-
struct values {
104-
values(source_type s, count_type t)
105-
: source(std::move(s)),
106-
remaining_(std::move(t)) {
107-
}
108-
109-
inline bool completed_predicate() const {
110-
// Return true if we are completed
111-
return remaining_ <= 0;
112-
}
113-
114-
inline void update() {
115-
// Decrement counter
116-
--remaining_;
117-
}
118-
119-
source_type source;
120-
121-
private:
122-
// Counter to hold number of times remaining to complete
123-
count_type remaining_;
124-
};
125-
126-
finite(source_type s, count_type t)
127-
: initial_(std::move(s), std::move(t)) {
128-
}
129-
130-
template<class Subscriber>
131-
void on_subscribe(const Subscriber& s) const {
132-
typedef state_type<values, Subscriber, T> state_t;
133-
// take a copy of the values for each subscription
134-
auto state = std::make_shared<state_t>(initial_, s);
135-
if (initial_.completed_predicate()) {
136-
// return completed
54+
template <typename State>
55+
static inline void on_completed(State& state) {
56+
// Functions update() and completed_predicate() vary between finite and infinte versions
57+
state->update();
58+
if (state->completed_predicate()) {
13759
state->out.on_completed();
13860
} else {
139-
// start the first iteration
14061
state->do_subscribe();
14162
}
14263
}
143-
144-
private:
145-
values initial_;
14664
};
14765

66+
// Finite repeat case (explicitely limited with the number of times)
67+
template <class T, class Observable, class Count>
68+
using finite = ::rxcpp::operators::detail::retry_repeat_common::finite
69+
<event_handlers, T, Observable, Count>;
70+
14871
// Infinite repeat case
14972
template <class T, class Observable>
150-
struct infinite : public operator_base<T> {
151-
typedef rxu::decay_t<Observable> source_type;
152-
153-
struct values {
154-
values(source_type s)
155-
: source(std::move(s)) {
156-
}
157-
158-
static inline bool completed_predicate() {
159-
// Infinite repeat never completes
160-
return false;
161-
}
162-
163-
static inline void update() {
164-
// Infinite repeat does not need to update state
165-
}
166-
167-
source_type source;
168-
};
169-
170-
infinite(source_type s) : initial_(std::move(s)) {
171-
}
172-
173-
template<class Subscriber>
174-
void on_subscribe(const Subscriber& s) const {
175-
typedef state_type<values, Subscriber, T> state_t;
176-
// take a copy of the values for each subscription
177-
auto state = std::make_shared<state_t>(initial_, s);
178-
// start the first iteration
179-
state->do_subscribe();
180-
}
181-
182-
private:
183-
values initial_;
184-
};
185-
}
73+
using infinite = ::rxcpp::operators::detail::retry_repeat_common::infinite
74+
<event_handlers, T, Observable>;
18675

18776
}
77+
} // detail
18878

18979
/*! @copydoc rx-repeat.hpp
19080
*/
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
#pragma once
2+
3+
/*! \file rx-retry-repeat-common.hpp
4+
5+
\brief Implementation commonalities between retry and repeat operators abstracted away from rx-retry.hpp and rx-repeat.hpp files. Should be used only from rx-retry.hpp and rx-repeat.hpp
6+
7+
*/
8+
9+
#include "../rx-includes.hpp"
10+
11+
namespace rxcpp {
12+
namespace operators {
13+
namespace detail {
14+
15+
namespace retry_repeat_common {
16+
// Structure to perform general retry/repeat operations on state
17+
template <class Values, class Subscriber, class EventHandlers, class T>
18+
struct state_type : public std::enable_shared_from_this<state_type<Values, Subscriber, EventHandlers, T>>,
19+
public Values {
20+
21+
typedef Subscriber output_type;
22+
state_type(const Values& i, const output_type& oarg)
23+
: Values(i),
24+
source_lifetime(composite_subscription::empty()),
25+
out(oarg) {
26+
}
27+
28+
void do_subscribe() {
29+
auto state = this->shared_from_this();
30+
31+
state->out.remove(state->lifetime_token);
32+
state->source_lifetime.unsubscribe();
33+
34+
state->source_lifetime = composite_subscription();
35+
state->lifetime_token = state->out.add(state->source_lifetime);
36+
37+
state->source.subscribe(
38+
state->out,
39+
state->source_lifetime,
40+
// on_next
41+
[state](T t) {
42+
state->out.on_next(t);
43+
},
44+
// on_error
45+
[state](std::exception_ptr e) {
46+
EventHandlers::on_error(state, e);
47+
},
48+
// on_completed
49+
[state]() {
50+
EventHandlers::on_completed(state);
51+
}
52+
);
53+
}
54+
55+
composite_subscription source_lifetime;
56+
output_type out;
57+
composite_subscription::weak_subscription lifetime_token;
58+
};
59+
60+
// Finite case (explicitely limited with the number of times)
61+
template <class EventHandlers, class T, class Observable, class Count>
62+
struct finite : public operator_base<T> {
63+
typedef rxu::decay_t<Observable> source_type;
64+
typedef rxu::decay_t<Count> count_type;
65+
66+
struct values {
67+
values(source_type s, count_type t)
68+
: source(std::move(s)),
69+
remaining_(std::move(t)) {
70+
}
71+
72+
inline bool completed_predicate() const {
73+
// Return true if we are completed
74+
return remaining_ <= 0;
75+
}
76+
77+
inline void update() {
78+
// Decrement counter
79+
--remaining_;
80+
}
81+
82+
source_type source;
83+
84+
private:
85+
// Counter to hold number of times remaining to complete
86+
count_type remaining_;
87+
};
88+
89+
finite(source_type s, count_type t)
90+
: initial_(std::move(s), std::move(t)) {
91+
}
92+
93+
template<class Subscriber>
94+
void on_subscribe(const Subscriber& s) const {
95+
typedef state_type<values, Subscriber, EventHandlers, T> state_t;
96+
// take a copy of the values for each subscription
97+
auto state = std::make_shared<state_t>(initial_, s);
98+
if (initial_.completed_predicate()) {
99+
// return completed
100+
state->out.on_completed();
101+
} else {
102+
// start the first iteration
103+
state->do_subscribe();
104+
}
105+
}
106+
107+
private:
108+
values initial_;
109+
};
110+
111+
// Infinite case
112+
template <class EventHandlers, class T, class Observable>
113+
struct infinite : public operator_base<T> {
114+
typedef rxu::decay_t<Observable> source_type;
115+
116+
struct values {
117+
values(source_type s)
118+
: source(std::move(s)) {
119+
}
120+
121+
static inline bool completed_predicate() {
122+
// Infinite never completes
123+
return false;
124+
}
125+
126+
static inline void update() {
127+
// Infinite does not need to update state
128+
}
129+
130+
source_type source;
131+
};
132+
133+
infinite(source_type s) : initial_(std::move(s)) {
134+
}
135+
136+
template<class Subscriber>
137+
void on_subscribe(const Subscriber& s) const {
138+
typedef state_type<values, Subscriber, EventHandlers, T> state_t;
139+
// take a copy of the values for each subscription
140+
auto state = std::make_shared<state_t>(initial_, s);
141+
// start the first iteration
142+
state->do_subscribe();
143+
}
144+
145+
private:
146+
values initial_;
147+
};
148+
149+
150+
}
151+
}
152+
}
153+
}

0 commit comments

Comments
 (0)