Skip to content

Conversation

@duyquang6
Copy link
Contributor

@duyquang6 duyquang6 commented Oct 12, 2025

Implement #768
This PR only cover interface part, concrete implementation of ready for each input/output will be implement next

Summary by CodeRabbit

  • New Features

    • More accurate readiness: computed from all stream components rather than a fixed flag.
    • Inputs and outputs can optionally report readiness, improving health diagnostics.
    • Health and liveness endpoints use a unified engine state; liveness reports alive when reachable.
  • Refactor

    • Health check server starts after streams are initialized to ensure reliable readiness.
    • Internal restructuring to support richer health reporting without breaking existing behavior.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Oct 12, 2025

Walkthrough

Introduces readiness health checks across engine, input, and output components. Adds accessors on streams to expose components. Engine health server now starts after streams are constructed and computes readiness by querying each stream’s input/output (and optional error output) via new async check_ready hooks.

Changes

Cohort / File(s) Summary
Engine health API and control flow
crates/arkflow-core/src/engine/mod.rs
Adds StreamComponents and EngineApiState; removes HealthState.is_ready; health/readiness/liveness endpoints now use EngineApiState; readiness iterates components and short-circuits on failure; health server starts after streams are built; Engine::run constructs components and passes them to the server.
Trait readiness API
crates/arkflow-core/src/input/mod.rs, crates/arkflow-core/src/output/mod.rs
Adds async fn check_ready(&self) -> Result<(), Error> with default Ok(()) to Input and Output traits, enabling optional readiness checks.
Stream component accessors
crates/arkflow-core/src/stream/mod.rs
Adds get_input, get_output, and get_error_output to expose underlying components for health checks.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Client
  participant Engine
  participant Streams
  participant HealthServer as Health Server
  participant Components as StreamComponents
  participant Input as Input
  participant Output as Output
  participant ErrOut as ErrorOutput

  Note over Engine,Streams: Startup
  Engine->>Streams: build all streams
  Streams-->>Engine: constructed streams
  Engine->>Components: collect get_input/get_output/get_error_output
  Engine->>HealthServer: start with EngineApiState{health_state, components}

  Note over Client,HealthServer: Health Endpoints
  Client->>HealthServer: GET /readiness
  HealthServer->>Components: iterate components
  loop for each stream
    Components->>Input: check_ready()
    alt input ok
      Components->>Output: check_ready()
      alt error output exists
        Components->>ErrOut: check_ready()
      end
    else input not ready
      Note over HealthServer: short-circuit not ready
    end
  end
  HealthServer-->>Client: 200 ready or 503 not ready

  Client->>HealthServer: GET /health
  HealthServer-->>Client: { running: health_state.is_running }

  Client->>HealthServer: GET /liveness
  HealthServer-->>Client: 200 alive
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

I thump my paws: checks all align,
Streams now whisper, “I’m just fine.”
Inputs, outputs, ready call—
Health hops through the server hall.
Green carrots glow, errors fade—
A hare-approved rollout, well-made. 🥕🐇

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title succinctly describes the addition of a readiness check feature and aligns with the conventional commit style, accurately reflecting the primary change of exposing and computing readiness across components.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 41ff461 and 7d45e52.

📒 Files selected for processing (4)
  • crates/arkflow-core/src/engine/mod.rs (8 hunks)
  • crates/arkflow-core/src/input/mod.rs (1 hunks)
  • crates/arkflow-core/src/output/mod.rs (1 hunks)
  • crates/arkflow-core/src/stream/mod.rs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
crates/arkflow-core/src/output/mod.rs (1)
crates/arkflow-core/src/input/mod.rs (1)
  • check_ready (56-58)
crates/arkflow-core/src/engine/mod.rs (1)
crates/arkflow-core/src/stream/mod.rs (2)
  • output (348-388)
  • new (53-73)
crates/arkflow-core/src/input/mod.rs (1)
crates/arkflow-core/src/output/mod.rs (1)
  • check_ready (42-44)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: build
  • GitHub Check: build

Comment on lines +193 to +209
let mut is_ready = true;

for component in state.components.iter() {
if !component.input.check_ready().await.is_ok()
|| !component.output.check_ready().await.is_ok()
{
is_ready = false;
}
if let Some(error_output) = component.error_output.as_ref() {
if !error_output.check_ready().await.is_ok() {
is_ready = false;
}
}
if !is_ready {
break;
}
}
Copy link
Contributor

@coderabbitai coderabbitai bot Oct 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Readiness must wait for the engine to be running.

With the new interface-only checks, every component currently reports Ok(()), so this handler now returns “ready” even before any stream has started (or after a fatal startup failure). That breaks the readiness signal consumers rely on for gating traffic. Gate the component loop behind health_state.is_running so we only declare readiness once the engine has actually transitioned to running.

-        let mut is_ready = true;
-
-        for component in state.components.iter() {
-            if !component.input.check_ready().await.is_ok()
-                || !component.output.check_ready().await.is_ok()
-            {
-                is_ready = false;
-            }
-            if let Some(error_output) = component.error_output.as_ref() {
-                if !error_output.check_ready().await.is_ok() {
-                    is_ready = false;
-                }
-            }
-            if !is_ready {
-                break;
-            }
-        }
+        let mut is_ready = state.health_state.is_running.load(Ordering::SeqCst);
+
+        if is_ready {
+            for component in state.components.iter() {
+                if !component.input.check_ready().await.is_ok()
+                    || !component.output.check_ready().await.is_ok()
+                {
+                    is_ready = false;
+                }
+                if let Some(error_output) = component.error_output.as_ref() {
+                    if !error_output.check_ready().await.is_ok() {
+                        is_ready = false;
+                    }
+                }
+                if !is_ready {
+                    break;
+                }
+            }
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let mut is_ready = true;
for component in state.components.iter() {
if !component.input.check_ready().await.is_ok()
|| !component.output.check_ready().await.is_ok()
{
is_ready = false;
}
if let Some(error_output) = component.error_output.as_ref() {
if !error_output.check_ready().await.is_ok() {
is_ready = false;
}
}
if !is_ready {
break;
}
}
let mut is_ready = state.health_state.is_running.load(Ordering::SeqCst);
if is_ready {
for component in state.components.iter() {
if !component.input.check_ready().await.is_ok()
|| !component.output.check_ready().await.is_ok()
{
is_ready = false;
}
if let Some(error_output) = component.error_output.as_ref() {
if !error_output.check_ready().await.is_ok() {
is_ready = false;
}
}
if !is_ready {
break;
}
}
}
🤖 Prompt for AI Agents
crates/arkflow-core/src/engine/mod.rs around lines 193 to 209: the readiness
check currently iterates components and returns ready even when the engine
hasn't started because interface-only checks all return Ok; gate the component
readiness loop behind the engine running flag by first checking
health_state.is_running (or the equivalent accessor) and only perform the
per-component check when running is true — if not running, set readiness to not
ready (or return early) so readiness is only true after health_state.is_running
is true and all component checks succeed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

readiness just checking input and output is ready to serve request or not, it not related to is_running check

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@duyquang6, thank you for the clarification. I understand now—the readiness check is specifically focused on whether the individual components (inputs and outputs) are ready to accept and process requests, which is independent of the engine's running state.

Your design makes sense: the readiness endpoint verifies component-level readiness (e.g., connection pools, message broker connections), while is_running tracks the engine's execution state. These are indeed separate concerns, and it's valid for components to report readiness even before stream processing begins.

I appreciate you explaining the design intent. This approach aligns with common patterns where readiness indicates infrastructure availability rather than active processing state.

@chenquan chenquan linked an issue Oct 12, 2025 that may be closed by this pull request
@chenquan chenquan self-assigned this Oct 12, 2025
@chenquan
Copy link
Collaborator

I need more time to think.

@chenquan
Copy link
Collaborator

@duyquang6
Hello, thank you very much for submitting your PR, I thought deeply about it, multiple streams should be independent of each other. The current implementation breaks the original mechanism, when there are multiple streams, as soon as one stream fails to start, all streams will not work properly.

@chenquan chenquan added the hold Pause, for some reason label Oct 18, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

hold Pause, for some reason

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Feature request: Implement /ready endpoint for readiness checks

2 participants