@@ -68,65 +68,79 @@ struct observe_on
68
68
, destination(std::move(d))
69
69
{
70
70
}
71
+
72
+ void finish (std::unique_lock<std::mutex>& guard, typename mode::type end) const {
73
+ if (!guard.owns_lock ()) {
74
+ abort ();
75
+ }
76
+ if (current == mode::Errored || current == mode::Disposed) {return ;}
77
+ current = end;
78
+ queue_type fill_expired;
79
+ swap (fill_expired, fill_queue);
80
+ queue_type drain_expired;
81
+ swap (drain_expired, drain_queue);
82
+ RXCPP_UNWIND_AUTO ([&](){guard.lock ();});
83
+ guard.unlock ();
84
+ lifetime.unsubscribe ();
85
+ destination.unsubscribe ();
86
+ }
71
87
72
88
void ensure_processing (std::unique_lock<std::mutex>& guard) const {
73
89
if (!guard.owns_lock ()) {
74
90
abort ();
75
91
}
76
92
if (current == mode::Empty) {
77
93
current = mode::Processing;
94
+
95
+ if (!lifetime.is_subscribed () && fill_queue.empty () && drain_queue.empty ()) {
96
+ finish (guard, mode::Disposed);
97
+ }
78
98
79
99
auto keepAlive = this ->shared_from_this ();
80
-
100
+
81
101
auto drain = [keepAlive, this ](const rxsc::schedulable& self){
82
102
using std::swap;
83
103
try {
84
- if (drain_queue.empty () || !destination.is_subscribed ()) {
85
- std::unique_lock<std::mutex> guard (lock);
86
- if (!destination.is_subscribed () ||
87
- (!lifetime.is_subscribed () && fill_queue.empty () && drain_queue.empty ())) {
88
- current = mode::Disposed;
89
- queue_type expired;
90
- swap (expired, fill_queue);
91
- guard.unlock ();
92
- lifetime.unsubscribe ();
93
- destination.unsubscribe ();
94
- return ;
95
- }
96
- if (drain_queue.empty ()) {
97
- if (fill_queue.empty ()) {
98
- current = mode::Empty;
104
+ for (;;) {
105
+ if (drain_queue.empty () || !destination.is_subscribed ()) {
106
+ std::unique_lock<std::mutex> guard (lock);
107
+ if (!destination.is_subscribed () ||
108
+ (!lifetime.is_subscribed () && fill_queue.empty () && drain_queue.empty ())) {
109
+ finish (guard, mode::Disposed);
99
110
return ;
100
111
}
101
- swap (fill_queue, drain_queue);
112
+ if (drain_queue.empty ()) {
113
+ if (fill_queue.empty ()) {
114
+ current = mode::Empty;
115
+ return ;
116
+ }
117
+ swap (fill_queue, drain_queue);
118
+ }
102
119
}
120
+ auto notification = std::move (drain_queue.front ());
121
+ drain_queue.pop_front ();
122
+ notification->accept (destination);
123
+ std::unique_lock<std::mutex> guard (lock);
124
+ self ();
125
+ if (lifetime.is_subscribed ()) break ;
103
126
}
104
- auto notification = std::move (drain_queue.front ());
105
- drain_queue.pop_front ();
106
- notification->accept (destination);
107
- self ();
108
127
} catch (...) {
109
128
destination.on_error (std::current_exception ());
110
129
std::unique_lock<std::mutex> guard (lock);
111
- current = mode::Errored;
112
- queue_type expired;
113
- swap (expired, fill_queue);
130
+ finish (guard, mode::Errored);
114
131
}
115
132
};
116
133
117
134
auto selectedDrain = on_exception (
118
135
[&](){return coordinator.act (drain);},
119
136
destination);
120
137
if (selectedDrain.empty ()) {
121
- current = mode::Errored;
122
- using std::swap;
123
- queue_type expired;
124
- swap (expired, fill_queue);
138
+ finish (guard, mode::Errored);
125
139
return ;
126
140
}
127
141
128
142
auto processor = coordinator.get_worker ();
129
-
143
+
130
144
RXCPP_UNWIND_AUTO ([&](){guard.lock ();});
131
145
guard.unlock ();
132
146
@@ -143,16 +157,19 @@ struct observe_on
143
157
144
158
void on_next (source_value_type v) const {
145
159
std::unique_lock<std::mutex> guard (state->lock );
160
+ if (state->current == mode::Errored || state->current == mode::Disposed) { return ; }
146
161
state->fill_queue .push_back (notification_type::on_next (std::move (v)));
147
162
state->ensure_processing (guard);
148
163
}
149
164
void on_error (std::exception_ptr e) const {
150
165
std::unique_lock<std::mutex> guard (state->lock );
166
+ if (state->current == mode::Errored || state->current == mode::Disposed) { return ; }
151
167
state->fill_queue .push_back (notification_type::on_error (e));
152
168
state->ensure_processing (guard);
153
169
}
154
170
void on_completed () const {
155
171
std::unique_lock<std::mutex> guard (state->lock );
172
+ if (state->current == mode::Errored || state->current == mode::Disposed) { return ; }
156
173
state->fill_queue .push_back (notification_type::on_completed ());
157
174
state->ensure_processing (guard);
158
175
}
@@ -163,7 +180,7 @@ struct observe_on
163
180
164
181
this_type o (d, std::move (coor), cs);
165
182
auto keepAlive = o.state ;
166
- cs.add ([keepAlive ](){
183
+ cs.add ([= ](){
167
184
std::unique_lock<std::mutex> guard (keepAlive->lock );
168
185
keepAlive->ensure_processing (guard);
169
186
});
@@ -262,6 +279,11 @@ class observe_on_one_worker : public coordination_base
262
279
}
263
280
};
264
281
282
+ inline observe_on_one_worker observe_on_run_loop (const rxsc::run_loop& rl) {
283
+ static observe_on_one_worker r (rxsc::make_run_loop (rl));
284
+ return r;
285
+ }
286
+
265
287
inline observe_on_one_worker observe_on_event_loop () {
266
288
static observe_on_one_worker r (rxsc::make_event_loop ());
267
289
return r;
0 commit comments