Skip to content

Commit e9319df

Browse files
committed
Fix possible race confition when adding, then removing remote config services
Signed-off-by: Bob Weinand <bob.weinand@datadoghq.com>
1 parent b3b2852 commit e9319df

File tree

1 file changed

+52
-30
lines changed

1 file changed

+52
-30
lines changed

remote-config/src/fetch/multitarget.rs

Lines changed: 52 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -168,42 +168,64 @@ where
168168
'service_handling: {
169169
'drop_service: {
170170
if let Some(known_service) = services.get_mut(target) {
171-
known_service.refcount = if known_service.refcount == 1 {
172-
known_service.runtimes.remove(runtime_id);
173-
let mut status = known_service.status.lock().unwrap();
174-
*status = match *status {
175-
KnownTargetStatus::Pending => KnownTargetStatus::Alive, // not really
176-
KnownTargetStatus::Alive => KnownTargetStatus::RemoveAt(
177-
Instant::now() + Duration::from_secs(3666),
178-
),
179-
KnownTargetStatus::RemoveAt(_) | KnownTargetStatus::Removing(_) => {
180-
unreachable!()
171+
known_service.refcount = match known_service.refcount {
172+
0 => {
173+
// Handle the possible race condition where the service gets added AND
174+
// removed while in Removing state.
175+
let status = known_service.status.lock().unwrap();
176+
match *status {
177+
KnownTargetStatus::Removing(ref future) => {
178+
let future = future.clone();
179+
let runtime_id = runtime_id.to_string();
180+
let this = self.clone();
181+
let target = target.clone();
182+
tokio::spawn(async move {
183+
future.await;
184+
this.remove_target(runtime_id.as_str(), &target);
185+
});
186+
}
187+
_ => unreachable!(),
181188
}
182-
};
183-
// We've marked it Alive so that the Pending check in start_fetcher() will
184-
// fail
185-
if matches!(*status, KnownTargetStatus::Alive) {
186-
break 'drop_service;
189+
0
187190
}
188-
0
189-
} else {
190-
if *known_service.fetcher.runtime_id.lock().unwrap() == runtime_id {
191-
'changed_rt_id: {
192-
for (id, runtime) in self.runtimes.lock().unwrap().iter() {
193-
if runtime.targets.len() == 1
194-
&& runtime.targets.contains_key(target)
195-
{
196-
*known_service.fetcher.runtime_id.lock().unwrap() =
197-
id.to_string();
198-
break 'changed_rt_id;
191+
1 => {
192+
known_service.runtimes.remove(runtime_id);
193+
let mut status = known_service.status.lock().unwrap();
194+
*status = match *status {
195+
KnownTargetStatus::Pending => KnownTargetStatus::Alive, /* not really */
196+
KnownTargetStatus::Alive => KnownTargetStatus::RemoveAt(
197+
Instant::now() + Duration::from_secs(3666),
198+
),
199+
KnownTargetStatus::RemoveAt(_) | KnownTargetStatus::Removing(_) => {
200+
unreachable!()
201+
}
202+
};
203+
// We've marked it Alive so that the Pending check in start_fetcher()
204+
// will fail
205+
if matches!(*status, KnownTargetStatus::Alive) {
206+
break 'drop_service;
207+
}
208+
0
209+
}
210+
_ => {
211+
if *known_service.fetcher.runtime_id.lock().unwrap() == runtime_id {
212+
'changed_rt_id: {
213+
for (id, runtime) in self.runtimes.lock().unwrap().iter() {
214+
if runtime.targets.len() == 1
215+
&& runtime.targets.contains_key(target)
216+
{
217+
*known_service.fetcher.runtime_id.lock().unwrap() =
218+
id.to_string();
219+
break 'changed_rt_id;
220+
}
199221
}
222+
known_service.synthetic_id = true;
223+
*known_service.fetcher.runtime_id.lock().unwrap() =
224+
Self::generate_synthetic_id();
200225
}
201-
known_service.synthetic_id = true;
202-
*known_service.fetcher.runtime_id.lock().unwrap() =
203-
Self::generate_synthetic_id();
204226
}
227+
known_service.refcount - 1
205228
}
206-
known_service.refcount - 1
207229
};
208230
break 'service_handling;
209231
}

0 commit comments

Comments
 (0)