Skip to content

Commit f58eea6

Browse files
authored
[ENH]: add readiness probe for garbage collector (#4662)
## Description of changes

Adds a readiness probe for GC.

## Test plan

_How are these changes tested?_

Verified that the service was marked ready using kubectl.

## Documentation Changes

_Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_

n/a
1 parent aac60ef commit f58eea6

File tree

10 files changed

+63
-1
lines changed

10 files changed

+63
-1
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
syntax = "proto3";
2+
3+
package chroma;
4+
5+
// This is a placeholder service. The garbage collector currently only exposes a health service.
6+
service GarbageCollector {}

k8s/distributed-chroma/Chart.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ apiVersion: v2
1616
name: distributed-chroma
1717
description: A helm chart for distributed Chroma
1818
type: application
19-
version: 0.1.43
19+
version: 0.1.44
2020
appVersion: "0.4.24"
2121
keywords:
2222
- chroma

k8s/distributed-chroma/templates/garbage-collector.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ spec:
4040
- name: garbage-collector
4141
image: "{{ .Values.garbageCollector.image.repository }}:{{ .Values.garbageCollector.image.tag }}"
4242
imagePullPolicy: IfNotPresent
43+
readinessProbe:
44+
grpc:
45+
port: 50055
4346
{{ if .Values.garbageCollector.resources }}
4447
resources:
4548
limits:

rust/garbage_collector/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ tokio = { workspace = true }
2727
uuid = { workspace = true }
2828
futures = { workspace = true }
2929
tonic = { workspace = true }
30+
tonic-health = { workspace = true }
3031
tempfile = { workspace = true }
3132
tracing = { workspace = true }
3233
thiserror = { workspace = true }

rust/garbage_collector/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ pub(super) struct GarbageCollectorConfig {
3838
pub(super) assignment_policy: chroma_config::assignment::config::AssignmentPolicyConfig,
3939
pub(super) memberlist_provider: chroma_memberlist::config::MemberlistProviderConfig,
4040
pub my_member_id: String,
41+
#[serde(default = "GarbageCollectorConfig::default_port")]
42+
pub port: u16,
4143
}
4244

4345
impl GarbageCollectorConfig {
@@ -61,6 +63,10 @@ impl GarbageCollectorConfig {
6163
Err(e) => panic!("Error loading config: {}", e),
6264
}
6365
}
66+
67+
fn default_port() -> u16 {
68+
50055
69+
}
6470
}
6571

6672
#[cfg(test)]

rust/garbage_collector/src/garbage_collector_component.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -633,6 +633,7 @@ mod tests {
633633
assignment_policy: chroma_config::assignment::config::AssignmentPolicyConfig::default(),
634634
my_member_id: "test-gc".to_string(),
635635
memberlist_provider: chroma_memberlist::config::MemberlistProviderConfig::default(),
636+
port: 50055,
636637
};
637638
let registry = Registry::new();
638639

@@ -753,6 +754,7 @@ mod tests {
753754
assignment_policy: chroma_config::assignment::config::AssignmentPolicyConfig::default(),
754755
my_member_id: "test-gc".to_string(),
755756
memberlist_provider: chroma_memberlist::config::MemberlistProviderConfig::default(),
757+
port: 50055,
756758
};
757759
let registry = Registry::new();
758760

rust/garbage_collector/src/lib.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ use chroma_tracing::{
66
init_global_filter_layer, init_otel_layer, init_panic_tracing_hook, init_stdout_layer,
77
init_tracing,
88
};
9+
use chroma_types::chroma_proto::garbage_collector_server::GarbageCollectorServer;
910
use config::GarbageCollectorConfig;
1011
use garbage_collector_component::GarbageCollector;
1112
use tokio::signal::unix::{signal, SignalKind};
13+
use tonic::transport::Server;
1214
use tracing::{debug, error, info};
1315

1416
mod config;
@@ -24,7 +26,20 @@ pub mod types;
2426

2527
const CONFIG_PATH_ENV_VAR: &str = "CONFIG_PATH";
2628

29+
// This is a placeholder service so that we can expose a health service
30+
struct GarbageCollectorService {}
31+
32+
impl chroma_types::chroma_proto::garbage_collector_server::GarbageCollector
33+
for GarbageCollectorService
34+
{
35+
}
36+
2737
pub async fn garbage_collector_service_entrypoint() -> Result<(), Box<dyn std::error::Error>> {
38+
let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
39+
health_reporter
40+
.set_not_serving::<GarbageCollectorServer<GarbageCollectorService>>()
41+
.await;
42+
2843
debug!("Loading configuration from environment");
2944
// Parse configuration. Configuration includes sysdb connection details, and
3045
// gc run details amongst others.
@@ -49,6 +64,27 @@ pub async fn garbage_collector_service_entrypoint() -> Result<(), Box<dyn std::e
4964

5065
info!("Loaded configuration successfully: {:#?}", config);
5166

67+
let addr = format!("[::]:{}", config.port)
68+
.parse()
69+
.expect("Invalid address format");
70+
let server_join_handle = tokio::spawn(async move {
71+
let server = Server::builder().add_service(health_service);
72+
server
73+
.serve_with_shutdown(addr, async {
74+
match signal(SignalKind::terminate()) {
75+
Ok(mut sigterm) => {
76+
sigterm.recv().await;
77+
tracing::info!("Received SIGTERM, shutting down gRPC server");
78+
}
79+
Err(err) => {
80+
tracing::error!("Failed to create SIGTERM handler: {err}")
81+
}
82+
}
83+
})
84+
.await
85+
.expect("Failed to start gRPC server");
86+
});
87+
5288
let registry = chroma_config::registry::Registry::new();
5389
let system = System::new();
5490

@@ -79,6 +115,10 @@ pub async fn garbage_collector_service_entrypoint() -> Result<(), Box<dyn std::e
79115
memberlist.subscribe(garbage_collector_handle.receiver());
80116
let mut memberlist_handle = system.start_component(memberlist);
81117

118+
health_reporter
119+
.set_serving::<GarbageCollectorServer<GarbageCollectorService>>()
120+
.await;
121+
82122
// Keep the service running and handle shutdown signals
83123
let mut sigterm = signal(SignalKind::terminate())?;
84124
let mut sigint = signal(SignalKind::interrupt())?;
@@ -112,6 +152,7 @@ pub async fn garbage_collector_service_entrypoint() -> Result<(), Box<dyn std::e
112152
.expect("Dispatcher should be stoppable");
113153
system.stop().await;
114154
system.join().await;
155+
let _ = server_join_handle.await;
115156

116157
info!("Shutting down garbage collector service");
117158
Ok(())

rust/types/build.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
66
"idl/chromadb/proto/coordinator.proto",
77
"idl/chromadb/proto/logservice.proto",
88
"idl/chromadb/proto/query_executor.proto",
9+
"idl/chromadb/proto/garbage_collector.proto",
910
];
1011

1112
// Can't use #[cfg(test)] here because a build for tests is technically a regular debug build, meaning that #[cfg(test)] is useless in build.rs.

rust/types/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ pub use tenant::*;
4848
pub use types::*;
4949
pub use where_parsing::*;
5050

51+
#[allow(clippy::all)]
5152
pub mod chroma_proto {
5253
tonic::include_proto!("chroma");
5354
}

0 commit comments

Comments
 (0)