feat: add ready check #868
base: main
Walkthrough
Introduces readiness health checks across the engine, input, and output components. Adds accessors on streams to expose their components. The engine health server now starts after the streams are constructed and computes readiness by querying each stream’s input and output (and optional error output) via new async check_ready hooks.
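To make the walkthrough concrete, here is a minimal sketch of what a readiness hook with a "ready by default" implementation could look like. It is not the PR's code: `Component`, `NotReady`, `ReadyFuture`, `AlwaysReady`, `Disconnected`, and `block_on` are hypothetical names invented for this example, and the boxed-future signature stands in for whatever async-trait mechanism the crate actually uses.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical error type for this sketch; the real crate defines its own.
#[derive(Debug, PartialEq)]
pub struct NotReady(pub String);

/// Boxed future so the hook can be called through `dyn Component`.
type ReadyFuture<'a> = Pin<Box<dyn Future<Output = Result<(), NotReady>> + Send + 'a>>;

/// Simplified stand-in for the input and output traits (assumed shape).
pub trait Component: Send + Sync {
    /// Default hook: report ready unless the implementor overrides it.
    fn check_ready(&self) -> ReadyFuture<'_> {
        // `std::future::ready` yields an already-completed future.
        Box::pin(std::future::ready(Ok::<(), NotReady>(())))
    }
}

/// Relies on the default hook, so it always reports ready.
pub struct AlwaysReady;
impl Component for AlwaysReady {}

/// Overrides the hook to report a failure.
pub struct Disconnected;
impl Component for Disconnected {
    fn check_ready(&self) -> ReadyFuture<'_> {
        let err = NotReady("broker unreachable".to_string());
        Box::pin(std::future::ready(Err::<(), NotReady>(err)))
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor: polls in a loop, which is fine here because
/// the futures in this sketch never suspend.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn main() {
    let components: Vec<Arc<dyn Component>> =
        vec![Arc::new(AlwaysReady), Arc::new(Disconnected)];
    for component in &components {
        println!("{:?}", block_on(component.check_ready()));
    }
}
```

With a default like this, every existing component keeps compiling and reports ready until it opts in to a real check, which matches the "interface only" scope described in the PR.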
Sequence Diagram(s)
sequenceDiagram
autonumber
actor Client
participant Engine
participant Streams
participant HealthServer as Health Server
participant Components as StreamComponents
participant Input as Input
participant Output as Output
participant ErrOut as ErrorOutput
Note over Engine,Streams: Startup
Engine->>Streams: build all streams
Streams-->>Engine: constructed streams
Engine->>Components: collect get_input/get_output/get_error_output
Engine->>HealthServer: start with EngineApiState{health_state, components}
Note over Client,HealthServer: Health Endpoints
Client->>HealthServer: GET /readiness
HealthServer->>Components: iterate components
loop for each stream
Components->>Input: check_ready()
alt input ok
Components->>Output: check_ready()
alt error output exists
Components->>ErrOut: check_ready()
end
else input not ready
Note over HealthServer: short-circuit not ready
end
end
HealthServer-->>Client: 200 ready or 503 not ready
Client->>HealthServer: GET /health
HealthServer-->>Client: { running: health_state.is_running }
Client->>HealthServer: GET /liveness
HealthServer-->>Client: 200 alive
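The `/readiness` branch of the diagram can be sketched as a function that walks each stream and stops at the first component that is not ready. This is an illustration under assumptions, not the engine's handler: the field names `input`, `output`, and `error_output` follow the handler code quoted in the review, while `Component`, `Fixed`, `stream`, `readiness_status`, and `block_on` are hypothetical helpers, and the numeric return value stands in for the HTTP status.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type ReadyFuture<'a> = Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'a>>;

/// Hypothetical stand-in for the input/output traits.
pub trait Component: Send + Sync {
    fn check_ready(&self) -> ReadyFuture<'_>;
}

/// Test double whose readiness is fixed at construction.
pub struct Fixed(pub bool);
impl Component for Fixed {
    fn check_ready(&self) -> ReadyFuture<'_> {
        let ready = self.0;
        Box::pin(async move {
            if ready { Ok(()) } else { Err("not ready".to_string()) }
        })
    }
}

/// Per-stream bundle of components (simplified).
pub struct StreamComponents {
    pub input: Arc<dyn Component>,
    pub output: Arc<dyn Component>,
    pub error_output: Option<Arc<dyn Component>>,
}

/// Builds a bundle from fixed readiness values (helper for this sketch only).
pub fn stream(input: bool, output: bool, error_output: Option<bool>) -> StreamComponents {
    StreamComponents {
        input: Arc::new(Fixed(input)),
        output: Arc::new(Fixed(output)),
        error_output: error_output.map(|r| Arc::new(Fixed(r)) as Arc<dyn Component>),
    }
}

/// Returns 200 when every component is ready, or 503 at the first failure.
pub async fn readiness_status(streams: &[StreamComponents]) -> u16 {
    for s in streams {
        if s.input.check_ready().await.is_err() {
            return 503; // input not ready: skip the remaining checks
        }
        if s.output.check_ready().await.is_err() {
            return 503;
        }
        if let Some(error_output) = &s.error_output {
            if error_output.check_ready().await.is_err() {
                return 503;
            }
        }
    }
    200
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor; sufficient because these futures never suspend.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn main() {
    let streams = vec![stream(true, true, None), stream(true, false, Some(true))];
    println!("{}", block_on(readiness_status(&streams)));
}
```

Returning early keeps the short-circuit explicit and avoids the mutable `is_ready` flag, at the cost of not reporting which component failed.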
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
fd1d5cb to 7d45e52
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- crates/arkflow-core/src/engine/mod.rs (8 hunks)
- crates/arkflow-core/src/input/mod.rs (1 hunks)
- crates/arkflow-core/src/output/mod.rs (1 hunks)
- crates/arkflow-core/src/stream/mod.rs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
crates/arkflow-core/src/output/mod.rs (1)
crates/arkflow-core/src/input/mod.rs (1)
check_ready(56-58)
crates/arkflow-core/src/engine/mod.rs (1)
crates/arkflow-core/src/stream/mod.rs (2)
output(348-388), new(53-73)
crates/arkflow-core/src/input/mod.rs (1)
crates/arkflow-core/src/output/mod.rs (1)
check_ready(42-44)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: build
- GitHub Check: build
let mut is_ready = true;

for component in state.components.iter() {
    if !component.input.check_ready().await.is_ok()
        || !component.output.check_ready().await.is_ok()
    {
        is_ready = false;
    }
    if let Some(error_output) = component.error_output.as_ref() {
        if !error_output.check_ready().await.is_ok() {
            is_ready = false;
        }
    }
    if !is_ready {
        break;
    }
}
Readiness must wait for the engine to be running.
With the new interface-only checks, every component currently reports Ok(()), so this handler now returns “ready” even before any stream has started (or after a fatal startup failure). That breaks the readiness signal consumers rely on for gating traffic. Gate the component loop behind health_state.is_running so we only declare readiness once the engine has actually transitioned to running.
-    let mut is_ready = true;
-
-    for component in state.components.iter() {
-        if !component.input.check_ready().await.is_ok()
-            || !component.output.check_ready().await.is_ok()
-        {
-            is_ready = false;
-        }
-        if let Some(error_output) = component.error_output.as_ref() {
-            if !error_output.check_ready().await.is_ok() {
-                is_ready = false;
-            }
-        }
-        if !is_ready {
-            break;
-        }
-    }
+    let mut is_ready = state.health_state.is_running.load(Ordering::SeqCst);
+
+    if is_ready {
+        for component in state.components.iter() {
+            if !component.input.check_ready().await.is_ok()
+                || !component.output.check_ready().await.is_ok()
+            {
+                is_ready = false;
+            }
+            if let Some(error_output) = component.error_output.as_ref() {
+                if !error_output.check_ready().await.is_ok() {
+                    is_ready = false;
+                }
+            }
+            if !is_ready {
+                break;
+            }
+        }
+    }
🤖 Prompt for AI Agents
crates/arkflow-core/src/engine/mod.rs around lines 193 to 209: the readiness
check currently iterates components and returns ready even when the engine
hasn't started because interface-only checks all return Ok; gate the component
readiness loop behind the engine running flag by first checking
health_state.is_running (or the equivalent accessor) and only perform the
per-component check when running is true — if not running, set readiness to not
ready (or return early) so readiness is only true after health_state.is_running
is true and all component checks succeed.
Readiness only checks whether the input and output are ready to serve requests; it is not related to the is_running check.
@duyquang6, thank you for the clarification. I understand now—the readiness check is specifically focused on whether the individual components (inputs and outputs) are ready to accept and process requests, which is independent of the engine's running state.
Your design makes sense: the readiness endpoint verifies component-level readiness (e.g., connection pools, message broker connections), while is_running tracks the engine's execution state. These are indeed separate concerns, and it's valid for components to report readiness even before stream processing begins.
I appreciate you explaining the design intent. This approach aligns with common patterns where readiness indicates infrastructure availability rather than active processing state.
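The separation discussed in this thread can be illustrated with a small synchronous sketch. It assumes `is_running` is an `AtomicBool`, as the `load(Ordering::SeqCst)` call in the suggestion implies. The `component_ready` field and the `health` and `readiness` functions are hypothetical simplifications, with each `bool` standing in for one component's `check_ready()` result.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Engine execution state, as read by `GET /health`.
pub struct HealthState {
    pub is_running: AtomicBool,
}

/// Simplified API state: each bool stands in for one component's
/// `check_ready()` result (an assumption that keeps the sketch synchronous).
pub struct EngineApiState {
    pub health_state: HealthState,
    pub component_ready: Vec<bool>,
}

/// `/health`: reports whether the engine is running.
pub fn health(state: &EngineApiState) -> bool {
    state.health_state.is_running.load(Ordering::SeqCst)
}

/// `/readiness`: reports whether every component is ready.
/// It deliberately does not consult `is_running`.
pub fn readiness(state: &EngineApiState) -> bool {
    state.component_ready.iter().all(|ready| *ready)
}

fn main() {
    let state = EngineApiState {
        health_state: HealthState { is_running: AtomicBool::new(false) },
        component_ready: vec![true, true],
    };
    // Before the engine starts: components are ready, engine is not running.
    println!("running={} ready={}", health(&state), readiness(&state));

    state.health_state.is_running.store(true, Ordering::SeqCst);
    println!("running={} ready={}", health(&state), readiness(&state));
}
```

Under this split, a consumer that needs both signals has to query both endpoints. The trade-off raised in the review is that readiness alone can be true before any stream has started.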
I need more time to think.
@duyquang6
Implements #768
This PR only covers the interface part; the concrete implementation of `ready` for each input/output will be implemented next.
Summary by CodeRabbit
New Features
Refactor