Skip to content

Commit 56f7f90

Browse files
committed
Fix possible race condition when adding, then removing remote config services
Signed-off-by: Bob Weinand <bob.weinand@datadoghq.com>
1 parent b3b2852 commit 56f7f90

File tree

1 file changed

+51
-30
lines changed

1 file changed

+51
-30
lines changed

remote-config/src/fetch/multitarget.rs

Lines changed: 51 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -168,42 +168,63 @@ where
168168
'service_handling: {
169169
'drop_service: {
170170
if let Some(known_service) = services.get_mut(target) {
171-
known_service.refcount = if known_service.refcount == 1 {
172-
known_service.runtimes.remove(runtime_id);
173-
let mut status = known_service.status.lock().unwrap();
174-
*status = match *status {
175-
KnownTargetStatus::Pending => KnownTargetStatus::Alive, // not really
176-
KnownTargetStatus::Alive => KnownTargetStatus::RemoveAt(
177-
Instant::now() + Duration::from_secs(3666),
178-
),
179-
KnownTargetStatus::RemoveAt(_) | KnownTargetStatus::Removing(_) => {
180-
unreachable!()
171+
known_service.refcount = match known_service.refcount {
172+
0 => {
173+
// Handle the possible race condition where the service gets added AND
174+
// removed while in Removing state.
175+
let mut status = known_service.status.lock().unwrap();
176+
match *status {
177+
KnownTargetStatus::Removing(ref future) => {
178+
let future = future.clone();
179+
let runtime_id = runtime_id.to_string();
180+
let this = self.clone();
181+
tokio::spawn(async move {
182+
future.await;
183+
this.remove_target(runtime_id.as_str(), target);
184+
});
185+
}
186+
_ => unreachable!(),
181187
}
182-
};
183-
// We've marked it Alive so that the Pending check in start_fetcher() will
184-
// fail
185-
if matches!(*status, KnownTargetStatus::Alive) {
186-
break 'drop_service;
188+
0
187189
}
188-
0
189-
} else {
190-
if *known_service.fetcher.runtime_id.lock().unwrap() == runtime_id {
191-
'changed_rt_id: {
192-
for (id, runtime) in self.runtimes.lock().unwrap().iter() {
193-
if runtime.targets.len() == 1
194-
&& runtime.targets.contains_key(target)
195-
{
196-
*known_service.fetcher.runtime_id.lock().unwrap() =
197-
id.to_string();
198-
break 'changed_rt_id;
190+
1 => {
191+
known_service.runtimes.remove(runtime_id);
192+
let mut status = known_service.status.lock().unwrap();
193+
*status = match *status {
194+
KnownTargetStatus::Pending => KnownTargetStatus::Alive, /* not really */
195+
KnownTargetStatus::Alive => KnownTargetStatus::RemoveAt(
196+
Instant::now() + Duration::from_secs(3666),
197+
),
198+
KnownTargetStatus::RemoveAt(_) | KnownTargetStatus::Removing(_) => {
199+
unreachable!()
200+
}
201+
};
202+
// We've marked it Alive so that the Pending check in start_fetcher()
203+
// will fail
204+
if matches!(*status, KnownTargetStatus::Alive) {
205+
break 'drop_service;
206+
}
207+
0
208+
}
209+
_ => {
210+
if *known_service.fetcher.runtime_id.lock().unwrap() == runtime_id {
211+
'changed_rt_id: {
212+
for (id, runtime) in self.runtimes.lock().unwrap().iter() {
213+
if runtime.targets.len() == 1
214+
&& runtime.targets.contains_key(target)
215+
{
216+
*known_service.fetcher.runtime_id.lock().unwrap() =
217+
id.to_string();
218+
break 'changed_rt_id;
219+
}
199220
}
221+
known_service.synthetic_id = true;
222+
*known_service.fetcher.runtime_id.lock().unwrap() =
223+
Self::generate_synthetic_id();
200224
}
201-
known_service.synthetic_id = true;
202-
*known_service.fetcher.runtime_id.lock().unwrap() =
203-
Self::generate_synthetic_id();
204225
}
226+
known_service.refcount - 1
205227
}
206-
known_service.refcount - 1
207228
};
208229
break 'service_handling;
209230
}

0 commit comments

Comments
 (0)