@chenquan
Collaborator

@chenquan chenquan commented Aug 27, 2025

Summary by CodeRabbit

  • New Features
    • Robust state management: memory/S3/hybrid backends, per-batch metadata, checkpointing, manual recovery, exactly-once processing, barrier/transaction coordination, two‑phase commit outputs, S3 persistence with performance optimizations, metrics/alerting, and EngineBuilder + stream-level state wiring.
  • Documentation
    • Comprehensive state management guides, architecture notes, configs, and implementation summaries.
  • Examples
    • New end-to-end examples (word count, session/window, stateful pipeline, run_stateful_example, verification).
  • Tests
    • Extensive unit & integration tests for state, checkpointing, transactions, and monitoring.
  • Bug Fixes
    • Safer CLI config handling (error instead of panic).

@coderabbitai
Contributor

coderabbitai bot commented Aug 27, 2025

Walkthrough

Adds a comprehensive state-management subsystem to arkflow-core: per-batch metadata and transaction context, in-memory and S3-backed state stores with checkpoint coordination (2PC), performance optimizations, Prometheus monitoring, EngineBuilder/stream integration, examples, tests, and Cargo dependency updates. Several new public types/functions are introduced; no breaking removals.

Changes

| Cohort | File(s) | Summary |
| --- | --- | --- |
| Documentation | CLAUDE.md, README-STATE-MANAGEMENT.md, docs/state-management-guide.md, docs/state-management-implementation-summary.md, docs/state-management.md, docs/STATE_MANAGEMENT.md | New docs and guides describing architecture, configuration, examples, migration notes, implementation summary and developer guidance; documentation-only changes. |
| Manifests | crates/arkflow-core/Cargo.toml | Adds dependencies: uuid, object_store (aws feature), futures-util (workspace), async-compression (tokio, zstd), lru, bytes, prometheus, parking_lot (num_cpus preserved). |
| Crate root / Core API | crates/arkflow-core/src/lib.rs | Exposes pub mod engine_builder; pub mod state;, extends Error with ObjectStore(String), adds From conversions (object_store, prometheus), and extends MessageBatch API with metadata/transaction accessors and checkpoint detection. |
| Configuration | crates/arkflow-core/src/config.rs | Adds StateManagementConfig, StateBackendType, S3StateBackendConfig, StreamStateConfig, serde defaults, and new state_management field on EngineConfig. |
| Engine builder & runtime integration | crates/arkflow-core/src/engine_builder.rs, crates/arkflow-core/src/engine/mod.rs, crates/arkflow-core/src/cli/mod.rs | New EngineBuilder building streams with optional per-operator EnhancedStateManager; Engine::run now uses EngineBuilder.build_streams(); CLI run() now returns error when config missing. |
| Stream changes / Stateful pipeline | crates/arkflow-core/src/stream/mod.rs | Adds StatefulPipeline wrapper, Stream gains optional state_manager and stateful_pipeline; StreamConfig adds per-stream state and a build_with_state helper. |
| State module root & metadata | crates/arkflow-core/src/state/mod.rs | Adds Metadata and MetadataValue, embed/extract helpers, submodule declarations and public re-exports for the state subsystem. |
| Enhanced state manager & wrappers | crates/arkflow-core/src/state/enhanced.rs | New EnhancedStateManager, EnhancedStateConfig, checkpoint APIs, state ops, ExactlyOnceProcessor, TwoPhaseCommitOutput, transaction logging and public management methods. |
| Transaction / barrier system | crates/arkflow-core/src/state/transaction.rs | Adds TransactionContext, BarrierType, TransactionCoordinator, BarrierInjector, participant tracking and transaction lifecycle helpers. |
| In-memory helpers & examples | crates/arkflow-core/src/state/helper.rs, crates/arkflow-core/src/state/example.rs, crates/arkflow-core/src/state/simple.rs | StateHelper trait, SimpleMemoryState, StateConfig, example processors (CountingProcessor, StatefulExampleProcessor), TransactionalOutputWrapper, SimpleBarrierInjector, StatefulProcessor wrappers, and example usage. |
| S3 backend & checkpoint coordinator | crates/arkflow-core/src/state/s3_backend.rs | S3StateBackend and S3StateStore, S3StateBackendConfig, path helpers, checkpoint metadata types, S3CheckpointCoordinator, JSON persistence via object_store, listing/cleanup APIs. |
| Performance wrapper for S3 | crates/arkflow-core/src/state/performance.rs | OptimizedS3Backend with batching, zstd compression, LRU TTL cache, async op pool, OptimizedStateStore, and PerformanceStats. |
| Monitoring / metrics | crates/arkflow-core/src/state/monitoring.rs | Prometheus StateMetrics, OperationTimer, StateMonitor, MonitoredStateManager, alert configuration and health reporting APIs; tests for metrics/timers. |
| Tests (unit & integration) | crates/arkflow-core/src/state/tests.rs, crates/arkflow-core/src/state/integration_tests.rs, crates/arkflow-core/tests/*.rs | Unit and integration tests covering metadata embed/extract, typed in-memory state, monitoring metrics export, EnhancedStateManager memory-backend flows, and end-to-end state-management integration tests. |
| Examples & configs | examples/*.rs, examples/*.yaml, examples/data/* (e.g., examples/session_window.rs, examples/stateful_pipeline.rs, examples/word_count.rs, examples/run_stateful_example.rs, examples/stateful_example.yaml, examples/stateful_s3_example.yaml, examples/data/test.txt) | New runnable examples and YAML configs demonstrating session windows, stateful pipelines, stateful/stateless word count, runtime example with EngineBuilder and state monitoring, and sample data. |

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Source
  participant Proc as Processor
  participant EO as ExactlyOnceProcessor
  participant ESM as EnhancedStateManager
  participant TC as TransactionCoordinator
  participant Out as Output (2PC)
  Note over Source,Proc: Incoming MessageBatch
  Source->>Proc: send(batch)
  Proc->>EO: process(batch)
  EO->>ESM: read/write operator state
  alt batch contains TransactionContext
    EO->>TC: register / prepare checkpoint
    TC-->>EO: transaction context
  end
  EO-->>Proc: processed batches
  Proc->>Out: write(batch)
  Out->>ESM: prepare (log state) / commit via 2PC
  Out-->>Proc: ack
sequenceDiagram
  autonumber
  participant ESM as EnhancedStateManager
  participant PB as OptimizedS3Backend / S3StateBackend
  participant OS as ObjectStore (S3)
  participant Mon as StateMonitor
  ESM->>Mon: record checkpoint start
  ESM->>PB: persist operator states (batched/compressed maybe)
  PB->>OS: PUT objects (checkpoint, states, metadata)
  OS-->>PB: OK
  PB-->>ESM: persistence ACK
  ESM->>Mon: record checkpoint success / metrics

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45–75 minutes

Suggested labels

enhancement

Poem

I hop through crates and schema seams,
I plant small checkpoints in streaming dreams.
I nibble caches, compress a byte,
and guard each transaction through day and night.
A rabbit of state—metrics gleam, all right! 🐇✨

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 28

🧹 Nitpick comments (52)
README-STATE-MANAGEMENT.md (4)

46-53: Avoid overpromising “exactly-once” until Phase 2 is complete or clearly marked experimental

This section states hard guarantees, but docs/state-management-implementation-summary.md says Phase 2 (S3 + exactly-once) is only “partially complete.” Rephrase as “experimental/preview” or add a status note to avoid inconsistency across docs.


135-139: Pinning dependency version may be inaccurate for this repo; prefer workspace or verify crate version

If this README targets users of the published crate, verify the current version. If it targets contributors, prefer workspace path to avoid version drift.

-[dependencies]
-arkflow-core = "0.4"
+[dependencies]
+# If using this repository directly:
+# arkflow-core = { path = "crates/arkflow-core" }
+# If using from crates.io, replace with the current released version:
+# arkflow-core = "X.Y"

158-160: MessageBatch::from_string may not exist; align with new_binary used elsewhere

Examples in this PR use MessageBatch::new_binary. Consider updating for consistency.

-    let batch = MessageBatch::from_string("hello world")?;
+    let batch = MessageBatch::new_binary(vec![b"hello world".to_vec()])?;

193-197: Benchmark numbers need context

Add hardware, dataset, and methodology or mark as indicative to avoid misleading expectations.

CLAUDE.md (2)

81-99: Quote broker endpoints in YAML

Unquoted host:port can be parsed unexpectedly in YAML. Quote the strings.

-      brokers: [localhost:9092]
+      brokers: ["localhost:9092"]
...
-      topics: [test-topic]
+      topics: ["test-topic"]

108-113: Clarify that backpressure threshold is configurable

Docs state a fixed threshold (1024). If configurable, note the config key or remove the hardcoded value.

docs/state-management.md (4)

27-29: Use the canonical batch constructor

Align with other examples using binary batches.

-    let batch = MessageBatch::from_string("hello world")?;
+    let batch = MessageBatch::new_binary(vec![b"hello world".to_vec()])?;

44-50: TransactionalOutput example lacks commit/flush semantics

If your TransactionalOutputWrapper exposes explicit prepare/commit/abort or implicit commit on checkpoint, mention it to avoid misuse.


87-102: Add missing imports in snippet for completeness

The snippet uses Arc and HashMap without imports.

+use std::collections::HashMap;
+use std::sync::Arc;

165-175: Consistency: avoid importing unused StatefulExampleProcessor in WordCount example

The WordCount snippet defines its own processor; drop unrelated imports to reduce confusion.

docs/state-management-implementation-summary.md (2)

21-38: Doc-set consistency: mark “exactly-once” feature status uniformly

Here it’s “部分完成 ✅” (“partially complete”), while the README claims full “Exactly-Once Semantics.” Align the wording across docs (e.g., “experimental/preview”) and briefly list known limitations.


156-156: Minor wording polish

Avoid mixing the English word “solid” into Chinese text. Suggest “扎实” (“solid, well-founded”).

-当前实现已经提供了一个solid的基础
+当前实现已经提供了一个扎实的基础
examples/stateful_pipeline.rs (5)

150-155: Avoid potential panic on system clock and reduce cast risk

duration_since returns an Err when the system clock is set before the Unix epoch, so the unwrap() can panic. Also make the u128→u64 conversion explicit instead of truncating with as.

-                    .unwrap_or_else(|| {
-                        std::time::SystemTime::now()
-                            .duration_since(std::time::UNIX_EPOCH)
-                            .unwrap()
-                            .as_millis() as u64
-                    });
+                    .unwrap_or_else(|| {
+                        use std::time::{SystemTime, UNIX_EPOCH};
+                        let dur = SystemTime::now()
+                            .duration_since(UNIX_EPOCH)
+                            .unwrap_or_default();
+                        u64::try_from(dur.as_millis()).unwrap_or(u64::MAX)
+                    });

The try_from/unwrap_or pair saturates at u64::MAX; an equivalent spelling is let ms = dur.as_millis(); ms.min(u128::from(u64::MAX)) as u64.
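
As a standalone illustration of the same idea, here is a minimal sketch of a panic-free timestamp helper. The function name millis_since_epoch is hypothetical and not part of the PR; it only uses std APIs.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Milliseconds between the Unix epoch and `t`, without panicking.
/// `millis_since_epoch` is a hypothetical helper name used for illustration.
fn millis_since_epoch(t: SystemTime) -> u64 {
    // `duration_since` returns Err when `t` is earlier than the epoch;
    // fall back to a zero duration instead of unwrapping.
    let dur = t.duration_since(UNIX_EPOCH).unwrap_or_default();
    // `as_millis` yields u128; saturate at u64::MAX rather than truncating.
    u64::try_from(dur.as_millis()).unwrap_or(u64::MAX)
}
```

Taking the SystemTime as a parameter (rather than calling SystemTime::now() inside) keeps the helper easy to test with fixed inputs.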


329-329: Suppress unused variable or wire the stream

stream is created but not used. Either run it or rename to _stream to avoid warnings.

-    let (mut stream, state_manager) = build_stateful_pipeline(config).await?;
+    let (_stream, state_manager) = build_stateful_pipeline(config).await?;

167-193: Reduce lock contention: consolidate state writes per batch

You take a write lock and perform get/set per message. Consider batching updates in a local map and persisting once per window/key to reduce contention and I/O.
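
A minimal sketch of the batching idea, assuming a plain RwLock-protected map as a stand-in for the state manager (SharedCounts and apply_batch are hypothetical names, not the example's API): fold the batch into a local map first, then take the write lock once.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Stand-in for the shared operator state; the real example goes through
/// the state manager instead of a bare map.
type SharedCounts = RwLock<HashMap<String, u64>>;

/// Aggregate a batch locally, then apply all deltas under a single write lock.
fn apply_batch(shared: &SharedCounts, keys: &[&str]) {
    // Phase 1: no lock held while scanning the batch.
    let mut local: HashMap<&str, u64> = HashMap::new();
    for key in keys {
        *local.entry(*key).or_insert(0) += 1;
    }
    // Phase 2: one lock acquisition, one write per distinct key.
    let mut guard = shared.write().expect("state lock poisoned");
    for (key, delta) in local {
        *guard.entry(key.to_string()).or_insert(0) += delta;
    }
}
```

With this shape the number of writes scales with distinct keys per batch rather than with messages.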


195-201: One output per input increases overhead

Emitting a result batch per message can be costly. Consider emitting per window or at checkpoint boundaries for throughput.


6-13: Tidy imports

OperationTimer, TwoPhaseCommitOutput, and possibly Input/Output/Uuid are unused in this example. Consider removing for clarity.

crates/arkflow-core/Cargo.toml (2)

31-38: Gate heavy deps behind optional features to reduce compile time and transitive bloat

Make object_store, prometheus, and async-compression optional, and disable unnecessary default features. This keeps core lightweight for users not using S3/metrics/compression.

-uuid = { version = "1.10", features = ["v4"] }
-object_store = { version = "0.12", features = ["aws"] }
+uuid = { version = "1.10", default-features = false, features = ["v4", "fast-rng"] }
+object_store = { version = "0.12", features = ["aws"], optional = true }
-futures-util = { workspace = true }
-async-compression = { version = "0.4", features = ["tokio", "zstd"] }
+futures-util = { workspace = true, default-features = false, features = ["alloc"] }
+async-compression = { version = "0.4", default-features = false, features = ["tokio", "zstd"], optional = true }
 lru = "0.12"
 bytes = "1.7"
-prometheus = "0.13"
+prometheus = { version = "0.13", default-features = false, optional = true }
-parking_lot = "0.12"
+parking_lot = { version = "0.12", default-features = false }

Add features section (outside this hunk):

[features]
default = []
s3-backend = ["dep:object_store", "dep:async-compression"]
metrics = ["dep:prometheus"]

30-30: Avoid exact pin on num_cpus unless needed

Exact pin can cause unnecessary resolver conflicts. Prefer caret or workspace-managed.

-num_cpus = "1.17.0"
+num_cpus = "1.17"
crates/arkflow-core/src/state/integration_tests.rs (3)

26-34: Lower checkpoint interval to speed CI without changing semantics

1s stalls tests under load; 10–50ms is plenty for unit paths.

-            checkpoint_interval_ms: 1000,
+            checkpoint_interval_ms: 25,

39-47: Remove redundant reference on string literal

Passing &"counter" produces an extra reference; pass &str directly.

-            .set_state_value("test_op", &"counter", 42u64)
+            .set_state_value("test_op", "counter", 42u64)

49-57: Assert checkpoint ID monotonicity for stronger signal

Also validate that IDs are positive and increasing across two invocations.

-        let checkpoint_id = state_manager.create_checkpoint().await.unwrap();
-        assert!(checkpoint_id > 0);
+        let c1 = state_manager.create_checkpoint().await.unwrap();
+        let c2 = state_manager.create_checkpoint().await.unwrap();
+        assert!(c1 > 0 && c2 > c1);
crates/arkflow-core/src/state/tests.rs (2)

36-39: Strengthen metadata round-trip assertion

Assert content equality, not only presence. If Metadata implements PartialEq, compare the values directly; otherwise, check the expected fields.

-        let extracted_metadata = batch_with_metadata.metadata();
-        assert!(extracted_metadata.is_some());
+        let extracted = batch_with_metadata.metadata().expect("metadata missing");
+        // Example if PartialEq:
+        // assert_eq!(extracted, metadata);
+        // Or minimally ensure defaults:
+        assert!(extracted.transaction.is_none());

65-88: Metrics assertions are fine; avoid brittle substring if possible

If you export a registry, consider asserting presence of a known metric name rather than generic "arkflow_state".

crates/arkflow-core/src/lib.rs (2)

79-81: Consider feature-gating the new ObjectStore error variant

If S3/object store is optional, gate this variant to avoid forcing the dep for users not using state backends.

-    #[error("Object store error: {0}")]
-    ObjectStore(String),
+    #[cfg(feature = "s3-backend")]
+    #[error("Object store error: {0}")]
+    ObjectStore(String),

161-171: Gate external error conversions and keep mapping consistent

Avoid unconditional deps on object_store/prometheus; map Prometheus errors to a dedicated variant or keep as Process but gate it.

-impl From<object_store::Error> for Error {
+#[cfg(feature = "s3-backend")]
+impl From<object_store::Error> for Error {
     fn from(err: object_store::Error) -> Self {
         Error::ObjectStore(format!("{}", err))
     }
 }
 
-impl From<prometheus::Error> for Error {
+#[cfg(feature = "metrics")]
+impl From<prometheus::Error> for Error {
     fn from(err: prometheus::Error) -> Self {
         Error::Process(format!("Prometheus error: {}", err))
     }
 }
docs/state-management-guide.md (4)

194-203: Align logging guidance with crate’s tracing stack

Project uses tracing/tracing-subscriber; prefer that over env_logger to avoid mixed patterns.

- env_logger::Builder::from_default_env()
-     .filter_level(log::LevelFilter::Debug)
-     .init();
+ tracing_subscriber::fmt()
+     .with_max_level(tracing::Level::DEBUG)
+     .with_target(false)
+     .init();

37-50: Doc snippet stores a custom struct; note the Serialize/Deserialize bound

If SessionInfo lacks serde derives, example won’t compile. Add a brief note.


13-24: Minor style nits in bulleted lists

LanguageTool flags are likely due to inconsistent punctuation after bold terms. Consider adding colons/periods consistently.


136-148: Align S3 configuration documentation with the actual S3StateBackendConfig

The example YAML in docs/state-management-guide.md (lines 136–148) currently shows only four S3 settings (bucket, region, prefix, use_ssl), but the Rust struct includes three additional optional fields. Update the docs so that users see every configurable key and its default behavior to avoid runtime errors.

Locations to update:

  • docs/state-management-guide.md (around lines 136–148)

Missing fields to document:

  • endpoint (Option)
  • access_key_id (Option)
  • secret_access_key (Option)

Suggested diff for the YAML snippet:

 state:
   enabled: true
   backend_type: S3
   checkpoint_interval_ms: 60000
   retained_checkpoints: 10
   exactly_once: true
   s3_config:
     bucket: "my-app-state"
     region: "us-east-1"
     prefix: "prod/checkpoints"
+    endpoint: "https://s3.custom-endpoint.com"  # optional
+    access_key_id: "YOUR_ACCESS_KEY"            # optional
+    secret_access_key: "YOUR_SECRET_KEY"        # optional
     use_ssl: true
crates/arkflow-core/src/state/helper.rs (1)

98-101: Prefer returning an iterator over Vec<&str>.

Returning Vec<&str> allocates on every call and still borrows from self. Expose an iterator instead (no allocation), or return owned keys if callers need the result to outlive the borrow.

-    pub fn keys(&self) -> Vec<&str> {
-        self.data.keys().map(|s| s.as_str()).collect()
-    }
+    pub fn keys(&self) -> impl Iterator<Item = &str> {
+        self.data.keys().map(|s| s.as_str())
+    }
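
For reference, a self-contained sketch of the iterator-returning shape. MemoryState and its data field are simplified stand-ins for the type in helper.rs, not its actual definition.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the in-memory state type; names are assumptions.
struct MemoryState {
    data: HashMap<String, Vec<u8>>,
}

impl MemoryState {
    /// Borrowing iterator over the keys; no intermediate Vec is allocated.
    /// The `+ '_` makes the borrow of `self` explicit in the signature.
    fn keys(&self) -> impl Iterator<Item = &str> + '_ {
        self.data.keys().map(|s| s.as_str())
    }
}
```

Callers that do want a Vec can still write state.keys().collect::<Vec<_>>() at the call site.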
examples/session_window.rs (3)

171-193: Keep active session count consistent with the tracked list.

The active_sessions counter is never incremented when a new session starts; it is only decremented on cleanup. Set it from the list length after each update.

                 if !active_sessions.contains(&event.session_id) {
                     active_sessions.push(event.session_id.clone());
                     state_manager
                         .set_state_value(&self.operator_id, &"active_session_list", active_sessions)
                         .await?;
+                    // Keep a simple derived counter in sync
+                    let list: Vec<String> = state_manager
+                        .get_state_value(&self.operator_id, "active_session_list")
+                        .await?
+                        .unwrap_or_default();
+                    state_manager
+                        .set_state_value(&self.operator_id, "active_sessions", list.len())
+                        .await?;
                 }

And in cleanup, prefer deriving from the list instead of subtracting:

-        let count = state_manager
-            .get_state_value::<usize>(&self.operator_id, &"active_sessions")
-            .await?
-            .unwrap_or(0)
-            .saturating_sub(expired.len());
-
-        state_manager
-            .set_state_value(&self.operator_id, &"active_sessions", count)
-            .await?;
+        let list: Vec<String> = state_manager
+            .get_state_value(&self.operator_id, "active_session_list")
+            .await?
+            .unwrap_or_default();
+        state_manager
+            .set_state_value(&self.operator_id, "active_sessions", list.len())
+            .await?;

Also applies to: 243-253


201-206: Remove unused variable.

current_time is computed but not used.

-        let current_time = SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .unwrap()
-            .as_millis() as u64;

9-14: Clean up unused imports.

VecDeque and Instant aren’t used.

-use std::collections::{HashMap, VecDeque};
+use std::collections::HashMap;
@@
-use tokio::time::{sleep, Instant};
+use tokio::time::sleep;
examples/word_count.rs (1)

31-41: Make counts map type explicit to u64 for clarity.

Avoid defaulting to i32.

-                let mut word_counts = HashMap::new();
+                let mut word_counts: HashMap<String, u64> = HashMap::new();
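
A minimal sketch of the explicitly typed map in isolation. count_words is a hypothetical helper, and splitting on whitespace is an assumption about how the example tokenizes input.

```rust
use std::collections::HashMap;

/// Count whitespace-separated words; the u64 value type is spelled out so
/// the counter width does not depend on integer type inference.
fn count_words(text: &str) -> HashMap<String, u64> {
    let mut word_counts: HashMap<String, u64> = HashMap::new();
    for word in text.split_whitespace() {
        *word_counts.entry(word.to_string()).or_insert(0) += 1;
    }
    word_counts
}
```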
crates/arkflow-core/src/state/mod.rs (1)

70-77: Deduplicate magic string key for metadata.

Use a constant to avoid typos across extract/embed.

+const METADATA_FIELD: &str = "__arkflow_metadata__";
@@
-            .get("__arkflow_metadata__")
+            .get(METADATA_FIELD)
@@
-        metadata.insert("__arkflow_metadata__".to_string(), metadata_json);
+        metadata.insert(METADATA_FIELD.to_string(), metadata_json);

Also applies to: 84-89
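
A small sketch of how the shared constant removes the duplicated literal. The embed and extract functions here are simplified stand-ins operating on a plain string map, not the helpers in state/mod.rs.

```rust
use std::collections::HashMap;

/// Single definition of the reserved key, shared by embed and extract.
const METADATA_FIELD: &str = "__arkflow_metadata__";

/// Store serialized metadata under the reserved key (simplified stand-in).
fn embed(fields: &mut HashMap<String, String>, metadata_json: String) {
    fields.insert(METADATA_FIELD.to_string(), metadata_json);
}

/// Read the serialized metadata back, if present (simplified stand-in).
fn extract(fields: &HashMap<String, String>) -> Option<&str> {
    fields.get(METADATA_FIELD).map(String::as_str)
}
```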

crates/arkflow-core/src/state/example.rs (4)

51-61: Count even when input_name is absent.

Default to a placeholder to make the example produce a visible count.

-        if let Some(input_name) = batch.get_input_name() {
-            let key = format!("count_{}", input_name);
+        {
+            let input_name = batch.get_input_name().unwrap_or("unknown");
+            let key = format!("count_{}", input_name);

74-82: Unused parameter and unconstrained generics in example.

Prefix key_column with underscore to silence warnings. Consider removing K,V generics since they’re not used.

-        key_column: &str,
+        _key_column: &str,

170-187: Avoid #[tokio::main] inside library code.

Define example_usage as async fn and call it from a proper example/bin or a test.

-#[tokio::main]
-async fn example_usage() -> Result<(), Error> {
+pub async fn example_usage() -> Result<(), Error> {

129-138: Consolidate StateBackendType into a single definition

The StateBackendType enum is currently defined twice under crates/arkflow-core/src/state:

  • enhanced.rs (around line 93) – this should be the canonical, crate-wide definition
  • example.rs (lines 129–138) – a duplicate that risks drifting out of sync

To avoid maintenance headaches and ensure a single source of truth, remove the local enum in example.rs and import the shared definition instead.

• File: crates/arkflow-core/src/state/example.rs
– Remove lines 129–138 (the duplicate pub enum StateBackendType { … })
– Add at the top of the file:

use crate::state::StateBackendType;

Example diff:

--- a/crates/arkflow-core/src/state/example.rs
+++ b/crates/arkflow-core/src/state/example.rs
@@ -1,6 +1,4 @@
-/// State backend types
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
-pub enum StateBackendType {
-    Memory,
-    FileSystem,
-    S3,
-}
+use crate::state::StateBackendType;
crates/arkflow-core/src/state/simple.rs (1)

144-148: Consider using Mutex instead of RwLock for write-heavy operations

The last_injection field is only ever written to (via write().await) and read once. Since there's no concurrent reading pattern that would benefit from RwLock, using Mutex would be more efficient.

 pub struct SimpleBarrierInjector {
     interval: std::time::Duration,
-    last_injection: Arc<tokio::sync::RwLock<std::time::Instant>>,
+    last_injection: Arc<tokio::sync::Mutex<std::time::Instant>>,
     next_checkpoint_id: Arc<AtomicU64>,
 }
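
To illustrate the access pattern, here is a rough sketch in which every access both reads and updates the timestamp, so an exclusive lock is all that is needed. It uses std::sync::Mutex to stay dependency-free, whereas the PR uses tokio's async locks; BarrierClock and should_inject are hypothetical names.

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// Hypothetical, simplified stand-in for the barrier injector's timing state.
struct BarrierClock {
    interval: Duration,
    last_injection: Mutex<Instant>,
}

impl BarrierClock {
    fn new(interval: Duration, start: Instant) -> Self {
        Self { interval, last_injection: Mutex::new(start) }
    }

    /// Returns true (and records `now`) once `interval` has elapsed.
    /// The check and the update happen under one exclusive lock.
    fn should_inject(&self, now: Instant) -> bool {
        let mut last = self.last_injection.lock().expect("lock poisoned");
        if now.duration_since(*last) >= self.interval {
            *last = now;
            true
        } else {
            false
        }
    }
}
```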
crates/arkflow-core/src/state/performance.rs (2)

169-172: Unnecessary complexity with NonZeroUsize fallback

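unwrap_or evaluates its fallback eagerly, so NonZeroUsize::new(1000).unwrap() runs even when the configured size is valid, and the nested call is hard to read. Hoist the capacity into a named binding and use unwrap_or_else; ideally, the configuration should validate the value up front.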

-let local_cache = Arc::new(RwLock::new(LruCache::new(
-    NonZeroUsize::new(perf_config.local_cache_size)
-        .unwrap_or(NonZeroUsize::new(1000).unwrap()),
-)));
+let cache_size = NonZeroUsize::new(perf_config.local_cache_size)
+    .unwrap_or_else(|| NonZeroUsize::new(1000).unwrap());
+let local_cache = Arc::new(RwLock::new(LruCache::new(cache_size)));

413-414: TODO comments indicate incomplete implementation

The performance statistics tracking for cache hits and misses is not implemented, which could make it difficult to monitor cache effectiveness.

The TODO comments indicate that cache hit/miss tracking is missing. Would you like me to generate an implementation that properly tracks these metrics or create an issue to track this task?
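
As a starting point, hit/miss tracking could be sketched with two atomic counters. CacheStats and its methods are hypothetical and are not the PerformanceStats type from the PR.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical lock-free hit/miss counters for a cache.
#[derive(Default)]
struct CacheStats {
    hits: AtomicU64,
    misses: AtomicU64,
}

impl CacheStats {
    /// Record the outcome of one cache lookup.
    fn record(&self, hit: bool) {
        let counter = if hit { &self.hits } else { &self.misses };
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Fraction of lookups served from the cache; 0.0 before any lookup.
    fn hit_ratio(&self) -> f64 {
        let hits = self.hits.load(Ordering::Relaxed);
        let total = hits + self.misses.load(Ordering::Relaxed);
        if total == 0 { 0.0 } else { hits as f64 / total as f64 }
    }
}
```

Relaxed ordering is enough here because the counters are independent statistics and do not guard other data.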

crates/arkflow-core/src/state/s3_backend.rs (1)

129-149: Inefficient checkpoint ID extraction from filenames

The current implementation parses checkpoint IDs from filenames with strip_prefix and parse, silently dropping names that do not match. Note that parse::<u64> already accepts zero-padded digits ("000042" parses to 42 and "000" to 0), so trimming leading zeros is unnecessary and would turn an all-zero ID into an empty, unparsable string.

Consider pre-allocating and making the skip explicit:

 async fn list_checkpoints(&self) -> Result<Vec<u64>, Error> {
-    let mut checkpoints = Vec::new();
+    let mut checkpoints = Vec::with_capacity(100); // Pre-allocate for typical case
 
     // List objects in checkpoint directory
     let mut stream = self.client.list(Some(&self.checkpoint_base_path));
 
     while let Some(object) = stream.try_next().await? {
         // Extract checkpoint ID from path
         if let Some(name) = object.location.filename() {
-            if let Some(rest) = name.strip_prefix("chk-") {
-                if let Ok(id) = rest.parse::<u64>() {
-                    checkpoints.push(id);
-                }
-            }
+            if let Some(id_str) = name.strip_prefix("chk-") {
+                // parse::<u64> handles zero-padding, including an all-zero ID
+                match id_str.parse::<u64>() {
+                    Ok(id) => checkpoints.push(id),
+                    Err(_) => {} // Skip invalid entries
+                }
+            }
         }
     }
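
The filename parsing can also be isolated into a small pure function, which makes the edge cases easy to test. This is a sketch under assumptions: parse_checkpoint_id is a hypothetical helper, and only the "chk-" prefix is taken from the diff above.

```rust
/// Extract the numeric ID from a name such as "chk-0000000042".
/// Hypothetical helper; the zero-padding width is not assumed.
fn parse_checkpoint_id(name: &str) -> Option<u64> {
    let digits = name.strip_prefix("chk-")?;
    // Reject empty suffixes and anything that is not a plain digit run;
    // `parse::<u64>` alone would also accept a leading '+'.
    if digits.is_empty() || !digits.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    // Zero-padded input parses directly; overflow yields None.
    digits.parse().ok()
}
```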
crates/arkflow-core/src/state/enhanced.rs (3)

344-355: Redundant state manager locking in processing loop

The state manager is locked twice in quick succession (lines 338 and 351), which could cause unnecessary contention.

Consider keeping the lock for the entire operation:

 pub async fn process(&self, batch: MessageBatch) -> Result<Vec<MessageBatch>, Error>
 where
     P: crate::processor::Processor,
 {
     // Let state manager handle barriers and transactions
     let mut state_manager = self.state_manager.write().await;
     let processed_batches = state_manager.process_batch(batch).await?;
 
     // Apply actual processing
     let mut results = Vec::new();
     for batch in processed_batches {
         // Process with inner processor
         let inner_results = self.inner.process(batch.clone()).await?;
 
         // Update state if needed
         if let Some(tx_ctx) = batch.transaction_context() {
             // Example: Update processed count
             let state_key = format!("processed_count_{}", tx_ctx.checkpoint_id);
-            let mut state_manager = self.state_manager.write().await;
             state_manager
                 .set_state_value(&self.operator_id, &state_key, batch.len())
                 .await?;
         }
 
         results.extend(inner_results);
     }
 
     Ok(results)
 }

391-392: Unused field in TwoPhaseCommitOutput

The pending_transactions field is declared but never used in any of the methods.

Either implement pending transaction tracking or remove the unused field:

 pub struct TwoPhaseCommitOutput<O> {
     inner: O,
     state_manager: Arc<tokio::sync::RwLock<EnhancedStateManager>>,
     transaction_log: Arc<RwLock<Vec<TransactionLogEntry>>>,
-    pending_transactions: HashMap<String, Vec<MessageBatch>>,
 }

183-209: Incomplete checkpoint coordinator implementation

The comment block (lines 188-192) indicates this is a simplified implementation. The actual multi-operator coordination logic is missing.

The implementation currently only saves local states and doesn't handle multi-operator coordination as indicated by the comments. Would you like me to help implement proper checkpoint coordination with participant registration and two-phase commit protocol?

crates/arkflow-core/src/state/monitoring.rs (5)

13-13: Import Encoder trait if switching to encode().

Apply this diff:

-use prometheus::{Counter, Gauge, Histogram, HistogramOpts, Opts, Registry, TextEncoder};
+use prometheus::{Counter, Gauge, Histogram, HistogramOpts, Opts, Registry, TextEncoder, Encoder};

455-464: Compute error_rate instead of placeholder.

Provides actionable health output with existing counters.

Apply this diff:

     pub fn health_status(&self) -> HealthStatus {
-        // This would typically query current metrics and calculate health
-        HealthStatus {
-            healthy: true, // Simplified for now
-            state_size: self.metrics.state_size_bytes.get() as u64,
-            active_transactions: self.metrics.active_transactions.get() as usize,
-            last_checkpoint: None, // Would need to track this
-            error_rate: 0.0,       // Would need to calculate from counters
-        }
+        let total = self.metrics.operations_total.get();
+        let failed = self.metrics.operations_failed_total.get();
+        let error_rate = if total > 0.0 { failed / total } else { 0.0 };
+        HealthStatus {
+            healthy: error_rate <= self.alert_config.error_rate_threshold
+                && (self.metrics.state_size_bytes.get() as u64) <= self.alert_config.state_size_threshold,
+            state_size: self.metrics.state_size_bytes.get() as u64,
+            active_transactions: self.metrics.active_transactions.get() as usize,
+            last_checkpoint: None,
+            error_rate,
+        }
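
The arithmetic in this suggestion can be checked in isolation. The sketch below replaces the Prometheus counters with plain numbers; `Counters`, `Thresholds`, `error_rate`, and `is_healthy` are hypothetical stand-ins, and only the field names are taken from the diff above.

```rust
/// Plain-number stand-ins for the Prometheus counters and gauge read above.
pub struct Counters {
    pub operations_total: f64,
    pub operations_failed_total: f64,
    pub state_size_bytes: f64,
}

/// Stand-in for the alert configuration; field names mirror the diff.
pub struct Thresholds {
    pub error_rate_threshold: f64,
    pub state_size_threshold: u64,
}

/// Fraction of failed operations, defined as 0.0 before any operation ran
/// so that an idle system never divides by zero.
pub fn error_rate(c: &Counters) -> f64 {
    if c.operations_total > 0.0 {
        c.operations_failed_total / c.operations_total
    } else {
        0.0
    }
}

/// Healthy only if both the error rate and the state size are within limits.
pub fn is_healthy(c: &Counters, t: &Thresholds) -> bool {
    error_rate(c) <= t.error_rate_threshold
        && (c.state_size_bytes as u64) <= t.state_size_threshold
}
```

Because the counters are cumulative, this yields a lifetime error rate. A rate over a recent window would need the previous counter values as well.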

444-452: Cache metrics API is split (record_cache_hit/miss vs update_cache_metrics). Consider consolidating.

To avoid double-counting across paths, expose one method (incremental or absolute) and document semantics.

Would you like a small wrapper that accepts an enum {Hit, Miss} and size Option to unify updates?
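
One possible shape for such a wrapper is sketched below. Atomics stand in for the Prometheus counters and gauge, and `CacheEvent`, `CacheMetrics`, `record`, and `snapshot` are hypothetical names that do not appear in the PR.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// One cache lookup outcome (hypothetical type).
#[derive(Debug, Clone, Copy)]
pub enum CacheEvent {
    Hit,
    Miss,
}

/// Atomic stand-ins for the cache hit/miss counters and the size gauge.
#[derive(Default)]
pub struct CacheMetrics {
    hits: AtomicU64,
    misses: AtomicU64,
    size_bytes: AtomicU64,
}

impl CacheMetrics {
    /// Single entry point. Hits and misses are incremental (one call adds
    /// one); the size, when given, is absolute and overwrites the old value.
    pub fn record(&self, event: CacheEvent, size_bytes: Option<u64>) {
        match event {
            CacheEvent::Hit => self.hits.fetch_add(1, Ordering::Relaxed),
            CacheEvent::Miss => self.misses.fetch_add(1, Ordering::Relaxed),
        };
        if let Some(size) = size_bytes {
            self.size_bytes.store(size, Ordering::Relaxed);
        }
    }

    /// Returns `(hits, misses, size_bytes)`.
    pub fn snapshot(&self) -> (u64, u64, u64) {
        (
            self.hits.load(Ordering::Relaxed),
            self.misses.load(Ordering::Relaxed),
            self.size_bytes.load(Ordering::Relaxed),
        )
    }
}
```

With a single `record` method there is no second path that could count the same lookup again, and the doc comment states which values are incremental and which are absolute.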


478-483: backend_type is written but unused.

Either use it for conditional metrics or remove it to avoid dead code warnings.

Apply this diff to silence temporarily if you plan to use it soon:

 pub struct MonitoredStateManager {
     inner: EnhancedStateManager,
     monitor: Arc<StateMonitor>,
-    backend_type: StateBackendType,
+    #[allow(dead_code)]
+    backend_type: StateBackendType,
 }

389-396: Export path: consider Registry exposure vs. one global registry.

If multiple monitors exist, you may prefer sharing a global registry to avoid duplicate metric names in different registries.

If helpful, I can draft StateMonitor::with_registry(registry: Registry, alert_config: AlertConfig) and make StateMetrics::new_with_registry public to the crate.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dc93990 and 1e7276e.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (21)
  • CLAUDE.md (1 hunks)
  • README-STATE-MANAGEMENT.md (1 hunks)
  • crates/arkflow-core/Cargo.toml (1 hunks)
  • crates/arkflow-core/src/lib.rs (4 hunks)
  • crates/arkflow-core/src/state/enhanced.rs (1 hunks)
  • crates/arkflow-core/src/state/example.rs (1 hunks)
  • crates/arkflow-core/src/state/helper.rs (1 hunks)
  • crates/arkflow-core/src/state/integration_tests.rs (1 hunks)
  • crates/arkflow-core/src/state/mod.rs (1 hunks)
  • crates/arkflow-core/src/state/monitoring.rs (1 hunks)
  • crates/arkflow-core/src/state/performance.rs (1 hunks)
  • crates/arkflow-core/src/state/s3_backend.rs (1 hunks)
  • crates/arkflow-core/src/state/simple.rs (1 hunks)
  • crates/arkflow-core/src/state/tests.rs (1 hunks)
  • crates/arkflow-core/src/state/transaction.rs (1 hunks)
  • docs/state-management-guide.md (1 hunks)
  • docs/state-management-implementation-summary.md (1 hunks)
  • docs/state-management.md (1 hunks)
  • examples/session_window.rs (1 hunks)
  • examples/stateful_pipeline.rs (1 hunks)
  • examples/word_count.rs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (15)
crates/arkflow-core/src/lib.rs (2)
crates/arkflow-plugin/src/input/file.rs (1)
  • object_store (246-254)
crates/arkflow-core/src/state/mod.rs (2)
  • from (101-106)
  • extract_from_batch (71-78)
crates/arkflow-core/src/state/simple.rs (3)
crates/arkflow-core/src/state/enhanced.rs (7)
  • new (101-128)
  • new (320-330)
  • new (396-403)
  • process (333-361)
  • write (406-427)
  • get_transaction_log (473-475)
  • default (78-88)
crates/arkflow-core/src/lib.rs (6)
  • from (162-164)
  • from (168-170)
  • from (258-263)
  • from (267-269)
  • metadata (332-334)
  • from_string (213-215)
crates/arkflow-core/src/state/transaction.rs (1)
  • aligned_checkpoint (68-70)
crates/arkflow-core/src/state/transaction.rs (3)
crates/arkflow-core/src/state/enhanced.rs (3)
  • new (101-128)
  • new (320-330)
  • new (396-403)
crates/arkflow-core/src/state/mod.rs (2)
  • new (63-68)
  • extract_from_batch (71-78)
crates/arkflow-core/src/lib.rs (1)
  • metadata (332-334)
crates/arkflow-core/src/state/mod.rs (1)
crates/arkflow-core/src/lib.rs (6)
  • metadata (332-334)
  • new_arrow (205-210)
  • from (162-164)
  • from (168-170)
  • from (258-263)
  • from (267-269)
examples/stateful_pipeline.rs (2)
crates/arkflow-core/src/state/enhanced.rs (4)
  • new (101-128)
  • new (320-330)
  • new (396-403)
  • process (333-361)
crates/arkflow-core/src/state/monitoring.rs (4)
  • new (57-61)
  • new (358-366)
  • new (486-498)
  • monitor (506-508)
crates/arkflow-core/src/state/helper.rs (3)
crates/arkflow-core/src/state/performance.rs (5)
  • get_typed (502-507)
  • put_typed (509-514)
  • new (119-124)
  • new (157-182)
  • new (447-454)
crates/arkflow-core/src/state/s3_backend.rs (5)
  • get_typed (340-345)
  • put_typed (347-352)
  • new (70-107)
  • new (293-301)
  • new (373-379)
crates/arkflow-core/src/state/example.rs (2)
  • new (34-39)
  • new (150-157)
examples/word_count.rs (4)
crates/arkflow-core/src/state/enhanced.rs (5)
  • process (333-361)
  • new (101-128)
  • new (320-330)
  • new (396-403)
  • default (78-88)
crates/arkflow-core/src/state/simple.rs (2)
  • process (44-74)
  • default (200-206)
examples/session_window.rs (7)
  • process (200-271)
  • state_manager (127-128)
  • state_manager (132-133)
  • state_manager (184-185)
  • state_manager (216-217)
  • None (166-166)
  • main (328-420)
crates/arkflow-core/src/lib.rs (2)
  • new_binary (99-101)
  • from_string (213-215)
crates/arkflow-core/src/state/monitoring.rs (1)
crates/arkflow-core/src/state/enhanced.rs (3)
  • new (101-128)
  • new (320-330)
  • new (396-403)
examples/session_window.rs (3)
crates/arkflow-core/src/state/enhanced.rs (5)
  • new (101-128)
  • new (320-330)
  • new (396-403)
  • process (333-361)
  • default (78-88)
examples/stateful_pipeline.rs (3)
  • state_manager (123-124)
  • state_manager (172-173)
  • process (136-204)
crates/arkflow-core/src/lib.rs (2)
  • new_binary (99-101)
  • len (221-223)
crates/arkflow-core/src/state/s3_backend.rs (2)
crates/arkflow-core/src/state/enhanced.rs (3)
  • new (101-128)
  • new (320-330)
  • new (396-403)
crates/arkflow-core/src/state/helper.rs (5)
  • new (41-45)
  • get_typed (23-25)
  • get_typed (59-70)
  • put_typed (28-30)
  • put_typed (72-79)
crates/arkflow-core/src/state/performance.rs (3)
crates/arkflow-core/src/state/helper.rs (5)
  • new (41-45)
  • get_typed (23-25)
  • get_typed (59-70)
  • put_typed (28-30)
  • put_typed (72-79)
crates/arkflow-core/src/state/s3_backend.rs (7)
  • new (70-107)
  • new (293-301)
  • new (373-379)
  • local_state (304-306)
  • local_state_mut (309-311)
  • get_typed (340-345)
  • put_typed (347-352)
crates/arkflow-core/src/lib.rs (4)
  • from (162-164)
  • from (168-170)
  • from (258-263)
  • from (267-269)
crates/arkflow-core/src/state/example.rs (3)
crates/arkflow-core/src/state/helper.rs (2)
  • new (41-45)
  • default (110-112)
crates/arkflow-core/src/state/simple.rs (5)
  • new (36-41)
  • new (97-102)
  • get_count (77-81)
  • default (200-206)
  • example_usage (210-244)
crates/arkflow-core/src/lib.rs (1)
  • from_string (213-215)
crates/arkflow-core/src/state/tests.rs (4)
crates/arkflow-core/src/lib.rs (3)
  • from_string (213-215)
  • metadata (332-334)
  • is_empty (217-219)
crates/arkflow-core/src/state/helper.rs (1)
  • new (41-45)
crates/arkflow-core/src/state/mod.rs (1)
  • new (63-68)
crates/arkflow-core/src/state/monitoring.rs (4)
  • new (57-61)
  • new (358-366)
  • new (486-498)
  • monitor (506-508)
crates/arkflow-core/src/state/enhanced.rs (4)
crates/arkflow-core/src/state/helper.rs (1)
  • new (41-45)
crates/arkflow-core/src/state/monitoring.rs (4)
  • new (57-61)
  • new (358-366)
  • new (486-498)
  • inner (501-503)
crates/arkflow-core/src/state/s3_backend.rs (3)
  • new (70-107)
  • new (293-301)
  • new (373-379)
crates/arkflow-core/src/state/simple.rs (5)
  • new (36-41)
  • new (97-102)
  • new (152-158)
  • process (44-74)
  • write (105-135)
crates/arkflow-core/src/state/integration_tests.rs (2)
crates/arkflow-core/src/state/enhanced.rs (3)
  • new (101-128)
  • new (320-330)
  • new (396-403)
crates/arkflow-core/src/state/helper.rs (1)
  • new (41-45)
🪛 LanguageTool
docs/state-management.md

[grammar] ~7-~7: There might be a mistake here.
Context: ...# Overview The implementation provides: 1. Metadata support in MessageBatch - Att...

(QB_NEW_EN)


[grammar] ~154-~154: There might be a mistake here.
Context: ...ntation** - Add persistent state storage 2. Exactly-Once Guarantees - Full two-pha...

(QB_NEW_EN)


[grammar] ~155-~155: There might be a mistake here.
Context: ...* - Full two-phase commit implementation 3. State Partitioning - Scale state with ...

(QB_NEW_EN)


[grammar] ~156-~156: There might be a mistake here.
Context: ...ng** - Scale state with key partitioning 4. State TTL - Automatic cleanup of old s...

(QB_NEW_EN)


[grammar] ~157-~157: There might be a mistake here.
Context: ...e TTL** - Automatic cleanup of old state 5. Monitoring - Metrics for state size an...

(QB_NEW_EN)

CLAUDE.md

[grammar] ~49-~49: There might be a mistake here.
Context: ...w-core/): Core stream processing engine - lib.rs`: Main types (MessageBatch, Error, Resou...

(QB_NEW_EN)


[grammar] ~50-~50: There might be a mistake here.
Context: ...in types (MessageBatch, Error, Resource) - stream/mod.rs: Stream orchestration with input/pipeli...

(QB_NEW_EN)


[grammar] ~51-~51: There might be a mistake here.
Context: ...orchestration with input/pipeline/output - config.rs: Configuration management (YAML/JSON/TO...

(QB_NEW_EN)


[grammar] ~52-~52: There might be a mistake here.
Context: ...onfiguration management (YAML/JSON/TOML) - input/, output/, processor/, buffer/: Co...

(QB_NEW_EN)


[grammar] ~55-~55: There might be a mistake here.
Context: ...rkflow-plugin/): Plugin implementations - input/`: Kafka, MQTT, HTTP, file, database, etc...

(QB_NEW_EN)


[grammar] ~56-~56: There might be a mistake here.
Context: ... Kafka, MQTT, HTTP, file, database, etc. - output/: Kafka, MQTT, HTTP, stdout, etc. - `...

(QB_NEW_EN)


[grammar] ~57-~57: There might be a mistake here.
Context: ...utput/: Kafka, MQTT, HTTP, stdout, etc. - processor/`: SQL, JSON, Protobuf, Python, VRL, etc....

(QB_NEW_EN)


[grammar] ~58-~58: There might be a mistake here.
Context: ...: SQL, JSON, Protobuf, Python, VRL, etc. - buffer/: Memory, session/sliding/tumbling windo...

(QB_NEW_EN)


[grammar] ~72-~72: There might be a mistake here.
Context: ...ata structure wrapping Arrow RecordBatch - Stream: Orchestrates components with b...

(QB_NEW_EN)


[grammar] ~73-~73: There might be a mistake here.
Context: ...es components with backpressure handling - Pipeline: Chain of processors for data...

(QB_NEW_EN)


[grammar] ~74-~74: There might be a mistake here.
Context: ...in of processors for data transformation - Buffer: Optional buffering with window...

(QB_NEW_EN)


[grammar] ~103-~103: There might be a mistake here.
Context: ...c ``` ## Key Concepts ### MessageBatch - Wraps Arrow RecordBatch for columnar pro...

(QB_NEW_EN)


[grammar] ~108-~108: There might be a mistake here.
Context: ...-stream scenarios ### Stream Processing - Async processing with Tokio runtime - Ba...

(QB_NEW_EN)


[grammar] ~109-~109: There might be a mistake here.
Context: ...ng - Async processing with Tokio runtime - Backpressure control (threshold: 1024 me...

(QB_NEW_EN)


[grammar] ~110-~110: There might be a mistake here.
Context: ...ssure control (threshold: 1024 messages) - Ordered delivery with sequence numbers -...

(QB_NEW_EN)


[grammar] ~111-~111: There might be a mistake here.
Context: ...- Ordered delivery with sequence numbers - Graceful shutdown with cancellation toke...

(QB_NEW_EN)


[grammar] ~114-~114: There might be a mistake here.
Context: ...ancellation tokens ### Component Traits All components implement async traits: -...

(QB_NEW_EN)


[grammar] ~115-~115: There might be a mistake here.
Context: ...s All components implement async traits: - Input: read(), connect(), close() - `Ou...

(QB_NEW_EN)


[grammar] ~116-~116: There might be a mistake here.
Context: ...aits: - Input: read(), connect(), close() - Output: write(), connect(), close() - `P...

(QB_NEW_EN)


[grammar] ~117-~117: There might be a mistake here.
Context: ...()-Output: write(), connect(), close()-Processor: process()Vec-B...

(QB_NEW_EN)


[grammar] ~118-~118: There might be a mistake here.
Context: ... close() - Processor: process()Vec<MessageBatch> - Buffer: read(), write(), flush() ## Dev...

(QB_NEW_EN)


[grammar] ~123-~123: There might be a mistake here.
Context: ...nt Guidelines ### Adding New Components 1. Implement component trait in appropriate...

(QB_NEW_EN)


[grammar] ~134-~134: There might be a mistake here.
Context: ...:EOFfor graceful shutdown ### Testing - Unit tests intests/` directories - Int...

(QB_NEW_EN)


[grammar] ~135-~135: There might be a mistake here.
Context: ...ing - Unit tests in tests/ directories - Integration tests with real components -...

(QB_NEW_EN)


[grammar] ~136-~136: There might be a mistake here.
Context: ...- Integration tests with real components - Use mockall for mocking dependencies ##...

(QB_NEW_EN)

docs/state-management-guide.md

[grammar] ~13-~13: There might be a mistake here.
Context: ...mory storage for development and testing - S3 State: Persistent, distributed stor...

(QB_NEW_EN)


[grammar] ~14-~14: There might be a mistake here.
Context: ...ributed storage for production workloads - Hybrid: Combines memory for speed with...

(QB_NEW_EN)


[grammar] ~21-~21: There might be a mistake here.
Context: ...c**: Triggered at configurable intervals - Barrier-based: Aligned across all oper...

(QB_NEW_EN)


[grammar] ~22-~22: There might be a mistake here.
Context: ...ross all operators using special markers - Incremental: Only changes since the la...

(QB_NEW_EN)

README-STATE-MANAGEMENT.md

[grammar] ~7-~7: There might be a mistake here.
Context: ...rns. ## Table of Contents 1. Overview 2. Core Concepts 3. [Featu...

(QB_NEW_EN)


[grammar] ~8-~8: There might be a mistake here.
Context: ... Overview 2. Core Concepts 3. Features 4. [Getting Started...

(QB_NEW_EN)


[grammar] ~9-~9: There might be a mistake here.
Context: ...e Concepts](#core-concepts) 3. Features 4. Getting Started 5. [E...

(QB_NEW_EN)


[grammar] ~10-~10: There might be a mistake here.
Context: ...Features](#features) 4. Getting Started 5. Examples 6. [Performance](#p...

(QB_NEW_EN)


[grammar] ~11-~11: There might be a mistake here.
Context: ... Started](#getting-started) 5. Examples 6. Performance 7. [Monitorin...

(QB_NEW_EN)


[grammar] ~12-~12: There might be a mistake here.
Context: ...5. Examples 6. Performance 7. Monitoring 8. [Configurati...

(QB_NEW_EN)


[grammar] ~13-~13: There might be a mistake here.
Context: ...erformance](#performance) 7. Monitoring 8. Configuration 9. [Best ...

(QB_NEW_EN)


[grammar] ~14-~14: There might be a mistake here.
Context: ...nitoring](#monitoring) 8. Configuration 9. Best Practices ## Ove...

(QB_NEW_EN)


[grammar] ~21-~21: There might be a mistake here.
Context: ...Maintain state across message processing - Exactly-Once Semantics: Ensure each me...

(QB_NEW_EN)


[grammar] ~22-~22: There might be a mistake here.
Context: ...e each message is processed exactly once - Fault Tolerance: Automatic recovery fr...

(QB_NEW_EN)


[grammar] ~23-~23: There might be a mistake here.
Context: ...recovery from failures using checkpoints - Multiple Backends: Support for in-memo...

(QB_NEW_EN)


[grammar] ~24-~24: There might be a mistake here.
Context: ...for in-memory and S3-based state storage - Performance Optimizations: Batching, c...

(QB_NEW_EN)


[grammar] ~25-~25: There might be a mistake here.
Context: ...ns**: Batching, compression, and caching - Comprehensive Monitoring: Metrics and ...

(QB_NEW_EN)


[grammar] ~34-~34: There might be a mistake here.
Context: ...mory storage for development and testing - S3: Persistent, distributed storage fo...

(QB_NEW_EN)


[grammar] ~35-~35: There might be a mistake here.
Context: ...ributed storage for production workloads - Hybrid: Combines memory speed with S3 ...

(QB_NEW_EN)


[grammar] ~42-~42: There might be a mistake here.
Context: ...odic snapshots at configurable intervals - Aligned: Using barrier mechanisms for ...

(QB_NEW_EN)


[grammar] ~43-~43: There might be a mistake here.
Context: ...Using barrier mechanisms for consistency - Incremental: Only changes are saved to...

(QB_NEW_EN)


[grammar] ~193-~193: There might be a mistake here.
Context: ...Memory Backend: >100K operations/sec - S3 Backend: 1K-10K operations/sec (dep...

(QB_NEW_EN)


[grammar] ~194-~194: There might be a mistake here.
Context: ...K operations/sec (depending on batching) - Checkpoint Overhead: <100ms for 1MB st...

(QB_NEW_EN)


[grammar] ~195-~195: There might be a mistake here.
Context: ...ckpoint Overhead**: <100ms for 1MB state - Recovery Time: Proportional to state s...

(QB_NEW_EN)


[grammar] ~204-~204: There might be a mistake here.
Context: ...metrics: - Operation counts and latency - State size and growth - Checkpoint durat...

(QB_NEW_EN)


[grammar] ~205-~205: There might be a mistake here.
Context: ...unts and latency - State size and growth - Checkpoint duration and success rate - C...

(QB_NEW_EN)


[grammar] ~206-~206: There might be a mistake here.
Context: ...h - Checkpoint duration and success rate - Cache hit/miss ratios - Error rates ###...

(QB_NEW_EN)


[grammar] ~207-~207: There might be a mistake here.
Context: ...and success rate - Cache hit/miss ratios - Error rates ### Health Checks ```rust ...

(QB_NEW_EN)


[grammar] ~307-~307: There might be a mistake here.
Context: ...emoryState: Basic in-memory state store - EnhancedStateManager: Advanced state management - ExactlyOn...

(QB_NEW_EN)


[grammar] ~308-~308: There might be a mistake here.
Context: ...StateManager: Advanced state management - ExactlyOnceProcessor: Wrapper for exactly-once semantics - ...

(QB_NEW_EN)


[grammar] ~309-~309: There might be a mistake here.
Context: ...sor: Wrapper for exactly-once semantics - TwoPhaseCommitOutput`: Transactional output wrapper ### Conf...

(QB_NEW_EN)


[grammar] ~314-~314: There might be a mistake here.
Context: ...tateConfig: State manager configuration - S3StateBackendConfig: S3 backend configuration - Performanc...

(QB_NEW_EN)


[grammar] ~315-~315: There might be a mistake here.
Context: ...BackendConfig: S3 backend configuration - PerformanceConfig`: Performance optimization settings ###...

(QB_NEW_EN)


[grammar] ~320-~320: There might be a mistake here.
Context: ...ng - StateMonitor: Metrics collection - StateMetrics: Prometheus metrics - HealthStatus: S...

(QB_NEW_EN)


[grammar] ~321-~321: There might be a mistake here.
Context: ...ion - StateMetrics: Prometheus metrics - HealthStatus: System health information ## Troubles...

(QB_NEW_EN)


[grammar] ~328-~328: There might be a mistake here.
Context: ... Common Issues 1. Checkpoint Failures - Check S3 permissions - Verify network...

(QB_NEW_EN)


[grammar] ~333-~333: There might be a mistake here.
Context: ... available disk space 2. High Latency - Enable batching - Increase cache size...

(QB_NEW_EN)


[grammar] ~338-~338: There might be a mistake here.
Context: ...eck S3 performance 3. Memory Pressure - Use S3 backend - Reduce checkpoint fr...

(QB_NEW_EN)


[grammar] ~343-~343: There might be a mistake here.
Context: ...ate partitioning 4. Recovery Failures - Verify checkpoint integrity - Check b...

(QB_NEW_EN)


[grammar] ~362-~362: There might be a mistake here.
Context: ...ing**: Automatic sharding of large state 2. Incremental Checkpoints: Only save cha...

(QB_NEW_EN)


[grammar] ~363-~363: There might be a mistake here.
Context: ...emental Checkpoints**: Only save changes 3. Async State Backends: Non-blocking sta...

(QB_NEW_EN)


[grammar] ~364-~364: There might be a mistake here.
Context: ...ackends**: Non-blocking state operations 4. State Schema Evolution: Handle changin...

(QB_NEW_EN)


[grammar] ~365-~365: There might be a mistake here.
Context: ...olution**: Handle changing state schemas 5. Distributed Checkpointing: Multi-node ...

(QB_NEW_EN)


[grammar] ~370-~370: There might be a mistake here.
Context: ...Native Serialization**: Faster than JSON 2. Compression Algorithms: Choose based o...

(QB_NEW_EN)


[grammar] ~372-~372: There might be a mistake here.
Context: ...ng Strategies**: Adaptive cache policies 4. Batch Sizing: Dynamic batch optimizati...

(QB_NEW_EN)

🔇 Additional comments (13)
README-STATE-MANAGEMENT.md (1)

106-116: Confirmed existence of performance structs

Both OptimizedS3Backend and PerformanceConfig are defined in the performance module. No updates to the README snippet are needed.

• crates/arkflow-core/src/state/performance.rs:31–34 defines PerformanceConfig
• crates/arkflow-core/src/state/performance.rs:72–75 defines OptimizedS3Backend

docs/state-management.md (1)

55-62: Verify maybe_inject_barrier return type

Some injectors return Option or produce a separate barrier batch. Confirm signature and adjust the example accordingly.

examples/stateful_pipeline.rs (1)

340-344: No type alignment changes required for ExactlyOnceProcessor

The ExactlyOnceProcessor::new constructor is defined as:

pub fn new(
    inner: P,
    state_manager: Arc<tokio::sync::RwLock<EnhancedStateManager>>,
    operator_id: String,
) -> Self {}

and in examples/stateful_pipeline.rs you pass

let state_manager = if config.state.enable_monitoring {
    let monitored = MonitoredStateManager::new().await?;
    Arc::new(RwLock::new(monitored.inner))  // yields EnhancedStateManager
} else {
    Arc::new(RwLock::new(EnhancedStateManager::new()?))
};
let exactly_once_processor = ExactlyOnceProcessor::new(
    processor,
    state_manager.clone(),
    "aggregation_pipeline".to_string(),
);

Here, state_manager.clone() is an Arc<RwLock<EnhancedStateManager>>, matching the constructor’s signature exactly. The code never passes a MonitoredStateManager itself—only its .inner field—so no change to ExactlyOnceProcessor::new is needed.

Likely an incorrect or invalid review comment.

crates/arkflow-core/src/state/integration_tests.rs (1)

59-74: LGTM: SimpleMemoryState typed put/get happy-paths covered

Good basic coverage for string and numeric types.

crates/arkflow-core/src/lib.rs (3)

38-38: Publicly exposing state is a breaking surface change—confirm semver expectations

Downstream crates will now see arkflow_core::state. Ensure this aligns with your release plan.


98-126: Potential BinaryArray construction issue elsewhere in this impl

Unrelated to this hunk but in the same impl: BinaryArray::from_vec(bytes) where bytes: Vec<&[u8]> may not match Arrow’s expected type. Prefer BinaryArray::from(binary_data) or from_iter_values.

If needed, change:

let binary_data: Vec<&[u8]> = content.iter().map(|v| v.as_slice()).collect();
let array = BinaryArray::from(binary_data);

330-352: No duplicates and barrier semantics validated

  • Confirmed only one MessageBatch::metadata implementation exists (no duplicates found).
  • Verified is_checkpoint() is defined in crates/arkflow-core/src/state/transaction.rs and invoked correctly in enhanced modules—barrier semantics align with the intended BarrierType contract.
docs/state-management-guide.md (1)

76-111: Docs Verification Complete: ExactlyOnceProcessor and TwoPhaseCommitOutput APIs Exist

I’ve confirmed that both types and their constructors match the documentation:

  • ExactlyOnceProcessor::new(inner, state_manager, operator_id: String) is defined in crates/arkflow-core/src/state/enhanced.rs (lines 312–318).
  • TwoPhaseCommitOutput::new(inner, state_manager) is defined in the same file (lines 387–394).

No changes to the guide are needed; the examples accurately reflect the public API.

Likely an incorrect or invalid review comment.

crates/arkflow-core/src/state/helper.rs (1)

20-31: Solid typed store abstraction; JSON serde mapping is consistent.

Trait bounds and the SimpleMemoryState implementation look correct. The JSON-to-bytes approach is coherent across get/put. Derives (Clone, Serialize, Deserialize) enable snapshotting and checkpointing cleanly.

Also applies to: 33-37, 39-56, 58-80, 82-113

crates/arkflow-core/src/state/simple.rs (1)

53-70: LGTM! Good defensive programming with optional chaining

The implementation properly handles missing input names and safely extracts/updates counts using and_then and unwrap_or. The state update is atomic within the write lock scope.

crates/arkflow-core/src/state/transaction.rs (1)

49-59: LGTM! Well-designed transaction context initialization

The implementation correctly generates unique transaction IDs using UUIDs and captures Unix timestamps. The structure supports different barrier types effectively.

crates/arkflow-core/src/state/s3_backend.rs (1)

116-121: Well-designed path structure for state organization

The hierarchical path structure with checkpoint/state/operator organization is clean and will scale well. The use of .json extension makes the storage format explicit.

crates/arkflow-core/src/state/enhanced.rs (1)

291-298: LGTM! Clean shutdown implementation

The shutdown method properly creates a final checkpoint before shutting down, ensuring state is persisted. The conditional logic based on configuration is appropriate.

Comment on lines 149 to 165
async fn process_transactional_batch(
    &mut self,
    batch: MessageBatch,
    tx_ctx: TransactionContext,
) -> Result<Vec<MessageBatch>, Error> {
    // Register transaction
    self.register_transaction(&tx_ctx).await?;

    // If this is a checkpoint barrier, trigger checkpoint
    if tx_ctx.is_checkpoint() {
        self.trigger_checkpoint(tx_ctx.checkpoint_id).await?;
    }

    // Process the batch (return as-is for now)
    // In a real implementation, you'd apply transformations here
    Ok(vec![batch])
}

🛠️ Refactor suggestion

Missing error handling for failed transactions

The process_transactional_batch method registers transactions but doesn't handle cleanup if the checkpoint trigger fails.

Add proper error handling and transaction cleanup:

 async fn process_transactional_batch(
     &mut self,
     batch: MessageBatch,
     tx_ctx: TransactionContext,
 ) -> Result<Vec<MessageBatch>, Error> {
     // Register transaction
     self.register_transaction(&tx_ctx).await?;
 
     // If this is a checkpoint barrier, trigger checkpoint
     if tx_ctx.is_checkpoint() {
-        self.trigger_checkpoint(tx_ctx.checkpoint_id).await?;
+        if let Err(e) = self.trigger_checkpoint(tx_ctx.checkpoint_id).await {
+            // Clean up the failed transaction
+            self.active_transactions.write().await.remove(&tx_ctx.transaction_id);
+            return Err(e);
+        }
     }
 
     // Process the batch (return as-is for now)
     // In a real implementation, you'd apply transformations here
     Ok(vec![batch])
 }
🤖 Prompt for AI Agents
In crates/arkflow-core/src/state/enhanced.rs around lines 149 to 165, the code
registers a transaction then calls trigger_checkpoint but does not clean up the
registered transaction if trigger_checkpoint (or subsequent processing) fails;
update the function so that any error after register_transaction results in
unregistering/rolling back the transaction (call the existing
unregister/rollback method with tx_ctx) before returning the error, and ensure
the transaction is also cleaned up on successful completion (use a
try/finally-style pattern or explicit match to call unregister on both error and
success paths) while preserving and returning the original error when failures
occur.
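
The try/finally-style cleanup mentioned in the prompt is commonly expressed in Rust as a guard whose `Drop` implementation does the unregistering. The sketch below is an assumption-laden illustration, not the PR's code: `TransactionGuard`, `ActiveTransactions`, and `process` are hypothetical, and it uses `std::sync::Mutex` because `Drop` cannot `.await` an async lock such as `tokio::sync::RwLock`.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Shared set of in-flight transaction ids (hypothetical stand-in).
pub type ActiveTransactions = Arc<Mutex<HashSet<String>>>;

/// Registers a transaction on creation and removes it again when dropped.
pub struct TransactionGuard {
    active: ActiveTransactions,
    transaction_id: String,
}

impl TransactionGuard {
    pub fn register(active: &ActiveTransactions, transaction_id: &str) -> Self {
        active.lock().unwrap().insert(transaction_id.to_string());
        Self {
            active: Arc::clone(active),
            transaction_id: transaction_id.to_string(),
        }
    }
}

impl Drop for TransactionGuard {
    fn drop(&mut self) {
        // Runs on every exit path: normal return, early `return`/`?`, or unwinding.
        if let Ok(mut set) = self.active.lock() {
            set.remove(&self.transaction_id);
        }
    }
}

/// Simulates processing one batch; `checkpoint_ok` stands in for the
/// result of the checkpoint trigger. Returns the in-flight count on success.
pub fn process(
    active: &ActiveTransactions,
    transaction_id: &str,
    checkpoint_ok: bool,
) -> Result<usize, String> {
    let _guard = TransactionGuard::register(active, transaction_id);
    if !checkpoint_ok {
        return Err("checkpoint failed".to_string()); // guard still cleans up
    }
    // Bind the count first so the lock is released before `_guard` drops.
    let in_flight = active.lock().unwrap().len();
    Ok(in_flight)
}
```

With the async lock used in the PR, the same effect needs an explicit match on the result that calls the unregister method on both branches, since the guard's `drop` cannot wait on an async lock.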

Comment on lines +23 to +31
pub mod enhanced;
pub mod helper;
pub mod integration_tests;
pub mod monitoring;
pub mod performance;
pub mod s3_backend;
pub mod simple;
pub mod tests;
pub mod transaction;

🛠️ Refactor suggestion

Gate test modules behind cfg(test).

Exporting tests/integration_tests publicly in the library increases compile surface and binary size.

-pub mod integration_tests;
@@
-pub mod tests;
+#[cfg(test)]
+pub mod integration_tests;
+#[cfg(test)]
+pub mod tests;
🤖 Prompt for AI Agents
In crates/arkflow-core/src/state/mod.rs around lines 23–31, the test modules
(especially tests and integration_tests) are exported publicly which increases
compile surface; add the #[cfg(test)] attribute above the module declarations
for integration_tests and tests (or change them to #[cfg(test)] pub(crate) or
non-pub mod as appropriate) so they are only compiled for test builds, and
remove their public export if it isn’t required for tests to run.

Comment on lines +179 to +198
// Register all metrics
registry.register(Box::new(operations_total.clone()))?;
registry.register(Box::new(operations_success_total.clone()))?;
registry.register(Box::new(operations_failed_total.clone()))?;
registry.register(Box::new(operation_duration_seconds.clone()))?;
registry.register(Box::new(state_size_bytes.clone()))?;
registry.register(Box::new(checkpoint_size_bytes.clone()))?;
registry.register(Box::new(checkpoints_total.clone()))?;
registry.register(Box::new(checkpoint_duration_seconds.clone()))?;
registry.register(Box::new(checkpoint_success_total.clone()))?;
registry.register(Box::new(checkpoint_failed_total.clone()))?;
registry.register(Box::new(active_transactions.clone()))?;
registry.register(Box::new(transaction_duration_seconds.clone()))?;
registry.register(Box::new(cache_hits_total.clone()))?;
registry.register(Box::new(cache_misses_total.clone()))?;
registry.register(Box::new(cache_size_bytes.clone()))?;
registry.register(Box::new(s3_operations_total.clone()))?;
registry.register(Box::new(s3_operation_duration_seconds.clone()))?;
registry.register(Box::new(s3_errors_total.clone()))?;

⚠️ Potential issue

Map Prometheus register() errors into crate::Error (avoids type-mismatch on '?').

Unless From<prometheus::Error> for crate::Error exists, these ?s won’t compile.

Apply this diff to wrap each register() with map_err:

-        registry.register(Box::new(operations_total.clone()))?;
-        registry.register(Box::new(operations_success_total.clone()))?;
-        registry.register(Box::new(operations_failed_total.clone()))?;
-        registry.register(Box::new(operation_duration_seconds.clone()))?;
-        registry.register(Box::new(state_size_bytes.clone()))?;
-        registry.register(Box::new(checkpoint_size_bytes.clone()))?;
-        registry.register(Box::new(checkpoints_total.clone()))?;
-        registry.register(Box::new(checkpoint_duration_seconds.clone()))?;
-        registry.register(Box::new(checkpoint_success_total.clone()))?;
-        registry.register(Box::new(checkpoint_failed_total.clone()))?;
-        registry.register(Box::new(active_transactions.clone()))?;
-        registry.register(Box::new(transaction_duration_seconds.clone()))?;
-        registry.register(Box::new(cache_hits_total.clone()))?;
-        registry.register(Box::new(cache_misses_total.clone()))?;
-        registry.register(Box::new(cache_size_bytes.clone()))?;
-        registry.register(Box::new(s3_operations_total.clone()))?;
-        registry.register(Box::new(s3_operation_duration_seconds.clone()))?;
-        registry.register(Box::new(s3_errors_total.clone()))?;
+        registry.register(Box::new(operations_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_operations_total: {e}")))?;
+        registry.register(Box::new(operations_success_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_operations_success_total: {e}")))?;
+        registry.register(Box::new(operations_failed_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_operations_failed_total: {e}")))?;
+        registry.register(Box::new(operation_duration_seconds.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_operation_duration_seconds: {e}")))?;
+        registry.register(Box::new(state_size_bytes.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_size_bytes: {e}")))?;
+        registry.register(Box::new(checkpoint_size_bytes.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoint_size_bytes: {e}")))?;
+        registry.register(Box::new(checkpoints_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoints_total: {e}")))?;
+        registry.register(Box::new(checkpoint_duration_seconds.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoint_duration_seconds: {e}")))?;
+        registry.register(Box::new(checkpoint_success_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoints_success_total: {e}")))?;
+        registry.register(Box::new(checkpoint_failed_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoints_failed_total: {e}")))?;
+        registry.register(Box::new(active_transactions.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_active_transactions: {e}")))?;
+        registry.register(Box::new(transaction_duration_seconds.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_transaction_duration_seconds: {e}")))?;
+        registry.register(Box::new(cache_hits_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_cache_hits_total: {e}")))?;
+        registry.register(Box::new(cache_misses_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_cache_misses_total: {e}")))?;
+        registry.register(Box::new(cache_size_bytes.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_cache_size_bytes: {e}")))?;
+        registry.register(Box::new(s3_operations_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_s3_operations_total: {e}")))?;
+        registry.register(Box::new(s3_operation_duration_seconds.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_s3_operation_duration_seconds: {e}")))?;
+        registry.register(Box::new(s3_errors_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_s3_errors_total: {e}")))?;
🤖 Prompt for AI Agents
In crates/arkflow-core/src/state/monitoring.rs around lines 179-198, each
registry.register(...) call uses ? on a prometheus::Error, which does not
convert to crate::Error unless From<prometheus::Error> is implemented; wrap each
call with .map_err(|e| Error::Process(format!("Failed to register <metric name>: {e}")))?
so the prometheus::Error is converted explicitly before ? is applied; apply this
change to every registry.register(...) line in the block.
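
To make the `?` type mismatch concrete, here is a small std-only sketch. `RegistryError` and `Error` are hypothetical stand-ins for `prometheus::Error` and `crate::Error`, and `register` is an invented helper; only the `map_err` pattern itself mirrors the suggestion above.

```rust
use std::fmt;

/// Stand-in for a third-party error type (the role played by `prometheus::Error`).
#[derive(Debug)]
pub struct RegistryError(pub String);

impl fmt::Display for RegistryError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Stand-in for the crate error. Note: no `From<RegistryError>` impl exists.
#[derive(Debug, PartialEq)]
pub enum Error {
    Process(String),
}

/// Hypothetical registration call that rejects duplicate names.
fn register(seen: &mut Vec<String>, name: &str) -> Result<(), RegistryError> {
    if seen.iter().any(|n| n == name) {
        return Err(RegistryError(format!("duplicate metric: {name}")));
    }
    seen.push(name.to_string());
    Ok(())
}

/// A bare `register(...)?` would not compile here, because `?` requires
/// `Error: From<RegistryError>`. `map_err` does the conversion explicitly.
pub fn register_all(names: &[&str]) -> Result<usize, Error> {
    let mut seen = Vec::new();
    for &name in names {
        register(&mut seen, name)
            .map_err(|e| Error::Process(format!("Failed to register {name}: {e}")))?;
    }
    Ok(seen.len())
}
```

An alternative would be to implement `From<RegistryError> for Error` once and keep the bare `?`; the `map_err` form trades brevity for a per-metric error message.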

Comment on lines +222 to +228
/// Operation timer for measuring duration
pub struct OperationTimer {
    start: Instant,
    metrics: Arc<StateMetrics>,
    operation_type: OperationType,
}

🛠️ Refactor suggestion

OperationTimer lacks Drop semantics; timers dropped without success/failure aren’t recorded.

Tests assume drop==success. Add a guard flag to support Drop-based success.

Apply this diff:

 pub struct OperationTimer {
     start: Instant,
     metrics: Arc<StateMetrics>,
     operation_type: OperationType,
+    recorded: bool,
 }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In crates/arkflow-core/src/state/monitoring.rs around lines 222 to 228,
OperationTimer currently has no Drop behavior so timers discarded without an
explicit success/failure call aren't recorded; tests expect drop==success. Add a
boolean guard (e.g., completed: AtomicBool or a plain bool if single-threaded)
to the struct, set it to true whenever success() or failure() is called,
implement Drop to check the guard and if not completed record a success and mark
completed, and ensure any methods that record metrics respect and set the guard
to avoid double-reporting.

Comment on lines +254 to +294
    /// Stop timing and record success
    pub fn success(self) {
        let duration = self.start.elapsed().as_secs_f64();
        self.metrics.operation_duration_seconds.observe(duration);
        self.metrics.operations_success_total.inc();

        // Record specific metric
        match self.operation_type {
            OperationType::Checkpoint => {
                self.metrics.checkpoint_duration_seconds.observe(duration);
                self.metrics.checkpoint_success_total.inc();
            }
            OperationType::S3Get | OperationType::S3Put | OperationType::S3Delete => {
                self.metrics.s3_operation_duration_seconds.observe(duration);
            }
            OperationType::Transaction => {
                self.metrics.transaction_duration_seconds.observe(duration);
            }
            _ => {}
        }
    }

    /// Stop timing and record failure
    pub fn failure(self) {
        let duration = self.start.elapsed().as_secs_f64();
        self.metrics.operation_duration_seconds.observe(duration);
        self.metrics.operations_failed_total.inc();

        // Record specific metric
        match self.operation_type {
            OperationType::Checkpoint => {
                self.metrics.checkpoint_duration_seconds.observe(duration);
                self.metrics.checkpoint_failed_total.inc();
            }
            OperationType::S3Get | OperationType::S3Put | OperationType::S3Delete => {
                self.metrics.s3_operation_duration_seconds.observe(duration);
                self.metrics.s3_errors_total.inc();
            }
            _ => {}
        }
    }

🛠️ Refactor suggestion

Mark timer as recorded in success/failure and implement Drop default-success.

Prevents leaks/double-counting; aligns with test assumption.

Apply this diff:

-    pub fn success(self) {
-        let duration = self.start.elapsed().as_secs_f64();
-        self.metrics.operation_duration_seconds.observe(duration);
-        self.metrics.operations_success_total.inc();
+    pub fn success(mut self) {
+        let duration = self.start.elapsed().as_secs_f64();
+        self.metrics.operation_duration_seconds.observe(duration);
+        self.metrics.operations_success_total.inc();
+        self.recorded = true;
         // Record specific metric
         match self.operation_type {
             OperationType::Checkpoint => {
                 self.metrics.checkpoint_duration_seconds.observe(duration);
                 self.metrics.checkpoint_success_total.inc();
             }
             OperationType::S3Get | OperationType::S3Put | OperationType::S3Delete => {
                 self.metrics.s3_operation_duration_seconds.observe(duration);
             }
             OperationType::Transaction => {
                 self.metrics.transaction_duration_seconds.observe(duration);
             }
             _ => {}
         }
     }
 
     /// Stop timing and record failure
-    pub fn failure(self) {
-        let duration = self.start.elapsed().as_secs_f64();
-        self.metrics.operation_duration_seconds.observe(duration);
-        self.metrics.operations_failed_total.inc();
+    pub fn failure(mut self) {
+        let duration = self.start.elapsed().as_secs_f64();
+        self.metrics.operation_duration_seconds.observe(duration);
+        self.metrics.operations_failed_total.inc();
+        self.recorded = true;
         // Record specific metric
         match self.operation_type {
             OperationType::Checkpoint => {
                 self.metrics.checkpoint_duration_seconds.observe(duration);
                 self.metrics.checkpoint_failed_total.inc();
             }
             OperationType::S3Get | OperationType::S3Put | OperationType::S3Delete => {
                 self.metrics.s3_operation_duration_seconds.observe(duration);
                 self.metrics.s3_errors_total.inc();
             }
             _ => {}
         }
     }

Add this new impl (outside the range):

impl Drop for OperationTimer {
    fn drop(&mut self) {
        if self.recorded {
            return;
        }
        // Default to success on drop
        let duration = self.start.elapsed().as_secs_f64();
        self.metrics.operation_duration_seconds.observe(duration);
        self.metrics.operations_success_total.inc();
        match self.operation_type {
            OperationType::Checkpoint => {
                self.metrics.checkpoint_duration_seconds.observe(duration);
                self.metrics.checkpoint_success_total.inc();
            }
            OperationType::S3Get | OperationType::S3Put | OperationType::S3Delete => {
                self.metrics.s3_operation_duration_seconds.observe(duration);
            }
            OperationType::Transaction => {
                self.metrics.transaction_duration_seconds.observe(duration);
            }
            _ => {}
        }
        self.recorded = true;
    }
}
🤖 Prompt for AI Agents
In crates/arkflow-core/src/state/monitoring.rs around lines 254 to 294, the
OperationTimer methods currently observe metrics but never mark the timer as
recorded, which can cause double-counting or leaks and tests expect a
default-success on Drop; update both success(self) and failure(self) to set
self.recorded = true after recording metrics, and add a Drop impl for
OperationTimer (outside the shown range) that checks if !self.recorded then
records a default success identical to success() (observe duration, increment
operations_success_total, record the specific per-operation metric) and finally
sets self.recorded = true.
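
The guard-flag idea can be tried without Prometheus. The sketch below is an assumption-laden miniature: `Counters` replaces `StateMetrics` with plain atomics, and `Timer` is a hypothetical stand-in for `OperationTimer`; it only demonstrates that a `recorded` flag plus `Drop` yields exactly one recorded outcome per timer.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Stand-in for the Prometheus-backed metrics: two plain atomic counters.
#[derive(Default)]
pub struct Counters {
    pub success: AtomicU64,
    pub failed: AtomicU64,
}

/// Hypothetical miniature of `OperationTimer`.
pub struct Timer {
    start: Instant,
    counters: Arc<Counters>,
    recorded: bool, // true once an outcome has been reported
}

impl Timer {
    pub fn start(counters: Arc<Counters>) -> Self {
        // The flag must start as false, otherwise Drop would never record.
        Timer { start: Instant::now(), counters, recorded: false }
    }

    /// Report success explicitly; Drop then sees the flag and does nothing.
    pub fn success(mut self) {
        let _elapsed = self.start.elapsed();
        self.counters.success.fetch_add(1, Ordering::Relaxed);
        self.recorded = true;
    }

    /// Report failure explicitly; Drop then sees the flag and does nothing.
    pub fn failure(mut self) {
        let _elapsed = self.start.elapsed();
        self.counters.failed.fetch_add(1, Ordering::Relaxed);
        self.recorded = true;
    }
}

impl Drop for Timer {
    fn drop(&mut self) {
        if self.recorded {
            return;
        }
        // Default to success when the timer is dropped without an outcome.
        let _elapsed = self.start.elapsed();
        self.counters.success.fetch_add(1, Ordering::Relaxed);
        self.recorded = true;
    }
}
```

One design consequence worth weighing: with a default-success `Drop`, a timer that goes out of scope on an early error return is counted as a success unless `failure()` is called first.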

Comment on lines +68 to +74
    /// Get total word count from state
    pub async fn get_total_words(&self) -> Result<u64, Error> {
        let state_manager = self.state_manager.read().await;
        state_manager
            .get_state_value(&self.operator_id, &"total_words")
            .await
    }

⚠️ Potential issue

Bug: return type mismatch (Result<u64, _> vs Result<Option<u64>, _>).

Unwrap with default and specify type.

-    pub async fn get_total_words(&self) -> Result<u64, Error> {
-        let state_manager = self.state_manager.read().await;
-        state_manager
-            .get_state_value(&self.operator_id, &"total_words")
-            .await
-    }
+    pub async fn get_total_words(&self) -> Result<u64, Error> {
+        let state_manager = self.state_manager.read().await;
+        Ok(
+            state_manager
+                .get_state_value::<u64>(&self.operator_id, "total_words")
+                .await?
+                .unwrap_or(0),
+        )
+    }
🤖 Prompt for AI Agents
In examples/word_count.rs around lines 68-74, the function returns Result<u64,
Error> but state_manager.get_state_value(...) returns Result<Option<u64>,
Error>; change the call to unwrap the Option with a default (0) while preserving
the Result error. For example, await the call and then map the successful Option
to a u64 with .map(|opt| opt.unwrap_or(0)), so the function returns Result<u64,
Error> (i.e., replace the final .await with .await.map(|opt| opt.unwrap_or(0))).
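
The same conversion can be shown synchronously with a std-only stand-in. `Store` and its methods are hypothetical and only mimic the shape of `get_state_value` (a fallible lookup that yields `None` for a missing key); the error type is a plain `String` for brevity.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the state manager.
pub struct Store {
    values: HashMap<String, u64>,
}

impl Store {
    pub fn new() -> Self {
        Store { values: HashMap::new() }
    }

    pub fn put(&mut self, key: &str, value: u64) {
        self.values.insert(key.to_string(), value);
    }

    /// Fallible lookup: `Ok(None)` means the key was never written.
    pub fn get(&self, key: &str) -> Result<Option<u64>, String> {
        if key.is_empty() {
            return Err("empty key".to_string());
        }
        Ok(self.values.get(key).copied())
    }

    /// Missing keys become 0, while lookup errors are still propagated.
    pub fn get_or_zero(&self, key: &str) -> Result<u64, String> {
        Ok(self.get(key)?.unwrap_or(0))
    }
}
```

`self.get(key).map(|opt| opt.unwrap_or(0))` is an equivalent spelling; both keep the error path intact and only default the `None` case.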

Comment on lines +76 to +83
    /// Get count for specific word
    pub async fn get_word_count(&self, word: &str) -> Result<u64, Error> {
        let state_manager = self.state_manager.read().await;
        state_manager
            .get_state_value(&self.operator_id, &format!("word_{}", word))
            .await
    }

⚠️ Potential issue

Same bug for get_word_count.

-    pub async fn get_word_count(&self, word: &str) -> Result<u64, Error> {
-        let state_manager = self.state_manager.read().await;
-        state_manager
-            .get_state_value(&self.operator_id, &format!("word_{}", word))
-            .await
-    }
+    pub async fn get_word_count(&self, word: &str) -> Result<u64, Error> {
+        let state_manager = self.state_manager.read().await;
+        Ok(
+            state_manager
+                .get_state_value::<u64>(&self.operator_id, &format!("word_{}", word))
+                .await?
+                .unwrap_or(0),
+        )
+    }
🤖 Prompt for AI Agents
In examples/word_count.rs around lines 76-83, get_word_count currently returns
the raw result of state_manager.get_state_value which likely yields a
Result<Option<u64>, Error> (or Option<u64>), causing a type mismatch or
incorrect handling when the key is missing; change the call to await and then
map the returned Option to a u64 (e.g., Ok(value.unwrap_or(0))) while properly
propagating errors (use match or .map_ok(|opt| opt.unwrap_or(0)) / .await? and
return Result<u64, Error>) so missing words return 0 and errors are forwarded.

Comment on lines +263 to +276
// Print statistics
let processor_inner = exactly_once_processor;
let total_words = processor_inner.get_state(&"total_words").await?;
let hello_count = processor_inner.get_state(&"word_hello").await?;

println!("\nFinal Statistics:");
println!("Total words processed: {:?}", total_words);
println!("'hello' count: {:?}", hello_count);

// Create checkpoint
let mut state_manager_write = state_manager.write().await;
let checkpoint_id = state_manager_write.create_checkpoint().await?;
println!("Created checkpoint: {}", checkpoint_id);

⚠️ Potential issue

Invalid API usage: ExactlyOnceProcessor likely doesn’t expose get_state; read from the shared state_manager instead and use the processor’s operator_id.

Fix by querying state directly.

-    let processor_inner = exactly_once_processor;
-    let total_words = processor_inner.get_state(&"total_words").await?;
-    let hello_count = processor_inner.get_state(&"word_hello").await?;
-
-    println!("\nFinal Statistics:");
-    println!("Total words processed: {:?}", total_words);
-    println!("'hello' count: {:?}", hello_count);
+    // Query state directly
+    {
+        let sm = state_manager.read().await;
+        let total_words: u64 = sm
+            .get_state_value("word_count_operator", "total_words")
+            .await?
+            .unwrap_or(0);
+        let hello_count: u64 = sm
+            .get_state_value("word_count_operator", "word_hello")
+            .await?
+            .unwrap_or(0);
+        println!("\nFinal Statistics:");
+        println!("Total words processed: {}", total_words);
+        println!("'hello' count: {}", hello_count);
+    }
🤖 Prompt for AI Agents
In examples/word_count.rs around lines 263 to 276, the code incorrectly calls
get_state on exactly_once_processor (which doesn't expose get_state); instead
acquire a read lock on the shared state_manager (state_manager.read().await),
use the processor's operator_id to query the two keys (e.g. "total_words" and
"word_hello") from that shared state, then print those results; keep the
checkpoint creation that follows but obtain a write lock only when creating the
checkpoint (state_manager.write().await) as before.
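
The lock scoping in this suggestion (read in a block, then write) can be sketched with `std::sync::RwLock`. This is an analogy only: the example under review appears to use an async lock, and `SharedState`, its methods, and `report_then_checkpoint` are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical shared state keyed by (operator_id, key).
#[derive(Default)]
pub struct SharedState {
    values: HashMap<(String, String), u64>,
    checkpoints: u64,
}

impl SharedState {
    pub fn set(&mut self, operator: &str, key: &str, value: u64) {
        self.values.insert((operator.to_string(), key.to_string()), value);
    }

    pub fn get(&self, operator: &str, key: &str) -> Option<u64> {
        self.values.get(&(operator.to_string(), key.to_string())).copied()
    }

    /// Returns a monotonically increasing checkpoint id.
    pub fn create_checkpoint(&mut self) -> u64 {
        self.checkpoints += 1;
        self.checkpoints
    }
}

/// Reads a counter, then takes a checkpoint; returns (total, checkpoint_id).
pub fn report_then_checkpoint(state: &Arc<RwLock<SharedState>>, operator: &str) -> (u64, u64) {
    // The read guard lives only inside this block.
    let total = {
        let guard = state.read().unwrap();
        guard.get(operator, "total_words").unwrap_or(0)
    };
    // The read guard is gone, so taking the write lock here cannot
    // block on a guard held by this same function.
    let checkpoint_id = state.write().unwrap().create_checkpoint();
    (total, checkpoint_id)
}
```

Keeping the read guard inside its own block is what makes the later write lock safe to acquire in the same function.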

Comment on lines +121 to +128
use arkflow_core::state::monitoring::{StateMonitor, MonitoredStateManager};

let monitor = Arc::new(StateMonitor::new()?);
let manager = MonitoredStateManager::new(config, monitor).await?;

// Export Prometheus metrics
let metrics = manager.export_metrics()?;
```

⚠️ Potential issue

StateMonitor::new likely returns (monitor, registry); sample won’t compile as written

Per core snippet, StateMonitor::new() returns (StateMonitor, Registry). Adjust destructuring and remove/replace export_metrics() (not shown in snippets).

Apply:

-use arkflow_core::state::monitoring::{StateMonitor, MonitoredStateManager};
+use arkflow_core::state::monitoring::{StateMonitor, MonitoredStateManager};
+use std::sync::Arc;

-let monitor = Arc::new(StateMonitor::new()?);
-let manager = MonitoredStateManager::new(config, monitor).await?;
-
-// Export Prometheus metrics
-let metrics = manager.export_metrics()?;
+let (monitor, _registry) = StateMonitor::new()?;
+let monitor = Arc::new(monitor);
+let manager = MonitoredStateManager::new(config, monitor).await?;
+// Use the registry for exposition; see Prometheus example below.
🤖 Prompt for AI Agents
In README-STATE-MANAGEMENT.md around lines 121 to 128, StateMonitor::new()
actually returns a tuple (StateMonitor, Registry) so the example must
destructure both values and use the returned registry; change the code to
capture both monitor and registry from StateMonitor::new(), pass the registry to
whatever telemetry setup needs it, and remove or replace the call to
manager.export_metrics() (which doesn’t exist) with the correct metrics
registration using the Registry returned by StateMonitor::new().

Comment on lines +222 to +237
// Start HTTP server for metrics
let registry = manager.monitor().registry();
HttpServer::new(move || {
    App::new().app_data(registry.clone()).route(
        "/metrics",
        web::get().to(|registry: web::Data<Registry>| {
            let encoder = TextEncoder::new();
            let metric_families = registry.gather();
            Ok(web::Bytes::from(encoder.encode_to_string(&metric_families).unwrap()))
        })
    )
})
.bind("0.0.0.0:9090")?
.run()
.await;
```

🛠️ Refactor suggestion

Prometheus example uses a non-existent encode_to_string; fix encoding flow

The prometheus TextEncoder encodes into a buffer. Replace with the canonical pattern.

-        web::get().to(|registry: web::Data<Registry>| {
-            let encoder = TextEncoder::new();
-            let metric_families = registry.gather();
-            Ok(web::Bytes::from(encoder.encode_to_string(&metric_families).unwrap()))
-        })
+        web::get().to(|registry: web::Data<Registry>| {
+            use prometheus::{Encoder, TextEncoder};
+            let encoder = TextEncoder::new();
+            let metric_families = registry.gather();
+            let mut buf = Vec::new();
+            encoder.encode(&metric_families, &mut buf).map_err(|e| actix_web::error::ErrorInternalServerError(e))?;
+            Ok::<_, actix_web::Error>(web::Bytes::from(buf))
+        })
🤖 Prompt for AI Agents
In README-STATE-MANAGEMENT.md around lines 222–237 the Prometheus TextEncoder
call uses a non-existent encode_to_string; replace it with the canonical flow:
create a mutable Vec<u8> buffer, call encoder.encode(&metric_families, &mut
buffer)? to write into that buffer, then return web::Bytes::from(buffer) (or
convert to String if you need text). Ensure you propagate or handle the encoder
error (use ? or map_err into an appropriate actix-web error) instead of
unwrap().

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 8

🧹 Nitpick comments (8)
crates/arkflow-core/src/config.rs (1)

211-227: Consider adding validation for S3 configuration

The S3 configuration accepts optional credentials. When credentials are not provided, the code will use the default AWS credential chain. Consider documenting this behavior more explicitly or adding validation to ensure either both access_key_id and secret_access_key are provided or neither.

Add a validation method:

impl S3StateBackendConfig {
    pub fn validate(&self) -> Result<(), Error> {
        match (&self.access_key_id, &self.secret_access_key) {
            (Some(_), None) | (None, Some(_)) => {
                Err(Error::Config("Both access_key_id and secret_access_key must be provided together".to_string()))
            }
            _ => Ok(())
        }
    }
}
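
A self-contained sketch of the same check, so it can be exercised in isolation. The `Error` enum and the two-field config struct here are simplified stand-ins (assumptions for illustration), not the crate's real types:

```rust
/// Hypothetical stand-in for the crate's error type.
#[derive(Debug, PartialEq)]
pub enum Error {
    Config(String),
}

/// Trimmed-down stand-in for the S3 state backend config.
#[derive(Default)]
pub struct S3StateBackendConfig {
    pub access_key_id: Option<String>,
    pub secret_access_key: Option<String>,
}

impl S3StateBackendConfig {
    /// Accept either both credentials (explicit keys) or neither
    /// (fall back to the default AWS credential chain).
    pub fn validate(&self) -> Result<(), Error> {
        match (&self.access_key_id, &self.secret_access_key) {
            (Some(_), Some(_)) | (None, None) => Ok(()),
            _ => Err(Error::Config(
                "access_key_id and secret_access_key must be provided together".to_string(),
            )),
        }
    }
}
```

Calling `validate()` once while the config is loaded would surface a half-configured credential pair before any S3 request is made.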
examples/run_stateful_example.rs (1)

104-104: Handle join errors properly

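handle.await? propagates the JoinError as soon as one task panics or is cancelled, so the function returns early and never awaits the remaining handles. Consider handling this more gracefully.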

-        handle.await?;
+        match handle.await {
+            Ok(_) => {},
+            Err(e) if e.is_panic() => {
+                error!("Stream task panicked: {:?}", e);
+            }
+            Err(e) => {
+                error!("Stream task failed: {:?}", e);
+            }
+        }
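
The same pattern can be sketched without a Tokio runtime. This version swaps in `std::thread` handles purely so it runs standalone (an assumption for illustration; the example in this PR uses Tokio `JoinHandle`s, where `is_panic()` separates panics from cancellation). The helper names are hypothetical:

```rust
use std::thread::{self, JoinHandle};

/// Join every handle, logging failures instead of returning at the first one.
/// Returns the number of tasks that panicked.
pub fn join_all(handles: Vec<JoinHandle<()>>) -> usize {
    let mut panicked = 0;
    for (idx, handle) in handles.into_iter().enumerate() {
        // `join` returns Err if the thread panicked; keep waiting on the rest.
        if let Err(payload) = handle.join() {
            let msg = payload
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "unknown panic".to_string());
            eprintln!("stream task {idx} panicked: {msg}");
            panicked += 1;
        }
    }
    panicked
}

/// Three demo tasks, one of which panics.
pub fn spawn_demo_tasks() -> Vec<JoinHandle<()>> {
    vec![
        thread::spawn(|| {}),
        thread::spawn(|| {
            panic!("boom");
        }),
        thread::spawn(|| {}),
    ]
}
```

Counting failures rather than propagating the first one lets the caller decide afterwards whether a partial failure should fail the whole run.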
crates/arkflow-core/src/stream/mod.rs (1)

493-587: Consider extracting state manager creation logic

The build_with_state method contains complex logic for converting between configuration types and creating the state manager. Consider extracting this into a separate helper function for better maintainability and testability.

async fn create_state_manager(
    state_config: &StateManagementConfig,
    stream_state: &crate::config::StreamStateConfig,
) -> Result<Option<Arc<RwLock<EnhancedStateManager>>>, Error> {
    if !state_config.enabled || !stream_state.enabled {
        return Ok(None);
    }

    // Convert state config to enhanced state config
    let enhanced_config = crate::state::enhanced::EnhancedStateConfig {
        // ... configuration conversion logic ...
    };

    let state_manager = EnhancedStateManager::new(enhanced_config).await?;
    Ok(Some(Arc::new(RwLock::new(state_manager))))
}
crates/arkflow-core/src/engine_builder.rs (5)

20-21: Prepare for dedup with HashMap::entry

Import Entry to cleanly deduplicate state managers by operator_id.

-use std::collections::HashMap;
+use std::collections::{HashMap, hash_map::Entry};

51-99: Avoid duplicate state manager creation for the same operator_id

Currently, repeated operator_id values cause multiple managers to be created and all but the last dropped. Deduplicate with entry() and log once.

-            // Create a shared state manager for all streams that need it
-            for stream_config in &self.config.streams {
-                if let Some(stream_state) = &stream_config.state {
-                    if stream_state.enabled {
-                        let enhanced_config = crate::state::enhanced::EnhancedStateConfig {
-                            enabled: true,
-                            backend_type: match self.config.state_management.backend_type {
-                                crate::config::StateBackendType::Memory => {
-                                    crate::state::enhanced::StateBackendType::Memory
-                                }
-                                crate::config::StateBackendType::S3 => {
-                                    crate::state::enhanced::StateBackendType::S3
-                                }
-                                crate::config::StateBackendType::Hybrid => {
-                                    crate::state::enhanced::StateBackendType::Hybrid
-                                }
-                            },
-                            s3_config: self.config.state_management.s3_config.as_ref().map(
-                                |config| crate::state::s3_backend::S3StateBackendConfig {
-                                    bucket: config.bucket.clone(),
-                                    region: config.region.clone(),
-                                    endpoint: config.endpoint_url.clone(),
-                                    access_key_id: config.access_key_id.clone(),
-                                    secret_access_key: config.secret_access_key.clone(),
-                                    prefix: Some(config.prefix.clone()),
-                                    use_ssl: true,
-                                },
-                            ),
-                            checkpoint_interval_ms: self
-                                .config
-                                .state_management
-                                .checkpoint_interval_ms,
-                            retained_checkpoints: self.config.state_management.retained_checkpoints,
-                            exactly_once: self.config.state_management.exactly_once,
-                            state_timeout_ms: stream_state
-                                .state_timeout_ms
-                                .unwrap_or(self.config.state_management.state_timeout_ms),
-                        };
-
-                        let state_manager =
-                            crate::state::enhanced::EnhancedStateManager::new(enhanced_config)
-                                .await?;
-                        self.state_managers.insert(
-                            stream_state.operator_id.clone(),
-                            Arc::new(RwLock::new(state_manager)),
-                        );
-                    }
-                }
-            }
+            // Create a shared state manager for all streams that need it (deduplicated by operator_id)
+            for stream_config in &self.config.streams {
+                if let Some(stream_state) = &stream_config.state {
+                    if stream_state.enabled {
+                        match self
+                            .state_managers
+                            .entry(stream_state.operator_id.clone())
+                        {
+                            Entry::Occupied(_) => {
+                                warn!(
+                                    "Duplicate operator_id detected: {}. Reusing existing state manager.",
+                                    stream_state.operator_id
+                                );
+                            }
+                            Entry::Vacant(v) => {
+                                let enhanced_config = crate::state::enhanced::EnhancedStateConfig {
+                                    enabled: true,
+                                    backend_type: match self.config.state_management.backend_type {
+                                        crate::config::StateBackendType::Memory => {
+                                            crate::state::enhanced::StateBackendType::Memory
+                                        }
+                                        crate::config::StateBackendType::S3 => {
+                                            crate::state::enhanced::StateBackendType::S3
+                                        }
+                                        crate::config::StateBackendType::Hybrid => {
+                                            crate::state::enhanced::StateBackendType::Hybrid
+                                        }
+                                    },
+                                    s3_config: self.config.state_management.s3_config.as_ref().map(
+                                        |config| crate::state::s3_backend::S3StateBackendConfig {
+                                            bucket: config.bucket.clone(),
+                                            region: config.region.clone(),
+                                            endpoint: config.endpoint_url.clone(),
+                                            access_key_id: config.access_key_id.clone(),
+                                            secret_access_key: config.secret_access_key.clone(),
+                                            prefix: Some(config.prefix.clone()),
+                                            use_ssl: true,
+                                        },
+                                    ),
+                                    checkpoint_interval_ms: self
+                                        .config
+                                        .state_management
+                                        .checkpoint_interval_ms,
+                                    retained_checkpoints: self.config.state_management.retained_checkpoints,
+                                    exactly_once: self.config.state_management.exactly_once,
+                                    state_timeout_ms: stream_state
+                                        .state_timeout_ms
+                                        .unwrap_or(self.config.state_management.state_timeout_ms),
+                                };
+
+                                let state_manager = crate::state::enhanced::EnhancedStateManager::new(
+                                    enhanced_config,
+                                )
+                                .await?;
+                                v.insert(Arc::new(RwLock::new(state_manager)));
+                            }
+                        }
+                    }
+                }
+            }
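
Stripped of the config conversion, the deduplication step reduces to the following sketch. `StateManager` and `build_managers` are hypothetical stand-ins, assumed here only to make the `Entry` pattern runnable on its own:

```rust
use std::collections::{hash_map::Entry, HashMap};
use std::sync::Arc;

/// Hypothetical stand-in for EnhancedStateManager.
pub struct StateManager {
    pub operator_id: String,
}

/// Build at most one manager per operator_id; returns how many were created.
pub fn build_managers(
    operator_ids: &[&str],
    managers: &mut HashMap<String, Arc<StateManager>>,
) -> usize {
    let mut created = 0;
    for id in operator_ids {
        match managers.entry(id.to_string()) {
            // A manager already exists for this id: reuse it, log once.
            Entry::Occupied(_) => {
                eprintln!("duplicate operator_id {id}; reusing existing state manager");
            }
            // First time this id is seen: construct and store the manager.
            Entry::Vacant(slot) => {
                slot.insert(Arc::new(StateManager {
                    operator_id: id.to_string(),
                }));
                created += 1;
            }
        }
    }
    created
}
```

Because the manager is only constructed in the `Vacant` arm, a duplicate id never pays the construction cost, which matters when construction involves network calls to S3.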

104-129: Reduce noisy warnings when global state is disabled

If global state is off, per-stream enabled=true will currently warn “No state manager found…”. Gate the warning to avoid noise and add an info explaining the fallback.

-                        warn!(
-                            "No state manager found for operator: {}",
-                            stream_state.operator_id
-                        );
+                        if self.config.state_management.enabled {
+                            warn!(
+                                "No state manager found for operator: {}",
+                                stream_state.operator_id
+                            );
+                        } else {
+                            info!(
+                                "Global state management disabled; building non-stateful stream: {}",
+                                stream_state.operator_id
+                            );
+                        }

190-197: Drain managers on shutdown to free memory and prevent double-shutdown

Draining empties the map and drops each Arc once its manager has been shut down, so a second shutdown call on a reused builder finds nothing left to shut down. Note that drain() needs &mut self.

-        for (_, state_manager) in self.state_managers.iter() {
+        for (_, state_manager) in self.state_managers.drain() {
             let mut manager = state_manager.write().await;
             manager.shutdown().await?;
         }
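
A small synchronous sketch of why `drain()` makes a repeated shutdown a no-op. `Manager` and `Builder` are hypothetical stand-ins (the real code is async and wraps managers in `Arc<RwLock<...>>`):

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a state manager; counts shutdown calls.
#[derive(Default)]
pub struct Manager {
    pub shutdowns: u32,
}

impl Manager {
    pub fn shutdown(&mut self) {
        self.shutdowns += 1;
    }
}

#[derive(Default)]
pub struct Builder {
    pub managers: HashMap<String, Manager>,
}

impl Builder {
    /// Shut down every manager and empty the map.
    /// Returns how many managers were shut down by this call.
    pub fn shutdown(&mut self) -> usize {
        let mut count = 0;
        // `drain` yields owned entries and leaves the map empty afterwards.
        for (_, mut manager) in self.managers.drain() {
            manager.shutdown();
            count += 1;
        }
        count
    }
}
```

One caveat worth weighing: if the loop exits early (for example through `?` on a failed shutdown), dropping the `Drain` iterator still removes the remaining entries, so those managers would be dropped without having been shut down.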

68-78: Expose and propagate SSL configuration for the S3 state backend

The field‐by‐field mapping in engine_builder.rs (and similarly in stream/mod.rs) correctly maps:

  • config.bucket: String → bucket: String
  • config.region: String → region: String
  • config.endpoint_url: Option<String> → endpoint: Option<String>
  • config.access_key_id: Option<String> → access_key_id: Option<String>
  • config.secret_access_key: Option<String> → secret_access_key: Option<String>
  • config.prefix: String → prefix: Some(String)

However, use_ssl is currently hardcoded to true, and the public S3StateBackendConfig in crates/arkflow-core/src/config.rs does not expose a flag to disable TLS for non-AWS (e.g. MinIO) endpoints.

Optional refactoring recommendations:

  • In crates/arkflow-core/src/config.rs (the config‐file struct around line 213):
    • Add a field:
      #[serde(default = "default_use_ssl")]
      pub use_ssl: bool,
    • Define the default helper:
      fn default_use_ssl() -> bool {
          true
      }
  • In crates/arkflow-core/src/engine_builder.rs (currently hardcoding use_ssl around lines 75–76):
    • Change
      -   use_ssl: true,
      +   use_ssl: config.use_ssl,
  • In crates/arkflow-core/src/stream/mod.rs (around lines 551–553), make the same update.
  • Update any examples or tests (e.g. in examples/word_count.rs, examples/stateful_pipeline.rs) to set the new use_ssl field as appropriate.

This will allow users to opt out of SSL when targeting non-AWS S3–compatible storage.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 1e7276e and 2990b25.

📒 Files selected for processing (8)
  • crates/arkflow-core/src/config.rs (2 hunks)
  • crates/arkflow-core/src/engine_builder.rs (1 hunks)
  • crates/arkflow-core/src/lib.rs (4 hunks)
  • crates/arkflow-core/src/stream/mod.rs (6 hunks)
  • docs/STATE_MANAGEMENT.md (1 hunks)
  • examples/run_stateful_example.rs (1 hunks)
  • examples/stateful_example.yaml (1 hunks)
  • examples/stateful_s3_example.yaml (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • docs/STATE_MANAGEMENT.md
🚧 Files skipped from review as they are similar to previous changes (1)
  • crates/arkflow-core/src/lib.rs
🧰 Additional context used
🧬 Code graph analysis (3)
examples/run_stateful_example.rs (2)
crates/arkflow-core/src/config.rs (1)
  • from_file (92-109)
crates/arkflow-core/src/engine_builder.rs (1)
  • new (33-38)
crates/arkflow-core/src/engine_builder.rs (2)
crates/arkflow-core/src/stream/mod.rs (2)
  • new (57-79)
  • output (354-394)
crates/arkflow-core/src/config.rs (3)
  • default (151-159)
  • default (168-174)
  • default (278-288)
crates/arkflow-core/src/stream/mod.rs (1)
crates/arkflow-core/src/config.rs (3)
  • default (151-159)
  • default (168-174)
  • default (278-288)
🪛 YAMLlint (1.37.1)
examples/stateful_example.yaml

[error] 16-16: trailing spaces

(trailing-spaces)


[error] 40-40: trailing spaces

(trailing-spaces)


[error] 71-71: trailing spaces

(trailing-spaces)


[error] 76-76: trailing spaces

(trailing-spaces)


[error] 115-115: no new line character at the end of file

(new-line-at-end-of-file)

examples/stateful_s3_example.yaml

[error] 21-21: trailing spaces

(trailing-spaces)


[error] 51-51: trailing spaces

(trailing-spaces)


[error] 70-70: trailing spaces

(trailing-spaces)


[error] 74-74: trailing spaces

(trailing-spaces)


[error] 80-80: trailing spaces

(trailing-spaces)


[error] 152-152: trailing spaces

(trailing-spaces)


[error] 177-177: no new line character at the end of file

(new-line-at-end-of-file)

🔇 Additional comments (2)
crates/arkflow-core/src/config.rs (1)

177-200: Well-structured state management configuration!

The StateManagementConfig struct is well-designed with sensible defaults and proper serde attributes. The configuration options cover all essential aspects of state management including checkpointing, exactly-once semantics, and backend selection.

crates/arkflow-core/src/engine_builder.rs (1)

133-181: Stream assembly looks correct and matches Stream::new signature

Input/pipeline/output/error_output/buffer/resource/thread_num/state_manager wiring aligns with Stream::new. Good separation via build_stream_with_state.

Comment on lines +568 to +570
// Note: Exactly-once processor would need to be integrated differently
// since it consumes the pipeline. For now, we just create the state manager
Some(state_manager_arc)

🛠️ Refactor suggestion

Document the exactly-once processor integration limitation

The comment mentions that exactly-once processor integration would need to be handled differently. This limitation should be documented more prominently, perhaps in the module documentation or as a TODO with an issue reference.

Would you like me to help create a design document or open an issue to track the proper integration of exactly-once processing with the state manager?

🤖 Prompt for AI Agents
In crates/arkflow-core/src/stream/mod.rs around lines 568-570, the inline
comment about exactly-once processor integration being different needs a more
prominent documentation and tracking entry; update the module-level Rust doc
(//! ...) to mention this limitation and constraints, add a TODO comment near
this code that references a new or existing issue number (e.g. TODO: track
exactly-once processor integration - see issue #NNN), and if desired open a
repository issue describing required design changes (consuming pipeline, state
manager ownership, and migration path) and paste that issue number into the TODO
so future readers can find the detailed plan.

use arkflow::Error;
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

⚠️ Potential issue

Add missing import for Duration

The code uses Duration::from_secs on line 66 but doesn't import Duration.

 use arkflow::Error;
 use tokio_util::sync::CancellationToken;
+use std::time::Duration;
 use tracing::{error, info};
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
use arkflow::Error;
use tokio_util::sync::CancellationToken;
use std::time::Duration;
use tracing::{error, info};
🤖 Prompt for AI Agents
In examples/run_stateful_example.rs around line 22 (usage at line 66),
the code calls Duration::from_secs but never imports Duration; add the missing
import by adding use std::time::Duration; alongside the other use statements at
the top of the file so Duration::from_secs resolves correctly.

retained_checkpoints: 3
exactly_once: true
state_timeout_ms: 3600000 # 1 hour

⚠️ Potential issue

Remove trailing whitespace

There's trailing whitespace on this line that should be removed.

-  state_timeout_ms: 3600000  # 1 hour
-  
+  state_timeout_ms: 3600000  # 1 hour
+
📝 Committable suggestion

Suggested change
state_timeout_ms: 3600000 # 1 hour
🧰 Tools
🪛 YAMLlint (1.37.1)

[error] 16-16: trailing spaces

(trailing-spaces)

🤖 Prompt for AI Agents
In examples/stateful_example.yaml around line 16, remove the trailing whitespace
characters at the end of that line; edit the file to delete the extra space(s)
so the line ends cleanly, then save the file and optionally run your editor's
"trim trailing whitespace" or a linter/pre-commit hook to ensure no other
trailing whitespace remains.

script: |
import json
import re

⚠️ Potential issue

Remove trailing whitespace

There's trailing whitespace on this line that should be removed.

-            
+
📝 Committable suggestion

Suggested change
🧰 Tools
🪛 YAMLlint (1.37.1)

[error] 40-40: trailing spaces

(trailing-spaces)

🤖 Prompt for AI Agents
In examples/stateful_example.yaml around line 40, remove the trailing whitespace
characters at the end of the line (ensure there are no spaces or tabs after the
last visible character) so the line ends cleanly; save the file with no trailing
whitespace on that line and run a quick whitespace/lint check to confirm
removal.

Comment on lines +71 to +76
SELECT
user_id,
event_type,
COUNT(*) as event_count,
SUM(value) as total_value
FROM flow

⚠️ Potential issue

Remove trailing whitespace from SQL query

Multiple lines in the SQL query have trailing whitespace that should be cleaned up.

-            SELECT 
+            SELECT
               user_id,
               event_type,
               COUNT(*) as event_count,
               SUM(value) as total_value
-            FROM flow 
+            FROM flow
📝 Committable suggestion

Suggested change
SELECT
user_id,
event_type,
COUNT(*) as event_count,
SUM(value) as total_value
FROM flow
🧰 Tools
🪛 YAMLlint (1.37.1)

[error] 71-71: trailing spaces

(trailing-spaces)


[error] 76-76: trailing spaces

(trailing-spaces)

🤖 Prompt for AI Agents
In examples/stateful_example.yaml around lines 71 to 76, several lines of the
embedded SQL (SELECT user_id, event_type, COUNT(*) as event_count, SUM(value) as
total_value, FROM flow) contain trailing whitespace; remove any trailing spaces
at the end of each SQL line (and any extra blank-space-only characters), ensure
lines end cleanly with no trailing whitespace, and save the file so the SQL
block has no trailing spaces.

path: "./errors/temperature_errors.txt"
state:
operator_id: "temperature-processor"
enabled: true No newline at end of file

⚠️ Potential issue

Add newline at end of file

YAML files should end with a newline character.

     state:
       operator_id: "temperature-processor"
       enabled: true
+
📝 Committable suggestion

Suggested change
enabled: true
state:
operator_id: "temperature-processor"
enabled: true
🧰 Tools
🪛 YAMLlint (1.37.1)

[error] 115-115: no new line character at the end of file

(new-line-at-end-of-file)

🤖 Prompt for AI Agents
In examples/stateful_example.yaml around line 115, the file is missing a
trailing newline; edit the file to add a single newline character at the end of
the file and save so the file ends with a newline (ensure LF newline if your
editor has options).

retained_checkpoints: 10 # Keep last 10 checkpoints
exactly_once: true # Enable exactly-once processing
state_timeout_ms: 2592000000 # 30 days

⚠️ Potential issue

Remove trailing whitespace from multiple lines

Several lines contain trailing whitespace that should be removed for consistency.

Apply these changes:

  • Line 21: Remove trailing space after comment
  • Line 51: Remove trailing space after import
  • Line 70: Remove trailing space after script block
  • Line 74: Remove trailing space in SQL query
  • Line 80: Remove trailing space after SQL query
  • Line 152: Remove trailing space after VRL script comment

Also applies to: 51-51, 70-70, 74-74, 80-80, 152-152

🧰 Tools
🪛 YAMLlint (1.37.1)

[error] 21-21: trailing spaces

(trailing-spaces)

🤖 Prompt for AI Agents
In examples/stateful_s3_example.yaml around lines 21, 51, 70, 74, 80 and 152
there are trailing whitespace characters on those lines; remove the trailing
space at the end of each specified line (line 21: comment, line 51: after
import, line 70: after script block, line 74: inside SQL query end, line 80:
after SQL query, line 152: after VRL script comment) so each line ends cleanly
with no extra spaces.

state:
operator_id: "iot-monitor"
enabled: true
state_timeout_ms: 604800000 # 7 days No newline at end of file

⚠️ Potential issue

Add newline at end of file

YAML files should end with a newline character.

       operator_id: "iot-monitor"
       enabled: true
       state_timeout_ms: 604800000  # 7 days
+
📝 Committable suggestion

Suggested change
state_timeout_ms: 604800000 # 7 days
operator_id: "iot-monitor"
enabled: true
state_timeout_ms: 604800000 # 7 days
🧰 Tools
🪛 YAMLlint (1.37.1)

[error] 177-177: no new line character at the end of file

(new-line-at-end-of-file)

🤖 Prompt for AI Agents
In examples/stateful_s3_example.yaml around line 177, the file is missing a
trailing newline; add a single newline character at the end of the file so the
last line ("state_timeout_ms: 604800000  # 7 days") is terminated by a newline
to comply with POSIX/YAML conventions.
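
Both YAMLlint findings (trailing spaces and a missing final newline) are easy to check mechanically. A minimal sketch of the two checks, with hypothetical function names, in case a quick test or pre-commit script is preferred over running YAMLlint:

```rust
/// Return 1-based line numbers that end in spaces or tabs.
pub fn trailing_whitespace_lines(text: &str) -> Vec<usize> {
    text.lines()
        .enumerate()
        .filter(|(_, line)| line.ends_with(' ') || line.ends_with('\t'))
        .map(|(idx, _)| idx + 1)
        .collect()
}

/// True when non-empty text does not end with a newline character.
pub fn missing_final_newline(text: &str) -> bool {
    !text.is_empty() && !text.ends_with('\n')
}
```

This only covers the two rules flagged in this review; it is not a substitute for the full YAMLlint rule set.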

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 13

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/arkflow-core/src/stream/mod.rs (1)

495-505: Respect stateful wrapper during stream close.

Currently only pipeline.close() is called; call stateful_pipeline.close() when present.

-        info!("pipeline close...");
-        if let Err(e) = self.pipeline.close().await {
+        info!("pipeline close...");
+        let close_res = if let Some(sp) = &self.stateful_pipeline {
+            sp.close().await
+        } else {
+            self.pipeline.close().await
+        };
+        if let Err(e) = close_res {
             error!("Failed to close pipeline: {}", e);
         }
♻️ Duplicate comments (6)
crates/arkflow-core/src/state/s3_backend.rs (1)

409-413: Checkpoint ID generation is racy across concurrent starters

Computing next = max(existing)+1 is not atomic, so two tasks or processes that start a checkpoint at the same time can be handed the same ID. Use an atomic counter initialized from S3, or serialize allocation. This was raised previously.

+use std::sync::atomic::{AtomicU64, Ordering};
@@
 pub struct S3CheckpointCoordinator {
@@
     checkpoint_timeout: std::time::Duration,
+    next_checkpoint_id: Arc<AtomicU64>,
 }
@@
     pub fn new(backend: Arc<S3StateBackend>) -> Self {
         Self {
             backend,
             active_checkpoints: HashMap::new(),
             checkpoint_timeout: std::time::Duration::from_secs(300), // 5 minutes
+            next_checkpoint_id: Arc::new(AtomicU64::new(0)),
         }
     }
+
+    async fn initialize_checkpoint_id(&self) -> Result<(), Error> {
+        let checkpoints = self.backend.list_checkpoints().await?;
+        let max_id = checkpoints.first().copied().unwrap_or(0);
+        self.next_checkpoint_id.store(max_id + 1, Ordering::SeqCst);
+        Ok(())
+    }
@@
     pub async fn start_checkpoint(&mut self) -> Result<u64, Error> {
-        // Get next checkpoint ID
-        let checkpoints = self.backend.list_checkpoints().await?;
-        let checkpoint_id = checkpoints.first().map_or(1, |id| id + 1);
+        // Thread-safe ID generation
+        let current = self.next_checkpoint_id.load(Ordering::SeqCst);
+        if current == 0 {
+            self.initialize_checkpoint_id().await?;
+        }
+        let checkpoint_id = self.next_checkpoint_id.fetch_add(1, Ordering::SeqCst);

Note: For multi-process safety, consider a conditional-create on a “counter” object or fencing via an S3-based CAS; otherwise add retry if metadata already exists.
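
For the in-process half of the problem, the core of the idea is a counter seeded once and advanced with `fetch_add`. A minimal sketch follows; `CheckpointIdAllocator` is a hypothetical type, and seeding from S3 is replaced by a plain `max_existing` argument. It does not address the multi-process case mentioned in the note above:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical in-process allocator for checkpoint IDs.
pub struct CheckpointIdAllocator {
    next: AtomicU64,
}

impl CheckpointIdAllocator {
    /// Seed from the highest ID already persisted (0 if none exist).
    pub fn seeded_from(max_existing: u64) -> Self {
        Self {
            next: AtomicU64::new(max_existing + 1),
        }
    }

    /// Each caller gets a distinct ID because fetch_add is one atomic step.
    pub fn allocate(&self) -> u64 {
        self.next.fetch_add(1, Ordering::SeqCst)
    }
}

/// Allocate `per_thread` IDs on each of `threads` threads and collect them.
pub fn allocate_concurrently(
    alloc: &CheckpointIdAllocator,
    threads: usize,
    per_thread: usize,
) -> Vec<u64> {
    std::thread::scope(|s| {
        let mut handles = Vec::new();
        for _ in 0..threads {
            handles.push(s.spawn(move || {
                (0..per_thread)
                    .map(|_| alloc.allocate())
                    .collect::<Vec<u64>>()
            }));
        }
        handles
            .into_iter()
            .flat_map(|h| h.join().unwrap())
            .collect::<Vec<u64>>()
    })
}
```

Seeding in the constructor, rather than lazily on the first `start_checkpoint` call, avoids treating 0 as a "not yet initialized" sentinel.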

crates/arkflow-core/src/stream/mod.rs (1)

651-653: Document exactly-once processor integration limitation more prominently.

Add a module-level note and a TODO with an issue reference here.

crates/arkflow-core/src/state/monitoring.rs (4)

191-209: Map Prometheus register() errors into crate::Error.

-        registry.register(Box::new(operations_total.clone()))?;
+        registry.register(Box::new(operations_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_operations_total: {e}")))?;
-        registry.register(Box::new(operations_success_total.clone()))?;
+        registry.register(Box::new(operations_success_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_operations_success_total: {e}")))?;
-        registry.register(Box::new(operations_failed_total.clone()))?;
+        registry.register(Box::new(operations_failed_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_operations_failed_total: {e}")))?;
-        registry.register(Box::new(operation_duration_seconds.clone()))?;
+        registry.register(Box::new(operation_duration_seconds.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_operation_duration_seconds: {e}")))?;
-        registry.register(Box::new(state_size_bytes.clone()))?;
+        registry.register(Box::new(state_size_bytes.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_size_bytes: {e}")))?;
-        registry.register(Box::new(checkpoint_size_bytes.clone()))?;
+        registry.register(Box::new(checkpoint_size_bytes.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoint_size_bytes: {e}")))?;
-        registry.register(Box::new(checkpoints_total.clone()))?;
+        registry.register(Box::new(checkpoints_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoints_total: {e}")))?;
-        registry.register(Box::new(checkpoint_duration_seconds.clone()))?;
+        registry.register(Box::new(checkpoint_duration_seconds.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoint_duration_seconds: {e}")))?;
-        registry.register(Box::new(checkpoint_success_total.clone()))?;
+        registry.register(Box::new(checkpoint_success_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoints_success_total: {e}")))?;
-        registry.register(Box::new(checkpoint_failed_total.clone()))?;
+        registry.register(Box::new(checkpoint_failed_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoints_failed_total: {e}")))?;
-        registry.register(Box::new(active_transactions.clone()))?;
+        registry.register(Box::new(active_transactions.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_active_transactions: {e}")))?;
-        registry.register(Box::new(transaction_duration_seconds.clone()))?;
+        registry.register(Box::new(transaction_duration_seconds.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_transaction_duration_seconds: {e}")))?;
-        registry.register(Box::new(cache_hits_total.clone()))?;
+        registry.register(Box::new(cache_hits_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_cache_hits_total: {e}")))?;
-        registry.register(Box::new(cache_misses_total.clone()))?;
+        registry.register(Box::new(cache_misses_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_cache_misses_total: {e}")))?;
-        registry.register(Box::new(cache_size_bytes.clone()))?;
+        registry.register(Box::new(cache_size_bytes.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_cache_size_bytes: {e}")))?;
-        registry.register(Box::new(s3_operations_total.clone()))?;
+        registry.register(Box::new(s3_operations_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_s3_operations_total: {e}")))?;
-        registry.register(Box::new(s3_operation_duration_seconds.clone()))?;
+        registry.register(Box::new(s3_operation_duration_seconds.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_s3_operation_duration_seconds: {e}")))?;
-        registry.register(Box::new(s3_errors_total.clone()))?;
+        registry.register(Box::new(s3_errors_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_s3_errors_total: {e}")))?;

233-239: Add Drop semantics to OperationTimer to avoid leaks/double-counting.

 pub struct OperationTimer {
     start: Instant,
     metrics: Arc<StateMetrics>,
     operation_type: OperationType,
+    recorded: bool,
 }
@@
-    pub fn success(self) {
+    pub fn success(mut self) {
         let duration = self.start.elapsed().as_secs_f64();
         self.metrics.operation_duration_seconds.observe(duration);
         self.metrics.operations_success_total.inc();
+        self.recorded = true;
@@
-    pub fn failure(self) {
+    pub fn failure(mut self) {
         let duration = self.start.elapsed().as_secs_f64();
         self.metrics.operation_duration_seconds.observe(duration);
         self.metrics.operations_failed_total.inc();
+        self.recorded = true;
@@
     }

Add:

impl Drop for OperationTimer {
    fn drop(&mut self) {
        if self.recorded { return; }
        let duration = self.start.elapsed().as_secs_f64();
        self.metrics.operation_duration_seconds.observe(duration);
        self.metrics.operations_success_total.inc();
        match self.operation_type {
            OperationType::Checkpoint => {
                self.metrics.checkpoint_duration_seconds.observe(duration);
                self.metrics.checkpoint_success_total.inc();
            }
            OperationType::S3Get | OperationType::S3Put | OperationType::S3Delete => {
                self.metrics.s3_operation_duration_seconds.observe(duration);
            }
            OperationType::Transaction => {
                self.metrics.transaction_duration_seconds.observe(duration);
            }
            _ => {}
        }
        self.recorded = true;
    }
}

Also applies to: 265-305
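
The record-once pattern suggested above can be sketched with the standard library alone. This is a minimal illustration, not the crate's code: `Counters` and `Timer` are hypothetical stand-ins for `StateMetrics` and `OperationTimer`, and plain atomics replace the Prometheus counters.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Hypothetical stand-in for the Prometheus-backed `StateMetrics`.
#[derive(Default)]
pub struct Counters {
    pub success: AtomicU64,
    pub failed: AtomicU64,
}

/// Hypothetical stand-in for `OperationTimer`.
pub struct Timer {
    start: Instant,
    counters: Arc<Counters>,
    recorded: bool,
}

impl Timer {
    pub fn start(counters: Arc<Counters>) -> Self {
        Self { start: Instant::now(), counters, recorded: false }
    }

    /// Records an explicit failure and marks the timer as recorded,
    /// so the `Drop` impl below does not count it a second time.
    pub fn failure(mut self) {
        let _elapsed = self.start.elapsed();
        self.counters.failed.fetch_add(1, Ordering::Relaxed);
        self.recorded = true;
    }
}

impl Drop for Timer {
    fn drop(&mut self) {
        if self.recorded {
            return;
        }
        // Nothing was recorded explicitly: default to success.
        let _elapsed = self.start.elapsed();
        self.counters.success.fetch_add(1, Ordering::Relaxed);
    }
}
```

Whether an unrecorded drop should count as a success is a design choice; counting it as a failure or as a separate "abandoned" outcome would be equally easy with the same flag.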


401-407: Use Encoder::encode for broader Prometheus compatibility.

-        let encoder = TextEncoder::new();
-        let metric_families = self.registry.gather();
-        encoder
-            .encode_to_string(&metric_families)
-            .map_err(|e| Error::Process(format!("Failed to encode metrics: {}", e)))
+        let encoder = TextEncoder::new();
+        let metric_families = self.registry.gather();
+        let mut buf = Vec::new();
+        prometheus::Encoder::encode(&encoder, &metric_families, &mut buf)
+            .map_err(|e| Error::Process(format!("Failed to encode metrics: {e}")))?;
+        String::from_utf8(buf).map_err(|e| Error::Process(format!("Metrics not UTF-8: {e}")))

583-593: Strengthen timer test with assertions.

     #[test]
     fn test_operation_timer() {
         let (metrics, _) = StateMetrics::new().unwrap();
         let metrics = Arc::new(metrics);
 
         let timer = OperationTimer::start(metrics.clone(), OperationType::Get);
-        drop(timer); // This should record success
+        drop(timer); // default success via Drop
+        assert_eq!(metrics.operations_total.get() as u64, 1);
+        assert_eq!(metrics.operations_success_total.get() as u64, 1);
 
-        let timer = OperationTimer::start(metrics, OperationType::Get);
+        let timer = OperationTimer::start(metrics.clone(), OperationType::Get);
         timer.failure();
+        assert_eq!(metrics.operations_total.get() as u64, 2);
+        assert_eq!(metrics.operations_failed_total.get() as u64, 1);
     }
🧹 Nitpick comments (16)
examples/verify_state_integration.rs (4)

80-83: Suppress unused variable warning for streams.

streams is built but never used. This will emit a warning and might fail CI if warnings are denied.

Use placeholder or underscore binding:

-    let streams = engine_builder.build_streams().await?;
+    let _streams = engine_builder.build_streams().await?;

Alternatively, if not needed at all:

-    let streams = engine_builder.build_streams().await?;
+    let _ = engine_builder.build_streams().await?;

88-90: Iterate by reference to avoid moving the map and to match the other example.

Safer and consistent whether get_state_managers() returns an owned map or a reference.

-    for (operator_id, _) in state_managers {
+    for (operator_id, _) in state_managers.iter() {
         println!("State manager created for operator: {}", operator_id);
     }
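
A small sketch of why borrowing matters here, assuming a hypothetical map from operator ID to some value (the real map's value type is not shown in this review): iterating with `.iter()` leaves the map usable afterwards.

```rust
use std::collections::HashMap;

/// Collects operator IDs while only borrowing the map.
/// The `u32` value type is a placeholder for the real state manager type.
pub fn operator_ids(state_managers: &HashMap<String, u32>) -> Vec<String> {
    let mut ids = Vec::new();
    for (operator_id, _) in state_managers.iter() {
        ids.push(operator_id.clone());
    }
    // HashMap iteration order is unspecified, so sort for stable output.
    ids.sort();
    ids
}
```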

6-7: Use HTTPS for Apache License URL.

Minor polish and best practice.

- *        http://www.apache.org/licenses/LICENSE-2.0
+ *        https://www.apache.org/licenses/LICENSE-2.0

26-29: Optional: switch println! to tracing for consistency with other examples.

The other example initializes tracing_subscriber and uses info!. Consider aligning for consistent diagnostics.

Possible change (imports and init required at top):

-    println!("Testing state management integration...");
+    tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
+    tracing::info!("Testing state management integration...");
@@
-    println!("Building streams with state management...");
+    tracing::info!("Building streams with state management...");
@@
-    println!("Created {} state managers", state_managers.len());
+    tracing::info!("Created {} state managers", state_managers.len());

Also applies to: 81-87

examples/test_state_management.rs (2)

27-27: Use tracing consistently for errors in spawned task

Import error to replace eprintln! below.

-use tracing::{info, Level};
+use tracing::{info, error, Level};

101-103: Replace eprintln! with tracing::error!

Keep logging consistent and structured.

-                eprintln!("流 {} 失败: {}", i, e);
+                error!("流 {} 失败: {}", i, e);
crates/arkflow-core/src/state/s3_backend.rs (2)

67-70: Unused local_cache field

local_cache is never read. Remove it or wire it into read-through/write-back paths.

-    /// Local cache
-    local_cache: HashMap<String, SimpleMemoryState>,

And corresponding initializer removal.


500-506: Metadata ‘operators’ never populated

Consider recording operator/state sizes in metadata on finalize to aid observability and cleanup decisions.

examples/test_state_config.yaml (1)

29-29: Add trailing newline

Comply with linters.

-        - "processed_count"
+        - "processed_count"
+
crates/arkflow-core/tests/state_management_integration.rs (2)

25-25: Remove unused imports.

sleep and Duration are unused in this test.

-use tokio::time::{sleep, Duration};

29-38: Prefer unique temp dirs to avoid flakiness and ensure cleanup.

Use tempfile::tempdir() (dev-dependency) to avoid collisions when tests run in parallel.

-    // Create a temporary input file
-    let temp_dir = "/tmp/arkflow_test";
-    fs::create_dir_all(temp_dir).unwrap();
-    let input_path = format!("{}/test_input.txt", temp_dir);
+    // Create a unique temporary input file
+    let tmp = tempfile::tempdir().unwrap();
+    let input_path = tmp.path().join("test_input.txt");
 
-    fs::write(
-        &input_path,
+    fs::write(
+        &input_path,
         "Hello World\nThis is a test\nState management test\n",
     )
     .unwrap();
@@
-    // Clean up
-    fs::remove_dir_all(temp_dir).ok();
+    // TempDir auto-cleans on drop

Also applies to: 93-95
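
If adding `tempfile` as a dev-dependency is not wanted, a std-only alternative (not the reviewer's suggestion, and without automatic cleanup) is to derive a unique directory name from the process ID and a counter. The helper name `unique_test_dir` is hypothetical.

```rust
use std::fs;
use std::io;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};

// Per-process counter so that parallel tests get distinct directories.
static NEXT_DIR: AtomicU64 = AtomicU64::new(0);

/// Creates a directory under the system temp dir that is unique
/// per process and per call. The caller must remove it when done.
pub fn unique_test_dir(prefix: &str) -> io::Result<PathBuf> {
    let n = NEXT_DIR.fetch_add(1, Ordering::Relaxed);
    let name = format!("{}_{}_{}", prefix, std::process::id(), n);
    let dir = std::env::temp_dir().join(name);
    fs::create_dir_all(&dir)?;
    Ok(dir)
}
```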

crates/arkflow-core/tests/comprehensive_state_test.rs (1)

171-179: Minor: avoid unnecessary type annotation.

Drop the annotation on the binding and name the value type with a turbofish instead.

-        let value: Option<String> = manager
-            .get_state_value(operator_id, &"test_key")
+        let value = manager
+            .get_state_value::<_, String>(operator_id, &"test_key")
             .await
             .unwrap();
crates/arkflow-core/src/stream/mod.rs (2)

32-41: operator_id in StatefulPipeline is unused. Remove or wire it.

Dead field adds noise; if needed for future state updates, pass it into state ops.

 pub struct StatefulPipeline {
     /// Inner pipeline
     inner: Arc<Pipeline>,
     /// State manager
     state_manager: Option<Arc<RwLock<EnhancedStateManager>>>,
-    /// Operator ID
-    operator_id: Option<String>,
 }
@@
-        operator_id: Option<String>,
     ) -> Self {
         Self {
             inner: pipeline,
             state_manager,
-            operator_id,
         }
     }

And adjust the single call site:

-            StatefulPipeline::new(pipeline_arc.clone(), Some(sm), None)
+            StatefulPipeline::new(pipeline_arc.clone(), Some(sm))

338-371: Avoid extra clone of msg unless required.

If error path doesn’t need original msg, remove .clone() to reduce allocations.

-            let processed = if let Some(ref stateful_pipe) = stateful_pipeline {
-                stateful_pipe.process(msg.clone()).await
+            let processed = if let Some(ref stateful_pipe) = stateful_pipeline {
+                stateful_pipe.process(msg).await
             } else {
-                pipeline.process(msg.clone()).await
+                pipeline.process(msg).await
             };

If you need msg for the error branch, keep as-is.

crates/arkflow-core/src/state/enhanced.rs (2)

75-79: Unused config knobs (exactly_once) not enforced.

They’re defined but not used to change behavior (e.g., guarding barriers/2PC).

Consider gating 2PC/barrier paths on exactly_once.

Also applies to: 300-304


417-428: pending_transactions is unused; either use it or remove.

Currently 2PC logs but doesn’t stage data.

Also applies to: 455-497

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2990b25 and 6efbee8.

📒 Files selected for processing (20)
  • crates/arkflow-core/src/cli/mod.rs (1 hunks)
  • crates/arkflow-core/src/engine/mod.rs (3 hunks)
  • crates/arkflow-core/src/state/enhanced.rs (1 hunks)
  • crates/arkflow-core/src/state/example.rs (1 hunks)
  • crates/arkflow-core/src/state/helper.rs (1 hunks)
  • crates/arkflow-core/src/state/integration_tests.rs (1 hunks)
  • crates/arkflow-core/src/state/mod.rs (1 hunks)
  • crates/arkflow-core/src/state/monitoring.rs (1 hunks)
  • crates/arkflow-core/src/state/performance.rs (1 hunks)
  • crates/arkflow-core/src/state/s3_backend.rs (1 hunks)
  • crates/arkflow-core/src/state/simple.rs (1 hunks)
  • crates/arkflow-core/src/state/tests.rs (1 hunks)
  • crates/arkflow-core/src/state/transaction.rs (1 hunks)
  • crates/arkflow-core/src/stream/mod.rs (8 hunks)
  • crates/arkflow-core/tests/comprehensive_state_test.rs (1 hunks)
  • crates/arkflow-core/tests/state_management_integration.rs (1 hunks)
  • examples/data/test.txt (1 hunks)
  • examples/test_state_config.yaml (1 hunks)
  • examples/test_state_management.rs (1 hunks)
  • examples/verify_state_integration.rs (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • examples/data/test.txt
🚧 Files skipped from review as they are similar to previous changes (8)
  • crates/arkflow-core/src/state/integration_tests.rs
  • crates/arkflow-core/src/state/tests.rs
  • crates/arkflow-core/src/state/mod.rs
  • crates/arkflow-core/src/state/example.rs
  • crates/arkflow-core/src/state/performance.rs
  • crates/arkflow-core/src/state/transaction.rs
  • crates/arkflow-core/src/state/simple.rs
  • crates/arkflow-core/src/state/helper.rs
🧰 Additional context used
🧬 Code graph analysis (10)
crates/arkflow-core/tests/state_management_integration.rs (3)
crates/arkflow-core/src/state/enhanced.rs (5)
  • write (432-453)
  • default (82-92)
  • new (108-139)
  • new (342-352)
  • new (422-429)
crates/arkflow-core/src/engine/mod.rs (1)
  • new (82-90)
crates/arkflow-core/src/state/mod.rs (1)
  • new (63-68)
crates/arkflow-core/src/cli/mod.rs (1)
crates/arkflow-core/src/state/example.rs (1)
  • config (169-171)
examples/test_state_management.rs (3)
crates/arkflow-core/src/stream/mod.rs (2)
  • output (437-477)
  • state_manager (97-99)
examples/verify_state_integration.rs (1)
  • main (27-97)
crates/arkflow-core/src/state/enhanced.rs (4)
  • default (82-92)
  • new (108-139)
  • new (342-352)
  • new (422-429)
crates/arkflow-core/tests/comprehensive_state_test.rs (5)
crates/arkflow-core/src/stream/mod.rs (3)
  • output (437-477)
  • build (533-574)
  • state_manager (97-99)
crates/arkflow-core/src/lib.rs (2)
  • from_string (214-216)
  • len (222-224)
crates/arkflow-core/src/state/enhanced.rs (5)
  • new (108-139)
  • new (342-352)
  • new (422-429)
  • write (432-453)
  • default (82-92)
crates/arkflow-core/src/input/mod.rs (1)
  • register_input_builder (120-133)
crates/arkflow-core/src/output/mod.rs (1)
  • register_output_builder (77-90)
examples/verify_state_integration.rs (2)
examples/test_state_management.rs (1)
  • main (30-171)
crates/arkflow-core/src/engine/mod.rs (1)
  • new (82-90)
crates/arkflow-core/src/engine/mod.rs (3)
crates/arkflow-core/src/stream/mod.rs (5)
  • new (45-55)
  • new (128-156)
  • process (58-84)
  • process (689-691)
  • process (724-726)
crates/arkflow-core/src/state/enhanced.rs (1)
  • process (355-383)
crates/arkflow-core/src/pipeline/mod.rs (1)
  • process (35-48)
crates/arkflow-core/src/state/s3_backend.rs (3)
crates/arkflow-core/src/state/enhanced.rs (3)
  • new (108-139)
  • new (342-352)
  • new (422-429)
crates/arkflow-core/src/state/helper.rs (5)
  • new (42-46)
  • get_typed (23-25)
  • get_typed (60-72)
  • put_typed (28-30)
  • put_typed (74-82)
crates/arkflow-core/src/state/performance.rs (6)
  • new (126-131)
  • new (164-189)
  • new (454-461)
  • local_state (498-500)
  • get_typed (509-514)
  • put_typed (516-521)
crates/arkflow-core/src/stream/mod.rs (3)
crates/arkflow-core/src/pipeline/mod.rs (3)
  • new (30-32)
  • process (35-48)
  • close (51-56)
crates/arkflow-core/src/state/enhanced.rs (2)
  • process (355-383)
  • default (82-92)
crates/arkflow-core/src/config.rs (3)
  • default (151-159)
  • default (168-174)
  • default (278-288)
crates/arkflow-core/src/state/monitoring.rs (1)
crates/arkflow-core/src/state/enhanced.rs (4)
  • new (108-139)
  • new (342-352)
  • new (422-429)
  • default (82-92)
crates/arkflow-core/src/state/enhanced.rs (5)
crates/arkflow-core/src/state/helper.rs (1)
  • new (42-46)
crates/arkflow-core/src/state/monitoring.rs (4)
  • new (68-72)
  • new (369-377)
  • new (497-509)
  • inner (512-514)
crates/arkflow-core/src/state/s3_backend.rs (3)
  • new (74-112)
  • new (313-321)
  • new (400-406)
crates/arkflow-core/src/state/simple.rs (5)
  • new (36-41)
  • new (99-104)
  • new (157-163)
  • process (44-74)
  • write (107-137)
crates/arkflow-core/src/stream/mod.rs (5)
  • inner (92-94)
  • state_manager (97-99)
  • process (58-84)
  • process (689-691)
  • process (724-726)
🪛 YAMLlint (1.37.1)
examples/test_state_config.yaml

[error] 29-29: no new line character at the end of file

(new-line-at-end-of-file)

🔇 Additional comments (4)
examples/verify_state_integration.rs (1)

33-41: Drop the typed-field suggestion
InputConfig in arkflow-core is defined with input_type, name, and a generic config field only; it has no path or format members. The example's JSON blob under config matches the API, so the proposed diff should be discarded.

Likely an incorrect or invalid review comment.

examples/test_state_config.yaml (1)

8-8: Drop backend_type casing warning
The StateBackendType enum in crates/arkflow-core/src/config.rs has #[serde(rename_all = "lowercase")], so "memory" deserializes to Memory without issue.

Likely an incorrect or invalid review comment.

crates/arkflow-core/tests/comprehensive_state_test.rs (1)

204-207: Apply the same guarded registration in the second test.

-    // Register mock components
-    let _ = arkflow_core::input::register_input_builder("mock", Arc::new(MockInputBuilder));
-    let _ = arkflow_core::output::register_output_builder("mock", Arc::new(MockOutputBuilder));
+    register_mock_components();
crates/arkflow-core/src/state/enhanced.rs (1)

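268-273: current_checkpoint_id.fetch_add(1) returns the previous ID. Confirm intent.

fetch_add returns the value held before the increment. If the counter starts at 1, callers therefore see 1, 2, …, which is fine for monotonically increasing IDs. If you expect the post-increment value instead, use fetch_add(1) + 1.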
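
The difference between the two readings can be shown with a plain `AtomicU64`. The function names are hypothetical, and the starting value of the real counter is an assumption.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Returns the ID that was current before the increment.
pub fn next_id_previous(counter: &AtomicU64) -> u64 {
    counter.fetch_add(1, Ordering::SeqCst)
}

/// Returns the ID that is current after the increment.
pub fn next_id_new(counter: &AtomicU64) -> u64 {
    counter.fetch_add(1, Ordering::SeqCst) + 1
}
```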

 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
     // Initialize the logging system
-    let config = self.config.clone().unwrap();
+    let config = self.config.clone().ok_or("No configuration loaded")?;

⚠️ Potential issue

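ok_or with a bare &str yields an untyped error

ok_or("...")? does compile in a function returning Box<dyn Error>, because the standard library implements From<&str> for Box<dyn Error>, but the resulting error is only a string. Return a concrete error if callers should be able to inspect it.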

-        let config = self.config.clone().ok_or("No configuration loaded")?;
+        let config = self
+            .config
+            .clone()
+            .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "No configuration loaded"))?;
🤖 Prompt for AI Agents
In crates/arkflow-core/src/cli/mod.rs around line 76, the expression uses
ok_or("No configuration loaded")?, which compiles only through the From<&str>
conversion into Box<dyn Error> and therefore produces an untyped string error;
replace it with a concrete error value matching the function's error type, e.g.
ok_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "No configuration
loaded"))? or, if using anyhow in the crate, ok_or_else(|| anyhow::anyhow!("No
configuration loaded"))?, so the Result's Err variant carries a concrete error
type rather than a plain &str.
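
As a point of reference, the standard library provides `From<&str> for Box<dyn Error>`, so `?` can turn a string literal into a boxed error. The sketch below uses a hypothetical `require_config` function in place of the CLI method.

```rust
use std::error::Error;

/// Hypothetical stand-in for reading the CLI's optional configuration.
pub fn require_config(config: Option<String>) -> Result<String, Box<dyn Error>> {
    // `?` calls `From::from` on the `&str`, producing a `Box<dyn Error>`.
    let config = config.ok_or("No configuration loaded")?;
    Ok(config)
}
```

The boxed value displays as the original message but cannot be downcast to a meaningful type, which is the practical argument for a concrete error.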

Comment on lines +231 to +237
let mut streams = match engine_builder.build_streams().await {
    Ok(streams) => streams,
    Err(e) => {
        error!("Failed to build streams: {}", e);
        process::exit(1);
    }
};

🛠️ Refactor suggestion

Avoid process::exit in library method; propagate error instead

Engine::run should not terminate the process. Return an error and let the caller decide.

-        let mut streams = match engine_builder.build_streams().await {
-            Ok(streams) => streams,
-            Err(e) => {
-                error!("Failed to build streams: {}", e);
-                process::exit(1);
-            }
-        };
+        let mut streams = engine_builder
+            .build_streams()
+            .await
+            .map_err(|e| {
+                error!("Failed to build streams: {}", e);
+                std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
+            })?;
🤖 Prompt for AI Agents
In crates/arkflow-core/src/engine/mod.rs around lines 231 to 237, do not call
process::exit when build_streams fails; instead propagate the error to the
caller. Replace the match that logs and calls process::exit with code that
returns an Err (after logging or by using ? to propagate), and update
Engine::run's signature to return a Result type (e.g., Result<(), EngineError>
or anyhow::Error) so callers decide how to handle termination; ensure any
necessary error type conversions are applied where you return the build_streams
error.
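
A minimal sketch of the log-then-propagate shape described above. `BuildError`, `build_streams`, and `run` are hypothetical stand-ins written synchronously; the real `Engine::run` is async and its error type is not shown in this review.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical error type for a failed stream build.
#[derive(Debug)]
pub struct BuildError(pub String);

impl fmt::Display for BuildError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl Error for BuildError {}

/// Hypothetical stand-in for `engine_builder.build_streams()`.
pub fn build_streams(configured: bool) -> Result<Vec<String>, BuildError> {
    if configured {
        Ok(vec!["stream-0".to_string()])
    } else {
        Err(BuildError("no streams configured".to_string()))
    }
}

/// Logs the failure and returns it instead of terminating the process.
pub fn run(configured: bool) -> Result<usize, Box<dyn Error>> {
    let streams = build_streams(configured).map_err(|e| {
        eprintln!("Failed to build streams: {}", e);
        e
    })?;
    Ok(streams.len())
}
```

The binary's `main` can still map the returned error to a non-zero exit code, which keeps that decision out of the library.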

Comment on lines +162 to +179
/// Process a transactional batch
async fn process_transactional_batch(
    &mut self,
    batch: MessageBatch,
    tx_ctx: TransactionContext,
) -> Result<Vec<MessageBatch>, Error> {
    // Register the transaction
    self.register_transaction(&tx_ctx).await?;

    // If this is a checkpoint barrier, trigger a checkpoint
    if tx_ctx.is_checkpoint() {
        self.trigger_checkpoint(tx_ctx.checkpoint_id).await?;
    }

    // Process the batch (currently returned unchanged)
    // In a real implementation, state transformations would be applied here
    Ok(vec![batch])
}

🛠️ Refactor suggestion

⚠️ Potential issue

Leak: transactions are never cleaned up on error/success.

If trigger_checkpoint fails (or after success), the transaction remains in active_transactions.

     async fn process_transactional_batch(
         &mut self,
         batch: MessageBatch,
         tx_ctx: TransactionContext,
     ) -> Result<Vec<MessageBatch>, Error> {
         // Register the transaction
         self.register_transaction(&tx_ctx).await?;
 
         // If this is a checkpoint barrier, trigger a checkpoint
         if tx_ctx.is_checkpoint() {
-            self.trigger_checkpoint(tx_ctx.checkpoint_id).await?;
+            if let Err(e) = self.trigger_checkpoint(tx_ctx.checkpoint_id).await {
+                // Clean up the failed transaction
+                self.unregister_transaction(&tx_ctx.transaction_id).await;
+                return Err(e);
+            }
         }
 
         // Process the batch (currently returned unchanged)
         // In a real implementation, state transformations would be applied here
-        Ok(vec![batch])
+        let result = vec![batch];
+        // Clean up the completed transaction
+        self.unregister_transaction(&tx_ctx.transaction_id).await;
+        Ok(result)
     }

Add outside this hunk:

impl EnhancedStateManager {
    async fn unregister_transaction(&self, tx_id: &str) {
        let mut transactions = self.active_transactions.write().await;
        transactions.remove(tx_id);
    }
}
🤖 Prompt for AI Agents
In crates/arkflow-core/src/state/enhanced.rs around lines 162 to 179, the code
registers a transaction but never unregisters it, so active_transactions leaks
an entry whether processing succeeds or trigger_checkpoint fails; add the suggested
impl EnhancedStateManager::unregister_transaction(&self, tx_id: &str) (placed
outside this hunk) and call
self.unregister_transaction(&tx_ctx.transaction_id).await on every exit path of
process_transactional_batch: remove the transaction after successful completion
and also when trigger_checkpoint or any later step returns an Err (for example
by matching on the result and unregistering in both the Ok and Err branches, or
via a scope guard).
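
The scope-guard option can be sketched as follows. This is a synchronous illustration under stated assumptions: `ActiveTransactions`, `TransactionGuard`, and `process` are hypothetical, and a `std::sync::Mutex` replaces the async lock used in the crate, because `Drop` cannot await.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the manager's `active_transactions` set.
pub type ActiveTransactions = Arc<Mutex<HashSet<String>>>;

/// Removes its transaction ID from the active set when dropped,
/// so every exit path (success, early return, `?`) cleans up.
pub struct TransactionGuard {
    active: ActiveTransactions,
    tx_id: String,
}

impl TransactionGuard {
    pub fn register(active: ActiveTransactions, tx_id: &str) -> Self {
        active.lock().unwrap().insert(tx_id.to_string());
        Self { active, tx_id: tx_id.to_string() }
    }
}

impl Drop for TransactionGuard {
    fn drop(&mut self) {
        // Ignore a poisoned lock rather than panicking inside `drop`.
        if let Ok(mut set) = self.active.lock() {
            set.remove(&self.tx_id);
        }
    }
}

/// Simulates a batch whose checkpoint step may fail.
pub fn process(active: &ActiveTransactions, tx_id: &str, fail: bool) -> Result<(), String> {
    let _guard = TransactionGuard::register(active.clone(), tx_id);
    if fail {
        return Err("checkpoint failed".to_string());
    }
    Ok(())
}
```

With an async lock, the explicit unregister calls shown in the diff are the simpler route; the guard is mainly attractive when cleanup can run synchronously.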

Comment on lines +134 to +154
async fn list_checkpoints(&self) -> Result<Vec<u64>, Error> {
    let mut checkpoints = Vec::new();

    // List the objects in the checkpoint directory
    let mut stream = self.client.list(Some(&self.checkpoint_base_path));

    while let Some(object) = stream.try_next().await? {
        // Extract the checkpoint ID from the path
        if let Some(name) = object.location.filename() {
            if let Some(rest) = name.strip_prefix("chk-") {
                if let Ok(id) = rest.parse::<u64>() {
                    checkpoints.push(id);
                }
            }
        }
    }

    // Sort in descending order (newest first)
    checkpoints.sort_by(|a: &u64, b: &u64| b.cmp(a));
    Ok(checkpoints)
}

⚠️ Potential issue

Bug: list_checkpoints never finds IDs (parsing filename instead of directory segment)

filename() returns file names like _metadata.json, not chk-<id>. Result: always empty list. Parse path segments and de-duplicate IDs.

-        // List the objects in the checkpoint directory
-        let mut stream = self.client.list(Some(&self.checkpoint_base_path));
-
-        while let Some(object) = stream.try_next().await? {
-            // Extract the checkpoint ID from the path
-            if let Some(name) = object.location.filename() {
-                if let Some(rest) = name.strip_prefix("chk-") {
-                    if let Ok(id) = rest.parse::<u64>() {
-                        checkpoints.push(id);
-                    }
-                }
-            }
-        }
+        // List all objects under the checkpoint directory and parse chk-<id> from the path segments
+        let mut stream = self.client.list(Some(&self.checkpoint_base_path));
+        let mut seen = std::collections::HashSet::new();
+        while let Some(object) = stream
+            .try_next()
+            .await
+            .map_err(|e| Error::Process(format!("列举检查点失败: {}", e)))?
+        {
+            let path_str = object.location.to_string();
+            for seg in path_str.split('/') {
+                if let Some(rest) = seg.strip_prefix("chk-") {
+                    if let Ok(id) = rest.parse::<u64>() {
+                        if seen.insert(id) {
+                            checkpoints.push(id);
+                        }
+                    }
+                }
+            }
+        }
🤖 Prompt for AI Agents
In crates/arkflow-core/src/state/s3_backend.rs around lines 134 to 154,
list_checkpoints currently calls object.location.filename(), which yields
filenames like "_metadata.json", so no "chk-<id>" is ever found; change the logic
to iterate over the path segments of object.location (splitting by '/'), check
each segment for the "chk-" prefix, parse the trailing part as u64, collect the
parsed ids into a HashSet to de-duplicate, then convert to a Vec, sort in
descending order and return; keep propagating listing errors, and keep skipping
segments whose suffix does not parse as u64, as before.
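
The segment-parsing step can be isolated as a pure function over path strings. This is a sketch with a hypothetical name, `checkpoint_ids`, and it assumes object paths shaped like `checkpoints/chk-<id>/...`; a `BTreeSet` handles both de-duplication and ordering.

```rust
use std::collections::BTreeSet;

/// Extracts checkpoint IDs from object paths, de-duplicated and
/// sorted in descending order (newest first).
pub fn checkpoint_ids<'a, I>(paths: I) -> Vec<u64>
where
    I: IntoIterator<Item = &'a str>,
{
    let mut seen = BTreeSet::new();
    for path in paths {
        for seg in path.split('/') {
            if let Some(rest) = seg.strip_prefix("chk-") {
                // Segments such as "chk-tmp" are skipped silently.
                if let Ok(id) = rest.parse::<u64>() {
                    seen.insert(id);
                }
            }
        }
    }
    seen.into_iter().rev().collect()
}
```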

Comment on lines +189 to +191
Ok(result) => {
let bytes = result.bytes().await?;


🛠️ Refactor suggestion

Propagate object_store bytes() error with context

Current ? assumes a From<object_store::Error> for your Error. Be explicit for consistency.

-                let bytes = result.bytes().await?;
+                let bytes = result
+                    .bytes()
+                    .await
+                    .map_err(|e| Error::Process(format!("读取 S3 对象失败: {}", e)))?;
🤖 Prompt for AI Agents
In crates/arkflow-core/src/state/s3_backend.rs around lines 189–191, the call
`result.bytes().await?` relies on `?` and therefore on a
From<object_store::Error> impl for the crate's Error; map the error explicitly
instead and add context, e.g. `result.bytes().await.map_err(|e|
Error::Process(format!("读取 S3 对象失败: {}", e)))?` as in the diff above, so
the object_store::Error is propagated through an explicit conversion with a
message that helps debugging.
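
The explicit-conversion pattern, sketched without the object store. The `Error::Process(String)` variant mirrors the one used in the diffs of this review; `read_bytes` and `load` are hypothetical, with `std::io::Error` standing in for `object_store::Error`.

```rust
use std::fmt;
use std::io;

/// Minimal error enum mirroring the `Error::Process` variant.
#[derive(Debug, PartialEq)]
pub enum Error {
    Process(String),
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::Process(msg) => write!(f, "{}", msg),
        }
    }
}

/// Hypothetical stand-in for fetching an object's bytes.
pub fn read_bytes(available: bool) -> Result<Vec<u8>, io::Error> {
    if available {
        Ok(vec![1, 2, 3])
    } else {
        Err(io::Error::new(io::ErrorKind::Other, "boom"))
    }
}

/// Converts the lower-level error explicitly and adds context,
/// instead of relying on a blanket `From` impl.
pub fn load(available: bool) -> Result<Vec<u8>, Error> {
    let bytes = read_bytes(available)
        .map_err(|e| Error::Process(format!("failed to read object: {}", e)))?;
    Ok(bytes)
}
```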

Comment on lines +97 to +100
// Register mock components
let _ = arkflow_core::input::register_input_builder("mock", Arc::new(MockInputBuilder));
let _ = arkflow_core::output::register_output_builder("mock", Arc::new(MockOutputBuilder));


🛠️ Refactor suggestion

Don’t ignore registration errors; only tolerate “already registered”.

Swallowing all errors can hide real failures.

-    // Register mock components
-    let _ = arkflow_core::input::register_input_builder("mock", Arc::new(MockInputBuilder));
-    let _ = arkflow_core::output::register_output_builder("mock", Arc::new(MockOutputBuilder));
+    // Register mock components (idempotent)
+    register_mock_components();

Add once per file:

fn register_mock_components() {
    if let Err(e) = arkflow_core::input::register_input_builder("mock", Arc::new(MockInputBuilder)) {
        if !format!("{e}").contains("already registered") {
            panic!("register_input_builder failed: {e}");
        }
    }
    if let Err(e) = arkflow_core::output::register_output_builder("mock", Arc::new(MockOutputBuilder)) {
        if !format!("{e}").contains("already registered") {
            panic!("register_output_builder failed: {e}");
        }
    }
}
🤖 Prompt for AI Agents
In crates/arkflow-core/tests/comprehensive_state_test.rs around lines 97 to 100,
replace the current blind ignores of
register_input_builder/register_output_builder with a small helper function
(defined once in the file) named register_mock_components that calls both
registration functions, checks their Result, and panics on any Err unless the
error string contains "already registered"; then call register_mock_components()
where the registrations currently occur so real registration failures are not
swallowed.
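
The tolerate-only-duplicates idea can be sketched against a mock registry. `Registry` and `register_idempotent` are hypothetical, and the "already registered" wording of the error is an assumption about arkflow-core's message that should be checked against the real registration functions.

```rust
use std::collections::HashSet;

/// Hypothetical registry that rejects empty and duplicate names.
#[derive(Default)]
pub struct Registry {
    names: HashSet<String>,
}

impl Registry {
    pub fn register(&mut self, name: &str) -> Result<(), String> {
        if name.is_empty() {
            return Err("empty builder name".to_string());
        }
        if !self.names.insert(name.to_string()) {
            return Err(format!("builder {} already registered", name));
        }
        Ok(())
    }
}

/// Treats a duplicate registration as success and surfaces
/// every other error to the caller.
pub fn register_idempotent(reg: &mut Registry, name: &str) -> Result<(), String> {
    match reg.register(name) {
        Err(e) if e.contains("already registered") => Ok(()),
        other => other,
    }
}
```

Matching on the message text is brittle; if the registry exposed a dedicated error variant for duplicates, matching on that variant would be the sturdier choice.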

Comment on lines +31 to +38
fs::create_dir_all(temp_dir).unwrap();
let input_path = format!("{}/test_input.txt", temp_dir);

fs::write(
&input_path,
"Hello World\nThis is a test\nState management test\n",
)
.unwrap();

⚠️ Potential issue

Missing std::fs import causes compile error.

fs::create_dir_all, fs::write, and fs::remove_dir_all require use std::fs;.

 use arkflow_core::stream::StreamConfig;
-use tokio::time::{sleep, Duration};
+use tokio::time::{sleep, Duration};
+use std::fs;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fs::create_dir_all(temp_dir).unwrap();
let input_path = format!("{}/test_input.txt", temp_dir);
fs::write(
    &input_path,
    "Hello World\nThis is a test\nState management test\n",
)
.unwrap();

// At the top of crates/arkflow-core/tests/state_management_integration.rs
use arkflow_core::stream::StreamConfig;
use tokio::time::{sleep, Duration};
use std::fs;
🤖 Prompt for AI Agents
In crates/arkflow-core/tests/state_management_integration.rs around lines 31 to
38, the test uses fs::create_dir_all, fs::write, and fs::remove_dir_all but
never imports std::fs; add a top-of-file import "use std::fs;" (or prefix calls
with std::fs::) so the filesystem functions resolve and the test compiles.
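A minimal, dependency-free sketch of the fixture handling this test relies on, with the `std::fs` import in place. The function name `write_and_read_fixture` and the idea of reading the file back are illustrative assumptions, not part of the PR.

```rust
use std::fs;
use std::io;
use std::path::Path;

// Creates `dir`, writes a three-line text fixture into it, reads the
// fixture back, then removes the directory again.
fn write_and_read_fixture(dir: &Path) -> io::Result<String> {
    fs::create_dir_all(dir)?;
    let input_path = dir.join("test_input.txt");
    fs::write(
        &input_path,
        "Hello World\nThis is a test\nState management test\n",
    )?;
    let contents = fs::read_to_string(&input_path)?;
    fs::remove_dir_all(dir)?;
    Ok(contents)
}
```

Returning `io::Result` instead of calling `.unwrap()` lets the calling test decide how to report a filesystem failure.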

Comment on lines +39 to +44
input: InputConfig {
    r#type: "file".to_string(),
    path: Some("./examples/data/test.txt".to_string()),
    format: Some("text".to_string()),
    ..Default::default()
},

💡 Verification agent

🧩 Analysis chain

Potential API mismatch: InputConfig fields likely invalid

This example uses r#type, path, and format, whereas other examples use input_type and config (serde_json). This may not compile if InputConfig doesn’t expose these fields.

Run to verify current InputConfig shape and update the example accordingly:


🏁 Script executed:

#!/bin/bash
rg -nP --type=rust -C3 'struct\s+InputConfig\b'
rg -nP --type=rust -C2 '\bInputConfig\s*\{'
rg -nP --type=rust -C2 '\binput_type\b|\br#?type\b|\bpath\b|\bformat\b|\bconfig\b'

Length of output: 263655


Match InputConfig fields to core definition
Replace r#type with input_type, set name explicitly, and move path and format into the config field, per arkflow-core’s InputConfig (in crates/arkflow-core/src/input/mod.rs), for example in examples/test_state_management.rs:39–44:

- input: InputConfig {
-     r#type: "file".to_string(),
-     path: Some("./examples/data/test.txt".to_string()),
-     format: Some("text".to_string()),
-     ..Default::default()
- },
+ input: InputConfig {
+     input_type: "file".to_string(),
+     name: None,
+     config: Some(serde_json::json!({
+         "path": "./examples/data/test.txt",
+         "format": "text"
+     })),
+     ..Default::default()
+ },
📝 Committable suggestion

Suggested change
input: InputConfig {
    r#type: "file".to_string(),
    path: Some("./examples/data/test.txt".to_string()),
    format: Some("text".to_string()),
    ..Default::default()
},

input: InputConfig {
    input_type: "file".to_string(),
    name: None,
    config: Some(serde_json::json!({
        "path": "./examples/data/test.txt",
        "format": "text"
    })),
    ..Default::default()
},
🤖 Prompt for AI Agents
In examples/test_state_management.rs around lines 39 to 44, the InputConfig
struct literal uses outdated field names (r#type, path, format); update it to
match arkflow-core's InputConfig by replacing r#type with input_type, setting
name to None, and moving path and format into the config field (e.g., a
serde_json::json! object with "path" and "format" keys), and keep the rest of
the defaults via ..Default::default().

Comment on lines +157 to +159
for handle in handles {
    handle.await?;
}

⚠️ Potential issue

JoinHandle error not convertible to arkflow_core::Error

handle.await? requires From<JoinError> for Error; likely missing. Map join errors explicitly.

-    for handle in handles {
-        handle.await?;
-    }
+    for handle in handles {
+        if let Err(e) = handle.await {
+            return Err(Error::Process(format!("Stream task join failed: {}", e)));
+        }
+    }
🤖 Prompt for AI Agents
In examples/test_state_management.rs around lines 157 to 159, awaiting the
JoinHandle uses the `?` operator but `tokio::task::JoinError` does not
automatically convert into `arkflow_core::Error`; change to explicitly map the
join error into an arkflow_core::Error (for example using `.await.map_err(|e|
arkflow_core::Error::... )?`) or match the result and return/convert a
descriptive arkflow_core::Error variant so the compiler has a concrete
conversion path.
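The explicit-mapping pattern can be shown without tokio. The sketch below swaps in `std::thread::JoinHandle` for `tokio::task::JoinHandle` to stay dependency-free, and its `Error` enum is a hypothetical stand-in for `arkflow_core::Error`. It also assumes each task returns a `Result`, which the example in the PR may or may not do.

```rust
use std::thread;

// Hypothetical stand-in for arkflow_core::Error.
#[derive(Debug)]
enum Error {
    Process(String),
}

// Joins every handle, converting a join failure (a panicked task) into a
// domain error explicitly instead of relying on a `From` conversion.
fn join_all(handles: Vec<thread::JoinHandle<Result<(), Error>>>) -> Result<(), Error> {
    for handle in handles {
        match handle.join() {
            // The task finished; propagate its own result.
            Ok(task_result) => task_result?,
            // The task panicked; map the join failure by hand.
            Err(_) => return Err(Error::Process("stream task panicked".to_string())),
        }
    }
    Ok(())
}
```

With tokio the shape is the same: `handle.await` yields `Result<T, JoinError>`, so the outer error still needs a `map_err` or a `match` before `?` can apply to the inner result.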

Comment on lines +45 to +49
output: OutputConfig {
    output_type: "stdout".to_string(),
    name: None,
    config: None,
},

⚠️ Potential issue

OutputConfig field name likely mismatched (output_type vs r#type).

For consistency with the other example and likely with the current type, prefer r#type and default-initialize the rest.

Apply:

             output: OutputConfig {
-                output_type: "stdout".to_string(),
-                name: None,
-                config: None,
+                r#type: "stdout".to_string(),
+                ..Default::default()
             },
📝 Committable suggestion

Suggested change
output: OutputConfig {
    output_type: "stdout".to_string(),
    name: None,
    config: None,
},

output: OutputConfig {
    r#type: "stdout".to_string(),
    ..Default::default()
},
🤖 Prompt for AI Agents
In examples/verify_state_integration.rs around lines 45 to 49 the OutputConfig
struct uses output_type which likely mismatches the struct field r#type used
elsewhere; change the field name to r#type and set the other fields to default
(e.g., name: None, config: None) or use Default::default() for the whole
OutputConfig as appropriate so the field names align with the type definition
and other examples.
