feat: new #720
Walkthrough
Adds a comprehensive state-management subsystem to arkflow-core: per-batch metadata and transaction context, in-memory and S3-backed state stores with checkpoint coordination (2PC), performance optimizations, Prometheus monitoring, EngineBuilder/stream integration, examples, tests, and Cargo dependency updates. Several new public types/functions are introduced; no breaking removals.
Sequence Diagram(s)
sequenceDiagram
autonumber
actor Source
participant Proc as Processor
participant EO as ExactlyOnceProcessor
participant ESM as EnhancedStateManager
participant TC as TransactionCoordinator
participant Out as Output (2PC)
Note over Source,Proc: Incoming MessageBatch
Source->>Proc: send(batch)
Proc->>EO: process(batch)
EO->>ESM: read/write operator state
alt batch contains TransactionContext
EO->>TC: register / prepare checkpoint
TC-->>EO: transaction context
end
EO-->>Proc: processed batches
Proc->>Out: write(batch)
Out->>ESM: prepare (log state) / commit via 2PC
Out-->>Proc: ack
sequenceDiagram
autonumber
participant ESM as EnhancedStateManager
participant PB as OptimizedS3Backend / S3StateBackend
participant OS as ObjectStore (S3)
participant Mon as StateMonitor
ESM->>Mon: record checkpoint start
ESM->>PB: persist operator states (batched/compressed maybe)
PB->>OS: PUT objects (checkpoint, states, metadata)
OS-->>PB: OK
PB-->>ESM: persistence ACK
ESM->>Mon: record checkpoint success / metrics
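The prepare/commit flow in the first diagram can be sketched in isolation. This is a minimal, std-only illustration of two-phase commit under stated assumptions: `Participant`, `Vote`, `MemorySink`, and `run_two_phase_commit` are hypothetical names introduced here and are not the PR's `TransactionCoordinator` or `TwoPhaseCommitOutput` API.

```rust
use std::collections::HashMap;

/// Vote returned by a participant in the prepare phase.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Vote {
    Yes,
    No,
}

/// Hypothetical participant interface (not the PR's actual trait).
pub trait Participant {
    fn prepare(&mut self, checkpoint_id: u64) -> Vote;
    fn commit(&mut self, checkpoint_id: u64);
    fn abort(&mut self, checkpoint_id: u64);
}

/// Runs one 2PC round and returns `true` if the checkpoint was committed.
pub fn run_two_phase_commit<P: Participant>(participants: &mut [P], checkpoint_id: u64) -> bool {
    // Phase 1: ask every participant to stage its work and vote.
    let votes: Vec<Vote> = participants
        .iter_mut()
        .map(|p| p.prepare(checkpoint_id))
        .collect();
    let all_yes = votes.iter().all(|v| *v == Vote::Yes);

    // Phase 2: commit everywhere, or abort everywhere.
    for p in participants.iter_mut() {
        if all_yes {
            p.commit(checkpoint_id);
        } else {
            p.abort(checkpoint_id);
        }
    }
    all_yes
}

/// In-memory sink used only to exercise the protocol.
#[derive(Default)]
pub struct MemorySink {
    pub pending: Vec<String>,
    pub staged: HashMap<u64, Vec<String>>,
    pub committed: Vec<String>,
    pub fail_prepare: bool,
}

impl Participant for MemorySink {
    fn prepare(&mut self, checkpoint_id: u64) -> Vote {
        if self.fail_prepare {
            return Vote::No;
        }
        // Stage pending records under the checkpoint ID.
        let batch = std::mem::take(&mut self.pending);
        self.staged.insert(checkpoint_id, batch);
        Vote::Yes
    }

    fn commit(&mut self, checkpoint_id: u64) {
        if let Some(batch) = self.staged.remove(&checkpoint_id) {
            self.committed.extend(batch);
        }
    }

    fn abort(&mut self, checkpoint_id: u64) {
        // Return staged records to the front of `pending` so they can be retried.
        if let Some(mut batch) = self.staged.remove(&checkpoint_id) {
            batch.append(&mut self.pending);
            self.pending = batch;
        }
    }
}
```

The sketch collects every vote before deciding, so each participant sees exactly one `prepare` followed by exactly one `commit` or `abort` per round; a real coordinator would also need a durable log and timeouts, which are omitted here.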
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45–75 minutes
Actionable comments posted: 28
🧹 Nitpick comments (52)
README-STATE-MANAGEMENT.md (4)
46-53: Avoid overpromising “exactly-once” until Phase 2 is complete or clearly marked experimental
This section states hard guarantees, but docs/state-management-implementation-summary.md says Phase 2 (S3 + exactly-once) is only “partially complete.” Rephrase as “experimental/preview” or add a status note to avoid inconsistency across docs.
135-139: Pinning dependency version may be inaccurate for this repo; prefer workspace or verify crate version
If this README targets users of the published crate, verify the current version. If it targets contributors, prefer a workspace path to avoid version drift.
    -[dependencies]
    -arkflow-core = "0.4"
    +[dependencies]
    +# If using this repository directly:
    +# arkflow-core = { path = "crates/arkflow-core" }
    +# If using from crates.io, replace with the current released version:
    +# arkflow-core = "X.Y"
158-160: `MessageBatch::from_string` is inconsistent with the `new_binary` constructor used elsewhere
Other examples in this PR use `MessageBatch::new_binary`. Consider updating for consistency.
    - let batch = MessageBatch::from_string("hello world")?;
    + let batch = MessageBatch::new_binary(vec![b"hello world".to_vec()])?;
193-197: Benchmark numbers need context
Add hardware, dataset, and methodology, or mark the numbers as indicative, to avoid misleading expectations.
CLAUDE.md (2)
81-99: Quote broker endpoints in YAML
Unquoted `host:port` can be parsed unexpectedly in YAML. Quote the strings.
    - brokers: [localhost:9092]
    + brokers: ["localhost:9092"]
    ...
    - topics: [test-topic]
    + topics: ["test-topic"]
108-113: Clarify that backpressure threshold is configurable
Docs state a fixed threshold (1024). If configurable, note the config key or remove the hardcoded value.
docs/state-management.md (4)
27-29: Use the canonical batch constructor
Align with other examples using binary batches.
    - let batch = MessageBatch::from_string("hello world")?;
    + let batch = MessageBatch::new_binary(vec![b"hello world".to_vec()])?;
44-50: TransactionalOutput example lacks commit/flush semantics
If your `TransactionalOutputWrapper` exposes explicit `prepare`/`commit`/`abort` or implicit commit on checkpoint, mention it to avoid misuse.
87-102: Add missing imports in snippet for completeness
The snippet uses `Arc` and `HashMap` without imports.
    +use std::collections::HashMap;
    +use std::sync::Arc;
165-175: Consistency: avoid importing unused `StatefulExampleProcessor` in WordCount example
The WordCount snippet defines its own processor; drop unrelated imports to reduce confusion.
docs/state-management-implementation-summary.md (2)
21-38: Doc-set consistency: mark “exactly-once” feature status uniformly
Here it’s “部分完成 ✅” (partially complete), while README claims full “Exactly-Once Semantics.” Align wording across docs (e.g., “experimental/preview”) and briefly list known limitations.
156-156: Minor wording polish
Avoid mixing English “solid” in Chinese text. Suggest “扎实”.
    -当前实现已经提供了一个solid的基础
    +当前实现已经提供了一个扎实的基础
examples/stateful_pipeline.rs (5)
150-155: Avoid potential panic on system clock and reduce cast risk
`duration_since` returns an `Err` when the system clock is set before `UNIX_EPOCH`, so the `.unwrap()` can panic. Also make the u128→u64 narrowing explicit instead of truncating with `as`.
    - .unwrap_or_else(|| {
    -     std::time::SystemTime::now()
    -         .duration_since(std::time::UNIX_EPOCH)
    -         .unwrap()
    -         .as_millis() as u64
    - });
    + .unwrap_or_else(|| {
    +     use std::time::{Duration, SystemTime, UNIX_EPOCH};
    +     let dur = SystemTime::now()
    +         .duration_since(UNIX_EPOCH)
    +         .unwrap_or(Duration::ZERO);
    +     u64::try_from(dur.as_millis()).unwrap_or(u64::MAX)
    + });
The standard library has no `saturating_cast` on `u128`; `u64::try_from(..).unwrap_or(u64::MAX)` saturates instead. An equivalent form is `let ms = dur.as_millis(); (ms.min(u128::from(u64::MAX))) as u64`.
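As a standalone check of the saturating conversion discussed here, a small sketch follows. The helper names `saturating_millis` and `now_millis` are hypothetical and exist only for illustration; everything else is standard library.

```rust
use std::convert::TryFrom;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Converts a duration to whole milliseconds, saturating at `u64::MAX`
/// instead of silently truncating the `u128` returned by `as_millis`.
pub fn saturating_millis(d: Duration) -> u64 {
    u64::try_from(d.as_millis()).unwrap_or(u64::MAX)
}

/// Milliseconds since the Unix epoch; falls back to 0 if the system clock
/// is set before the epoch rather than panicking.
pub fn now_millis() -> u64 {
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO);
    saturating_millis(elapsed)
}
```

The explicit `TryFrom` import is only needed on editions before 2021, where the trait is not in the prelude.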
329-329: Suppress unused variable or wire the stream
`stream` is created but not used. Either run it or rename to `_stream` to avoid warnings.
    - let (mut stream, state_manager) = build_stateful_pipeline(config).await?;
    + let (_stream, state_manager) = build_stateful_pipeline(config).await?;
167-193: Reduce lock contention: consolidate state writes per batch
You take a write lock and perform `get`/`set` per message. Consider batching updates in a local map and persisting once per window/key to reduce contention and I/O.
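The batching idea can be illustrated with a minimal sketch. It assumes a plain `std::sync::Mutex<HashMap<String, u64>>` as a stand-in for the shared state (the example in the PR uses an async state manager instead), and `apply_batch` is a hypothetical helper name.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Counts the keys of one batch locally, then merges the deltas into the
/// shared store while holding the lock exactly once.
pub fn apply_batch(store: &Mutex<HashMap<String, u64>>, keys: &[&str]) {
    // Step 1: accumulate per-key deltas without touching the shared store.
    let mut deltas: HashMap<&str, u64> = HashMap::new();
    for key in keys {
        *deltas.entry(*key).or_insert(0) += 1;
    }

    // Step 2: take the lock once and merge all deltas.
    // A poisoned lock is recovered here because the map stays usable.
    let mut guard = store.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    for (key, delta) in deltas {
        *guard.entry(key.to_string()).or_insert(0) += delta;
    }
}
```

With this shape the lock is acquired once per batch rather than once per message, which is the contention reduction the comment is after.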
195-201: One output per input increases overhead
Emitting a result batch per message can be costly. Consider emitting per window or at checkpoint boundaries for throughput.
6-13: Tidy imports
`OperationTimer`, `TwoPhaseCommitOutput`, and possibly `Input`/`Output`/`Uuid` are unused in this example. Consider removing for clarity.
crates/arkflow-core/Cargo.toml (2)
31-38: Gate heavy deps behind optional features to reduce compile time and transitive bloat
Make `object_store`, `prometheus`, and `async-compression` optional, and disable unnecessary default features. This keeps core lightweight for users not using S3/metrics/compression.
    -uuid = { version = "1.10", features = ["v4"] }
    -object_store = { version = "0.12", features = ["aws"] }
    +uuid = { version = "1.10", default-features = false, features = ["v4", "fast-rng"] }
    +object_store = { version = "0.12", features = ["aws"], optional = true }
    -futures-util = { workspace = true }
    -async-compression = { version = "0.4", features = ["tokio", "zstd"] }
    +futures-util = { workspace = true, default-features = false, features = ["alloc"] }
    +async-compression = { version = "0.4", default-features = false, features = ["tokio", "zstd"], optional = true }
     lru = "0.12"
     bytes = "1.7"
    -prometheus = "0.13"
    +prometheus = { version = "0.13", default-features = false, optional = true }
    -parking_lot = "0.12"
    +parking_lot = { version = "0.12", default-features = false }
Add features section (outside this hunk):
    [features]
    default = []
    s3-backend = ["dep:object_store", "dep:async-compression"]
    metrics = ["dep:prometheus"]
30-30: Relax the minimum patch version on num_cpus unless needed
`"1.17.0"` is a caret requirement (`^1.17.0`), not an exact pin, but it still raises the minimum patch version the resolver may pick. Prefer `"1.17"` or a workspace-managed version.
    -num_cpus = "1.17.0"
    +num_cpus = "1.17"
crates/arkflow-core/src/state/integration_tests.rs (3)
26-34: Lower checkpoint interval to speed CI without changing semantics
1s stalls tests under load; 10–50ms is plenty for unit paths.
    - checkpoint_interval_ms: 1000,
    + checkpoint_interval_ms: 25,
39-47: Remove redundant reference on string literal
Passing `&"counter"` produces an extra reference; pass `&str` directly.
    - .set_state_value("test_op", &"counter", 42u64)
    + .set_state_value("test_op", "counter", 42u64)
49-57: Assert checkpoint ID monotonicity for stronger signal
Also validate that IDs are positive and increasing across two invocations.
    - let checkpoint_id = state_manager.create_checkpoint().await.unwrap();
    - assert!(checkpoint_id > 0);
    + let c1 = state_manager.create_checkpoint().await.unwrap();
    + let c2 = state_manager.create_checkpoint().await.unwrap();
    + assert!(c1 > 0 && c2 > c1);
crates/arkflow-core/src/state/tests.rs (2)
36-39: Strengthen metadata round-trip assertion
Assert content equality, not only presence. If `PartialEq` is implemented, compare; otherwise, check expected fields.
    - let extracted_metadata = batch_with_metadata.metadata();
    - assert!(extracted_metadata.is_some());
    + let extracted = batch_with_metadata.metadata().expect("metadata missing");
    + // Example if PartialEq:
    + // assert_eq!(extracted, metadata);
    + // Or minimally ensure defaults:
    + assert!(extracted.transaction.is_none());
65-88: Metrics assertions are fine; avoid brittle substring if possible
If you export a registry, consider asserting presence of a known metric name rather than generic "arkflow_state".
crates/arkflow-core/src/lib.rs (2)
79-81: Consider feature-gating the new `ObjectStore` error variant
If S3/object store is optional, gate this variant to avoid forcing the dep for users not using state backends.
    - #[error("Object store error: {0}")]
    - ObjectStore(String),
    + #[cfg(feature = "s3-backend")]
    + #[error("Object store error: {0}")]
    + ObjectStore(String),
161-171: Gate external error conversions and keep mapping consistent
Avoid unconditional deps on `object_store`/`prometheus`; map Prometheus errors to a dedicated variant or keep as Process but gate it.
    -impl From<object_store::Error> for Error {
    +#[cfg(feature = "s3-backend")]
    +impl From<object_store::Error> for Error {
         fn from(err: object_store::Error) -> Self {
             Error::ObjectStore(format!("{}", err))
         }
     }

    -impl From<prometheus::Error> for Error {
    +#[cfg(feature = "metrics")]
    +impl From<prometheus::Error> for Error {
         fn from(err: prometheus::Error) -> Self {
             Error::Process(format!("Prometheus error: {}", err))
         }
     }
docs/state-management-guide.md (4)
194-203: Align logging guidance with crate’s tracing stack
Project uses `tracing`/`tracing-subscriber`; prefer that over `env_logger` to avoid mixed patterns.
    - env_logger::Builder::from_default_env()
    -     .filter_level(log::LevelFilter::Debug)
    -     .init();
    + tracing_subscriber::fmt()
    +     .with_max_level(tracing::Level::DEBUG)
    +     .with_target(false)
    +     .init();
37-50: Doc snippet stores a custom struct; note the Serialize/Deserialize bound
If `SessionInfo` lacks serde derives, the example won’t compile. Add a brief note.
13-24: Minor style nits in bulleted lists
LanguageTool flags are likely due to inconsistent punctuation after bold terms. Consider adding colons/periods consistently.
136-148: Align S3 configuration documentation with the actual `S3StateBackendConfig`
The example YAML in docs/state-management-guide.md (lines 136–148) currently shows only four S3 settings (bucket, region, prefix, use_ssl), but the Rust struct includes three additional optional fields. Update the docs so that users see every configurable key and its default behavior to avoid runtime errors.
Locations to update:
- docs/state-management-guide.md (around lines 136–148)
Missing fields to document:
- `endpoint` (Option)
- `access_key_id` (Option)
- `secret_access_key` (Option)
Suggested diff for the YAML snippet:
     state:
       enabled: true
       backend_type: S3
       checkpoint_interval_ms: 60000
       retained_checkpoints: 10
       exactly_once: true
       s3_config:
         bucket: "my-app-state"
         region: "us-east-1"
         prefix: "prod/checkpoints"
    +    endpoint: "https://s3.custom-endpoint.com" # optional
    +    access_key_id: "YOUR_ACCESS_KEY" # optional
    +    secret_access_key: "YOUR_SECRET_KEY" # optional
         use_ssl: true
crates/arkflow-core/src/state/helper.rs (1)
98-101: Prefer iterator return to avoid allocating a Vec<&str>.
Returning `Vec<&str>` allocates on every call while still borrowing from `self`. Expose an iterator instead (zero alloc; it borrows from `self` in the same way), or return owned keys if you want callers decoupled from `self`’s lifetime.
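A self-contained sketch of the iterator-returning accessor, assuming a simplified `StateMap` wrapper around `HashMap<String, Vec<u8>>` (a hypothetical stand-in; the real helper type in this PR may hold different data):

```rust
use std::collections::HashMap;

/// Simplified stand-in for the state helper; only the key handling matters here.
pub struct StateMap {
    data: HashMap<String, Vec<u8>>,
}

impl StateMap {
    pub fn new() -> Self {
        StateMap { data: HashMap::new() }
    }

    pub fn put(&mut self, key: &str, value: Vec<u8>) {
        self.data.insert(key.to_string(), value);
    }

    /// Borrows the keys lazily; no intermediate `Vec` is allocated.
    pub fn keys(&self) -> impl Iterator<Item = &str> {
        self.data.keys().map(|s| s.as_str())
    }
}
```

Callers that need a `Vec` can still write `map.keys().collect::<Vec<_>>()`, so the allocation becomes opt-in.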
    - pub fn keys(&self) -> Vec<&str> {
    -     self.data.keys().map(|s| s.as_str()).collect()
    - }
    + pub fn keys(&self) -> impl Iterator<Item = &str> {
    +     self.data.keys().map(|s| s.as_str())
    + }
examples/session_window.rs (3)
171-193: Keep active session count consistent with the tracked list.
It’s never incremented on new sessions and only decremented on cleanup. Set it from the list length after updates.
     if !active_sessions.contains(&event.session_id) {
         active_sessions.push(event.session_id.clone());
         state_manager
             .set_state_value(&self.operator_id, &"active_session_list", active_sessions)
             .await?;
    +    // Keep a simple derived counter in sync
    +    let list: Vec<String> = state_manager
    +        .get_state_value(&self.operator_id, "active_session_list")
    +        .await?
    +        .unwrap_or_default();
    +    state_manager
    +        .set_state_value(&self.operator_id, "active_sessions", list.len())
    +        .await?;
     }
And in cleanup, prefer deriving from the list instead of subtracting:
    - let count = state_manager
    -     .get_state_value::<usize>(&self.operator_id, &"active_sessions")
    -     .await?
    -     .unwrap_or(0)
    -     .saturating_sub(expired.len());
    -
    - state_manager
    -     .set_state_value(&self.operator_id, &"active_sessions", count)
    -     .await?;
    + let list: Vec<String> = state_manager
    +     .get_state_value(&self.operator_id, "active_session_list")
    +     .await?
    +     .unwrap_or_default();
    + state_manager
    +     .set_state_value(&self.operator_id, "active_sessions", list.len())
    +     .await?;
Also applies to: 243-253
201-206: Remove unused variable.
`current_time` is computed but not used.
    - let current_time = SystemTime::now()
    -     .duration_since(UNIX_EPOCH)
    -     .unwrap()
    -     .as_millis() as u64;
9-14: Clean up unused imports.
`VecDeque` and `Instant` aren’t used.
    -use std::collections::{HashMap, VecDeque};
    +use std::collections::HashMap;
    @@
    -use tokio::time::{sleep, Instant};
    +use tokio::time::sleep;
examples/word_count.rs (1)
31-41: Make counts map type explicit to u64 for clarity.
Avoid defaulting to i32.
    - let mut word_counts = HashMap::new();
    + let mut word_counts: HashMap<String, u64> = HashMap::new();
crates/arkflow-core/src/state/mod.rs (1)
70-77: Deduplicate magic string key for metadata.
Use a constant to avoid typos across extract/embed.
    +const METADATA_FIELD: &str = "__arkflow_metadata__";
    @@
    - .get("__arkflow_metadata__")
    + .get(METADATA_FIELD)
    @@
    - metadata.insert("__arkflow_metadata__".to_string(), metadata_json);
    + metadata.insert(METADATA_FIELD.to_string(), metadata_json);
Also applies to: 84-89
crates/arkflow-core/src/state/example.rs (4)
crates/arkflow-core/src/state/example.rs (4)
51-61: Count even when input_name is absent.Default to a placeholder to make the example produce a visible count.
- if let Some(input_name) = batch.get_input_name() { - let key = format!("count_{}", input_name); + { + let input_name = batch.get_input_name().unwrap_or("unknown"); + let key = format!("count_{}", input_name);
74-82: Unused parameter and unconstrained generics in example.Prefix key_column with underscore to silence warnings. Consider removing K,V generics since they’re not used.
- key_column: &str, + _key_column: &str,
170-187: Avoid #[tokio::main] inside library code.Define example_usage as async fn and call it from a proper example/bin or a test.
-#[tokio::main] -async fn example_usage() -> Result<(), Error> { +pub async fn example_usage() -> Result<(), Error> {
129-138: Consolidate StateBackendType into a single definitionThe
StateBackendTypeenum is currently defined twice undercrates/arkflow-core/src/state:
enhanced.rs(around line 93) – this should be the canonical, crate-wide definitionexample.rs(lines 129–138) – a duplicate that risks drifting out of syncTo avoid maintenance headaches and ensure a single source of truth, remove the local enum in
example.rsand import the shared definition instead.• File:
crates/arkflow-core/src/state/example.rs
– Remove lines 129–138 (the duplicatepub enum StateBackendType { … })
– Add at the top of the file:use crate::state::StateBackendType;Example diff:
--- a/crates/arkflow-core/src/state/example.rs +++ b/crates/arkflow-core/src/state/example.rs @@ -1,6 +1,4 @@ -/// State backend types -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub enum StateBackendType { - Memory, - FileSystem, - S3, -} +use crate::state::StateBackendType;crates/arkflow-core/src/state/simple.rs (1)
144-148: Consider using Mutex instead of RwLock for write-heavy operations
The `last_injection` field is only ever written to (via `write().await`) and read once. Since there's no concurrent reading pattern that would benefit from `RwLock`, using `Mutex` would be more efficient.
     pub struct SimpleBarrierInjector {
         interval: std::time::Duration,
    -    last_injection: Arc<tokio::sync::RwLock<std::time::Instant>>,
    +    last_injection: Arc<tokio::sync::Mutex<std::time::Instant>>,
         next_checkpoint_id: Arc<AtomicU64>,
     }
crates/arkflow-core/src/state/performance.rs (2)
169-172: Unnecessary complexity with NonZeroUsize fallback
The nested `unwrap_or(NonZeroUsize::new(1000).unwrap())` is hard to read and evaluates the fallback eagerly. Hoist it into a named binding; ideally the configuration should ensure valid values upfront.
    -let local_cache = Arc::new(RwLock::new(LruCache::new(
    -    NonZeroUsize::new(perf_config.local_cache_size)
    -        .unwrap_or(NonZeroUsize::new(1000).unwrap()),
    -)));
    +let cache_size = NonZeroUsize::new(perf_config.local_cache_size)
    +    .unwrap_or_else(|| NonZeroUsize::new(1000).unwrap());
    +let local_cache = Arc::new(RwLock::new(LruCache::new(cache_size)));
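One way to remove the runtime `unwrap()` entirely is a compile-time constant for the fallback. This is a sketch under assumptions: `DEFAULT_CACHE_CAPACITY` and `cache_capacity` are hypothetical names, and the default of 1000 is taken from the snippet above.

```rust
use std::num::NonZeroUsize;

/// Fallback capacity, checked at compile time so no runtime `unwrap()` is needed.
const DEFAULT_CACHE_CAPACITY: NonZeroUsize = match NonZeroUsize::new(1000) {
    Some(n) => n,
    None => panic!("default cache capacity must be non-zero"),
};

/// Returns the configured capacity, or the default when the config value is 0.
pub fn cache_capacity(configured: usize) -> NonZeroUsize {
    NonZeroUsize::new(configured).unwrap_or(DEFAULT_CACHE_CAPACITY)
}
```

The resulting value can be passed straight to `LruCache::new`, which takes a `NonZeroUsize` in the snippet under review.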
413-414: TODO comments indicate incomplete implementation
The performance statistics tracking for cache hits and misses is not implemented, which could make it difficult to monitor cache effectiveness.
Would you like me to generate an implementation that properly tracks these metrics or create an issue to track this task?
crates/arkflow-core/src/state/s3_backend.rs (1)
129-149: Inefficient checkpoint ID extraction from filenames
The current implementation parses checkpoint IDs from filenames using string manipulation and sorting. This could be optimized and made more robust.
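The parsing step can be exercised in isolation. A minimal sketch, assuming file names of the form `chk-<digits>` as in the snippet under review; `parse_checkpoint_id` and `sorted_checkpoint_ids` are hypothetical helper names, not functions from this PR.

```rust
/// Extracts the numeric ID from a `chk-<digits>` file name.
/// Zero-padded IDs are handled by `parse::<u64>` itself; the digit check
/// only rejects inputs such as `chk-+5` that `parse` would otherwise accept.
pub fn parse_checkpoint_id(file_name: &str) -> Option<u64> {
    let digits = file_name.strip_prefix("chk-")?;
    if digits.is_empty() || !digits.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    // Returns `None` on overflow instead of panicking.
    digits.parse::<u64>().ok()
}

/// Collects all valid checkpoint IDs from a listing, in ascending order.
pub fn sorted_checkpoint_ids(file_names: &[&str]) -> Vec<u64> {
    let mut ids: Vec<u64> = file_names
        .iter()
        .filter_map(|name| parse_checkpoint_id(name))
        .collect();
    ids.sort_unstable();
    ids
}
```

Invalid entries are skipped silently in this sketch; logging them may be preferable in the real backend.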
Consider pre-allocating and rejecting malformed names explicitly. Note that `parse::<u64>` already accepts zero-padded digits, so no trimming is needed; trimming leading zeros first would turn an all-zero name into an empty string and drop checkpoint 0.
     async fn list_checkpoints(&self) -> Result<Vec<u64>, Error> {
    -    let mut checkpoints = Vec::new();
    +    let mut checkpoints = Vec::with_capacity(100); // Pre-allocate for typical case

         // List objects in checkpoint directory
         let mut stream = self.client.list(Some(&self.checkpoint_base_path));
         while let Some(object) = stream.try_next().await? {
             // Extract checkpoint ID from path
             if let Some(name) = object.location.filename() {
    -            if let Some(rest) = name.strip_prefix("chk-") {
    -                if let Ok(id) = rest.parse::<u64>() {
    -                    checkpoints.push(id);
    -                }
    -            }
    +            if let Some(id_str) = name.strip_prefix("chk-") {
    +                // Accept digits only (zero-padding included); skip invalid entries
    +                if !id_str.is_empty() && id_str.bytes().all(|b| b.is_ascii_digit()) {
    +                    if let Ok(id) = id_str.parse::<u64>() {
    +                        checkpoints.push(id);
    +                    }
    +                }
    +            }
             }
         }
crates/arkflow-core/src/state/enhanced.rs (3)
344-355: Redundant state manager locking in processing loop
The state manager is locked twice in quick succession (lines 338 and 351). If the first write guard is still held at that point, as the surrounding context suggests, the second `write().await` cannot be granted and the task would deadlock rather than merely contend.
Consider keeping the lock for the entire operation:
     pub async fn process(&self, batch: MessageBatch) -> Result<Vec<MessageBatch>, Error>
     where
         P: crate::processor::Processor,
     {
         // Let state manager handle barriers and transactions
         let mut state_manager = self.state_manager.write().await;
         let processed_batches = state_manager.process_batch(batch).await?;

         // Apply actual processing
         let mut results = Vec::new();
         for batch in processed_batches {
             // Process with inner processor
             let inner_results = self.inner.process(batch.clone()).await?;

             // Update state if needed
             if let Some(tx_ctx) = batch.transaction_context() {
                 // Example: Update processed count
                 let state_key = format!("processed_count_{}", tx_ctx.checkpoint_id);
    -            let mut state_manager = self.state_manager.write().await;
                 state_manager
                     .set_state_value(&self.operator_id, &state_key, batch.len())
                     .await?;
             }

             results.extend(inner_results);
         }

         Ok(results)
     }
391-392: Unused field in TwoPhaseCommitOutput
The `pending_transactions` field is declared but never used in any of the methods.
Either implement pending transaction tracking or remove the unused field:
     pub struct TwoPhaseCommitOutput<O> {
         inner: O,
         state_manager: Arc<tokio::sync::RwLock<EnhancedStateManager>>,
         transaction_log: Arc<RwLock<Vec<TransactionLogEntry>>>,
    -    pending_transactions: HashMap<String, Vec<MessageBatch>>,
     }
183-209: Incomplete checkpoint coordinator implementation
The comment block (lines 188-192) indicates this is a simplified implementation: it only saves local states and does not handle multi-operator coordination.
Would you like me to help implement proper checkpoint coordination with participant registration and a two-phase commit protocol?
crates/arkflow-core/src/state/monitoring.rs (5)
13-13: Import Encoder trait if switching to encode().
Apply this diff:
    -use prometheus::{Counter, Gauge, Histogram, HistogramOpts, Opts, Registry, TextEncoder};
    +use prometheus::{Counter, Gauge, Histogram, HistogramOpts, Opts, Registry, TextEncoder, Encoder};
455-464: Compute error_rate instead of placeholder.
Provides actionable health output with existing counters.
Apply this diff:
     pub fn health_status(&self) -> HealthStatus {
    -    // This would typically query current metrics and calculate health
    -    HealthStatus {
    -        healthy: true, // Simplified for now
    -        state_size: self.metrics.state_size_bytes.get() as u64,
    -        active_transactions: self.metrics.active_transactions.get() as usize,
    -        last_checkpoint: None, // Would need to track this
    -        error_rate: 0.0, // Would need to calculate from counters
    -    }
    +    let total = self.metrics.operations_total.get();
    +    let failed = self.metrics.operations_failed_total.get();
    +    let error_rate = if total > 0.0 { failed / total } else { 0.0 };
    +    HealthStatus {
    +        healthy: error_rate <= self.alert_config.error_rate_threshold
    +            && (self.metrics.state_size_bytes.get() as u64) <= self.alert_config.state_size_threshold,
    +        state_size: self.metrics.state_size_bytes.get() as u64,
    +        active_transactions: self.metrics.active_transactions.get() as usize,
    +        last_checkpoint: None,
    +        error_rate,
    +    }
     }
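The health computation can be checked without a Prometheus registry. A minimal sketch, assuming the counters are read as `f64` totals as in the diff above; `AlertThresholds`, `error_rate`, and `is_healthy` are hypothetical names that only mirror the two thresholds used there.

```rust
/// Hypothetical thresholds mirroring the two alert settings used above.
pub struct AlertThresholds {
    pub error_rate_threshold: f64,
    pub state_size_threshold: u64,
}

/// Fraction of failed operations, or 0.0 when nothing has run yet.
/// The result is clamped to [0, 1] in case the counters are read at
/// slightly different moments.
pub fn error_rate(total: f64, failed: f64) -> f64 {
    if total > 0.0 {
        (failed / total).clamp(0.0, 1.0)
    } else {
        0.0
    }
}

/// Healthy when both the error rate and the state size are within bounds.
pub fn is_healthy(total: f64, failed: f64, state_size: u64, t: &AlertThresholds) -> bool {
    error_rate(total, failed) <= t.error_rate_threshold && state_size <= t.state_size_threshold
}
```

Guarding the division keeps a freshly started monitor from reporting `NaN` as its error rate.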
444-452: Cache metrics API is split (record_cache_hit/miss vs update_cache_metrics). Consider consolidating.
To avoid double-counting across paths, expose one method (incremental or absolute) and document semantics.
Would you like a small wrapper that accepts an enum {Hit, Miss} and size Option to unify updates?
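A possible shape for such a wrapper, sketched with plain atomics rather than the Prometheus types used in `monitoring.rs`. `CacheEvent` and `CacheStats` are hypothetical names, and the choice of incremental counts plus an absolute size is an assumption about the intended semantics.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// The two cache outcomes a caller can report.
pub enum CacheEvent {
    Hit,
    Miss,
}

/// Single entry point for cache metrics: counts are incremental,
/// the size (when provided) is an absolute value that overwrites the last one.
#[derive(Default)]
pub struct CacheStats {
    hits: AtomicU64,
    misses: AtomicU64,
    size_bytes: AtomicU64,
}

impl CacheStats {
    pub fn record(&self, event: CacheEvent, size_bytes: Option<u64>) {
        match event {
            CacheEvent::Hit => self.hits.fetch_add(1, Ordering::Relaxed),
            CacheEvent::Miss => self.misses.fetch_add(1, Ordering::Relaxed),
        };
        if let Some(size) = size_bytes {
            self.size_bytes.store(size, Ordering::Relaxed);
        }
    }

    pub fn hits(&self) -> u64 {
        self.hits.load(Ordering::Relaxed)
    }

    pub fn misses(&self) -> u64 {
        self.misses.load(Ordering::Relaxed)
    }

    pub fn size_bytes(&self) -> u64 {
        self.size_bytes.load(Ordering::Relaxed)
    }

    /// Hits divided by total lookups; 0.0 before the first lookup.
    pub fn hit_ratio(&self) -> f64 {
        let total = self.hits() + self.misses();
        if total == 0 {
            0.0
        } else {
            self.hits() as f64 / total as f64
        }
    }
}
```

Because every update goes through `record`, there is only one place where a hit or miss can be counted, which addresses the double-counting concern.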
478-483: backend_type is written but unused.
Either use it for conditional metrics or remove it to avoid dead code warnings.
Apply this diff to silence temporarily if you plan to use it soon:
     pub struct MonitoredStateManager {
         inner: EnhancedStateManager,
         monitor: Arc<StateMonitor>,
    -    backend_type: StateBackendType,
    +    #[allow(dead_code)]
    +    backend_type: StateBackendType,
     }
389-396: Export path: consider Registry exposure vs. one global registry.
If multiple monitors exist, you may prefer sharing a global registry to avoid duplicate metric names in different registries.
If helpful, I can draft `StateMonitor::with_registry(registry: Registry, alert_config: AlertConfig)` and make `StateMetrics::new_with_registry` public to the crate.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (21)
- CLAUDE.md (1 hunks)
- README-STATE-MANAGEMENT.md (1 hunks)
- crates/arkflow-core/Cargo.toml (1 hunks)
- crates/arkflow-core/src/lib.rs (4 hunks)
- crates/arkflow-core/src/state/enhanced.rs (1 hunks)
- crates/arkflow-core/src/state/example.rs (1 hunks)
- crates/arkflow-core/src/state/helper.rs (1 hunks)
- crates/arkflow-core/src/state/integration_tests.rs (1 hunks)
- crates/arkflow-core/src/state/mod.rs (1 hunks)
- crates/arkflow-core/src/state/monitoring.rs (1 hunks)
- crates/arkflow-core/src/state/performance.rs (1 hunks)
- crates/arkflow-core/src/state/s3_backend.rs (1 hunks)
- crates/arkflow-core/src/state/simple.rs (1 hunks)
- crates/arkflow-core/src/state/tests.rs (1 hunks)
- crates/arkflow-core/src/state/transaction.rs (1 hunks)
- docs/state-management-guide.md (1 hunks)
- docs/state-management-implementation-summary.md (1 hunks)
- docs/state-management.md (1 hunks)
- examples/session_window.rs (1 hunks)
- examples/stateful_pipeline.rs (1 hunks)
- examples/word_count.rs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (15)
crates/arkflow-core/src/lib.rs (2)
- crates/arkflow-plugin/src/input/file.rs (1): object_store (246-254)
- crates/arkflow-core/src/state/mod.rs (2): from (101-106), extract_from_batch (71-78)
crates/arkflow-core/src/state/simple.rs (3)
- crates/arkflow-core/src/state/enhanced.rs (7): new (101-128), new (320-330), new (396-403), process (333-361), write (406-427), get_transaction_log (473-475), default (78-88)
- crates/arkflow-core/src/lib.rs (6): from (162-164), from (168-170), from (258-263), from (267-269), metadata (332-334), from_string (213-215)
- crates/arkflow-core/src/state/transaction.rs (1): aligned_checkpoint (68-70)
crates/arkflow-core/src/state/transaction.rs (3)
- crates/arkflow-core/src/state/enhanced.rs (3): new (101-128), new (320-330), new (396-403)
- crates/arkflow-core/src/state/mod.rs (2): new (63-68), extract_from_batch (71-78)
- crates/arkflow-core/src/lib.rs (1): metadata (332-334)
crates/arkflow-core/src/state/mod.rs (1)
- crates/arkflow-core/src/lib.rs (6): metadata (332-334), new_arrow (205-210), from (162-164), from (168-170), from (258-263), from (267-269)
examples/stateful_pipeline.rs (2)
- crates/arkflow-core/src/state/enhanced.rs (4): new (101-128), new (320-330), new (396-403), process (333-361)
- crates/arkflow-core/src/state/monitoring.rs (4): new (57-61), new (358-366), new (486-498), monitor (506-508)
crates/arkflow-core/src/state/helper.rs (3)
- crates/arkflow-core/src/state/performance.rs (5): get_typed (502-507), put_typed (509-514), new (119-124), new (157-182), new (447-454)
- crates/arkflow-core/src/state/s3_backend.rs (5): get_typed (340-345), put_typed (347-352), new (70-107), new (293-301), new (373-379)
- crates/arkflow-core/src/state/example.rs (2): new (34-39), new (150-157)
examples/word_count.rs (4)
- crates/arkflow-core/src/state/enhanced.rs (5): process (333-361), new (101-128), new (320-330), new (396-403), default (78-88)
- crates/arkflow-core/src/state/simple.rs (2): process (44-74), default (200-206)
- examples/session_window.rs (7): process (200-271), state_manager (127-128), state_manager (132-133), state_manager (184-185), state_manager (216-217), None (166-166), main (328-420)
- crates/arkflow-core/src/lib.rs (2): new_binary (99-101), from_string (213-215)
crates/arkflow-core/src/state/monitoring.rs (1)
- crates/arkflow-core/src/state/enhanced.rs (3): new (101-128), new (320-330), new (396-403)
examples/session_window.rs (3)
- crates/arkflow-core/src/state/enhanced.rs (5): new (101-128), new (320-330), new (396-403), process (333-361), default (78-88)
- examples/stateful_pipeline.rs (3): state_manager (123-124), state_manager (172-173), process (136-204)
- crates/arkflow-core/src/lib.rs (2): new_binary (99-101), len (221-223)
crates/arkflow-core/src/state/s3_backend.rs (2)
- crates/arkflow-core/src/state/enhanced.rs (3): new (101-128), new (320-330), new (396-403)
- crates/arkflow-core/src/state/helper.rs (5): new (41-45), get_typed (23-25), get_typed (59-70), put_typed (28-30), put_typed (72-79)
crates/arkflow-core/src/state/performance.rs (3)
- crates/arkflow-core/src/state/helper.rs (5): new (41-45), get_typed (23-25), get_typed (59-70), put_typed (28-30), put_typed (72-79)
- crates/arkflow-core/src/state/s3_backend.rs (7): new (70-107), new (293-301), new (373-379), local_state (304-306), local_state_mut (309-311), get_typed (340-345), put_typed (347-352)
- crates/arkflow-core/src/lib.rs (4): from (162-164), from (168-170), from (258-263), from (267-269)
crates/arkflow-core/src/state/example.rs (3)
- crates/arkflow-core/src/state/helper.rs (2): new (41-45), default (110-112)
- crates/arkflow-core/src/state/simple.rs (5): new (36-41), new (97-102), get_count (77-81), default (200-206), example_usage (210-244)
- crates/arkflow-core/src/lib.rs (1): from_string (213-215)
crates/arkflow-core/src/state/tests.rs (4)
- crates/arkflow-core/src/lib.rs (3): from_string (213-215), metadata (332-334), is_empty (217-219)
- crates/arkflow-core/src/state/helper.rs (1): new (41-45)
- crates/arkflow-core/src/state/mod.rs (1): new (63-68)
- crates/arkflow-core/src/state/monitoring.rs (4): new (57-61), new (358-366), new (486-498), monitor (506-508)
crates/arkflow-core/src/state/enhanced.rs (4)
- crates/arkflow-core/src/state/helper.rs (1): new (41-45)
- crates/arkflow-core/src/state/monitoring.rs (4): new (57-61), new (358-366), new (486-498), inner (501-503)
- crates/arkflow-core/src/state/s3_backend.rs (3): new (70-107), new (293-301), new (373-379)
- crates/arkflow-core/src/state/simple.rs (5): new (36-41), new (97-102), new (152-158), process (44-74), write (105-135)
crates/arkflow-core/src/state/integration_tests.rs (2)
- crates/arkflow-core/src/state/enhanced.rs (3): new (101-128), new (320-330), new (396-403)
- crates/arkflow-core/src/state/helper.rs (1): new (41-45)
🪛 LanguageTool
docs/state-management.md
[grammar] ~7-~7: There might be a mistake here.
Context: ...# Overview The implementation provides: 1. Metadata support in MessageBatch - Att...
(QB_NEW_EN)
[grammar] ~154-~154: There might be a mistake here.
Context: ...ntation** - Add persistent state storage 2. Exactly-Once Guarantees - Full two-pha...
(QB_NEW_EN)
[grammar] ~155-~155: There might be a mistake here.
Context: ...* - Full two-phase commit implementation 3. State Partitioning - Scale state with ...
(QB_NEW_EN)
[grammar] ~156-~156: There might be a mistake here.
Context: ...ng** - Scale state with key partitioning 4. State TTL - Automatic cleanup of old s...
(QB_NEW_EN)
[grammar] ~157-~157: There might be a mistake here.
Context: ...e TTL** - Automatic cleanup of old state 5. Monitoring - Metrics for state size an...
(QB_NEW_EN)
CLAUDE.md
[grammar] ~49-~49: There might be a mistake here.
Context: ...w-core/): Core stream processing engine - lib.rs`: Main types (MessageBatch, Error, Resou...
(QB_NEW_EN)
[grammar] ~50-~50: There might be a mistake here.
Context: ...in types (MessageBatch, Error, Resource) - stream/mod.rs: Stream orchestration with input/pipeli...
(QB_NEW_EN)
[grammar] ~51-~51: There might be a mistake here.
Context: ...orchestration with input/pipeline/output - config.rs: Configuration management (YAML/JSON/TO...
(QB_NEW_EN)
[grammar] ~52-~52: There might be a mistake here.
Context: ...onfiguration management (YAML/JSON/TOML) - input/, output/, processor/, buffer/: Co...
(QB_NEW_EN)
[grammar] ~55-~55: There might be a mistake here.
Context: ...rkflow-plugin/): Plugin implementations - input/`: Kafka, MQTT, HTTP, file, database, etc...
(QB_NEW_EN)
[grammar] ~56-~56: There might be a mistake here.
Context: ... Kafka, MQTT, HTTP, file, database, etc. - output/: Kafka, MQTT, HTTP, stdout, etc. - `...
(QB_NEW_EN)
[grammar] ~57-~57: There might be a mistake here.
Context: ...utput/: Kafka, MQTT, HTTP, stdout, etc. - processor/`: SQL, JSON, Protobuf, Python, VRL, etc....
(QB_NEW_EN)
[grammar] ~58-~58: There might be a mistake here.
Context: ...: SQL, JSON, Protobuf, Python, VRL, etc. - buffer/: Memory, session/sliding/tumbling windo...
(QB_NEW_EN)
[grammar] ~72-~72: There might be a mistake here.
Context: ...ata structure wrapping Arrow RecordBatch - Stream: Orchestrates components with b...
(QB_NEW_EN)
[grammar] ~73-~73: There might be a mistake here.
Context: ...es components with backpressure handling - Pipeline: Chain of processors for data...
(QB_NEW_EN)
[grammar] ~74-~74: There might be a mistake here.
Context: ...in of processors for data transformation - Buffer: Optional buffering with window...
(QB_NEW_EN)
[grammar] ~103-~103: There might be a mistake here.
Context: ...c ``` ## Key Concepts ### MessageBatch - Wraps Arrow RecordBatch for columnar pro...
(QB_NEW_EN)
[grammar] ~108-~108: There might be a mistake here.
Context: ...-stream scenarios ### Stream Processing - Async processing with Tokio runtime - Ba...
(QB_NEW_EN)
[grammar] ~109-~109: There might be a mistake here.
Context: ...ng - Async processing with Tokio runtime - Backpressure control (threshold: 1024 me...
(QB_NEW_EN)
[grammar] ~110-~110: There might be a mistake here.
Context: ...ssure control (threshold: 1024 messages) - Ordered delivery with sequence numbers -...
(QB_NEW_EN)
[grammar] ~111-~111: There might be a mistake here.
Context: ...- Ordered delivery with sequence numbers - Graceful shutdown with cancellation toke...
(QB_NEW_EN)
[grammar] ~114-~114: There might be a mistake here.
Context: ...ancellation tokens ### Component Traits All components implement async traits: -...
(QB_NEW_EN)
[grammar] ~115-~115: There might be a mistake here.
Context: ...s All components implement async traits: - Input: read(), connect(), close() - `Ou...
(QB_NEW_EN)
[grammar] ~116-~116: There might be a mistake here.
Context: ...aits: - Input: read(), connect(), close() - Output: write(), connect(), close() - `P...
(QB_NEW_EN)
[grammar] ~117-~117: There might be a mistake here.
Context: ...()-Output: write(), connect(), close()-Processor: process()→Vec-B...
(QB_NEW_EN)
[grammar] ~118-~118: There might be a mistake here.
Context: ... close() - Processor: process() → Vec<MessageBatch> - Buffer: read(), write(), flush() ## Dev...
(QB_NEW_EN)
[grammar] ~123-~123: There might be a mistake here.
Context: ...nt Guidelines ### Adding New Components 1. Implement component trait in appropriate...
(QB_NEW_EN)
[grammar] ~134-~134: There might be a mistake here.
Context: ...:EOFfor graceful shutdown ### Testing - Unit tests intests/` directories - Int...
(QB_NEW_EN)
[grammar] ~135-~135: There might be a mistake here.
Context: ...ing - Unit tests in tests/ directories - Integration tests with real components -...
(QB_NEW_EN)
[grammar] ~136-~136: There might be a mistake here.
Context: ...- Integration tests with real components - Use mockall for mocking dependencies ##...
(QB_NEW_EN)
docs/state-management-guide.md
[grammar] ~13-~13: There might be a mistake here.
Context: ...mory storage for development and testing - S3 State: Persistent, distributed stor...
(QB_NEW_EN)
[grammar] ~14-~14: There might be a mistake here.
Context: ...ributed storage for production workloads - Hybrid: Combines memory for speed with...
(QB_NEW_EN)
[grammar] ~21-~21: There might be a mistake here.
Context: ...c**: Triggered at configurable intervals - Barrier-based: Aligned across all oper...
(QB_NEW_EN)
[grammar] ~22-~22: There might be a mistake here.
Context: ...ross all operators using special markers - Incremental: Only changes since the la...
(QB_NEW_EN)
README-STATE-MANAGEMENT.md
[grammar] ~7-~7: There might be a mistake here.
Context: ...rns. ## Table of Contents 1. Overview 2. Core Concepts 3. [Featu...
(QB_NEW_EN)
[grammar] ~8-~8: There might be a mistake here.
Context: ... Overview 2. Core Concepts 3. Features 4. [Getting Started...
(QB_NEW_EN)
[grammar] ~9-~9: There might be a mistake here.
Context: ...e Concepts](#core-concepts) 3. Features 4. Getting Started 5. [E...
(QB_NEW_EN)
[grammar] ~10-~10: There might be a mistake here.
Context: ...Features](#features) 4. Getting Started 5. Examples 6. [Performance](#p...
(QB_NEW_EN)
[grammar] ~11-~11: There might be a mistake here.
Context: ... Started](#getting-started) 5. Examples 6. Performance 7. [Monitorin...
(QB_NEW_EN)
[grammar] ~12-~12: There might be a mistake here.
Context: ...5. Examples 6. Performance 7. Monitoring 8. [Configurati...
(QB_NEW_EN)
[grammar] ~13-~13: There might be a mistake here.
Context: ...erformance](#performance) 7. Monitoring 8. Configuration 9. [Best ...
(QB_NEW_EN)
[grammar] ~14-~14: There might be a mistake here.
Context: ...nitoring](#monitoring) 8. Configuration 9. Best Practices ## Ove...
(QB_NEW_EN)
[grammar] ~21-~21: There might be a mistake here.
Context: ...Maintain state across message processing - Exactly-Once Semantics: Ensure each me...
(QB_NEW_EN)
[grammar] ~22-~22: There might be a mistake here.
Context: ...e each message is processed exactly once - Fault Tolerance: Automatic recovery fr...
(QB_NEW_EN)
[grammar] ~23-~23: There might be a mistake here.
Context: ...recovery from failures using checkpoints - Multiple Backends: Support for in-memo...
(QB_NEW_EN)
[grammar] ~24-~24: There might be a mistake here.
Context: ...for in-memory and S3-based state storage - Performance Optimizations: Batching, c...
(QB_NEW_EN)
[grammar] ~25-~25: There might be a mistake here.
Context: ...ns**: Batching, compression, and caching - Comprehensive Monitoring: Metrics and ...
(QB_NEW_EN)
[grammar] ~34-~34: There might be a mistake here.
Context: ...mory storage for development and testing - S3: Persistent, distributed storage fo...
(QB_NEW_EN)
[grammar] ~35-~35: There might be a mistake here.
Context: ...ributed storage for production workloads - Hybrid: Combines memory speed with S3 ...
(QB_NEW_EN)
[grammar] ~42-~42: There might be a mistake here.
Context: ...odic snapshots at configurable intervals - Aligned: Using barrier mechanisms for ...
(QB_NEW_EN)
[grammar] ~43-~43: There might be a mistake here.
Context: ...Using barrier mechanisms for consistency - Incremental: Only changes are saved to...
(QB_NEW_EN)
[grammar] ~193-~193: There might be a mistake here.
Context: ...Memory Backend: >100K operations/sec - S3 Backend: 1K-10K operations/sec (dep...
(QB_NEW_EN)
[grammar] ~194-~194: There might be a mistake here.
Context: ...K operations/sec (depending on batching) - Checkpoint Overhead: <100ms for 1MB st...
(QB_NEW_EN)
[grammar] ~195-~195: There might be a mistake here.
Context: ...ckpoint Overhead**: <100ms for 1MB state - Recovery Time: Proportional to state s...
(QB_NEW_EN)
[grammar] ~204-~204: There might be a mistake here.
Context: ...metrics: - Operation counts and latency - State size and growth - Checkpoint durat...
(QB_NEW_EN)
[grammar] ~205-~205: There might be a mistake here.
Context: ...unts and latency - State size and growth - Checkpoint duration and success rate - C...
(QB_NEW_EN)
[grammar] ~206-~206: There might be a mistake here.
Context: ...h - Checkpoint duration and success rate - Cache hit/miss ratios - Error rates ###...
(QB_NEW_EN)
[grammar] ~207-~207: There might be a mistake here.
Context: ...and success rate - Cache hit/miss ratios - Error rates ### Health Checks ```rust ...
(QB_NEW_EN)
[grammar] ~307-~307: There might be a mistake here.
Context: ...emoryState: Basic in-memory state store - EnhancedStateManager: Advanced state management - ExactlyOn...
(QB_NEW_EN)
[grammar] ~308-~308: There might be a mistake here.
Context: ...StateManager: Advanced state management - ExactlyOnceProcessor: Wrapper for exactly-once semantics - ...
(QB_NEW_EN)
[grammar] ~309-~309: There might be a mistake here.
Context: ...sor: Wrapper for exactly-once semantics - TwoPhaseCommitOutput`: Transactional output wrapper ### Conf...
(QB_NEW_EN)
[grammar] ~314-~314: There might be a mistake here.
Context: ...tateConfig: State manager configuration - S3StateBackendConfig: S3 backend configuration - Performanc...
(QB_NEW_EN)
[grammar] ~315-~315: There might be a mistake here.
Context: ...BackendConfig: S3 backend configuration - PerformanceConfig`: Performance optimization settings ###...
(QB_NEW_EN)
[grammar] ~320-~320: There might be a mistake here.
Context: ...ng - StateMonitor: Metrics collection - StateMetrics: Prometheus metrics - HealthStatus: S...
(QB_NEW_EN)
[grammar] ~321-~321: There might be a mistake here.
Context: ...ion - StateMetrics: Prometheus metrics - HealthStatus: System health information ## Troubles...
(QB_NEW_EN)
[grammar] ~328-~328: There might be a mistake here.
Context: ... Common Issues 1. Checkpoint Failures - Check S3 permissions - Verify network...
(QB_NEW_EN)
[grammar] ~333-~333: There might be a mistake here.
Context: ... available disk space 2. High Latency - Enable batching - Increase cache size...
(QB_NEW_EN)
[grammar] ~338-~338: There might be a mistake here.
Context: ...eck S3 performance 3. Memory Pressure - Use S3 backend - Reduce checkpoint fr...
(QB_NEW_EN)
[grammar] ~343-~343: There might be a mistake here.
Context: ...ate partitioning 4. Recovery Failures - Verify checkpoint integrity - Check b...
(QB_NEW_EN)
[grammar] ~362-~362: There might be a mistake here.
Context: ...ing**: Automatic sharding of large state 2. Incremental Checkpoints: Only save cha...
(QB_NEW_EN)
[grammar] ~363-~363: There might be a mistake here.
Context: ...emental Checkpoints**: Only save changes 3. Async State Backends: Non-blocking sta...
(QB_NEW_EN)
[grammar] ~364-~364: There might be a mistake here.
Context: ...ackends**: Non-blocking state operations 4. State Schema Evolution: Handle changin...
(QB_NEW_EN)
[grammar] ~365-~365: There might be a mistake here.
Context: ...olution**: Handle changing state schemas 5. Distributed Checkpointing: Multi-node ...
(QB_NEW_EN)
[grammar] ~370-~370: There might be a mistake here.
Context: ...Native Serialization**: Faster than JSON 2. Compression Algorithms: Choose based o...
(QB_NEW_EN)
[grammar] ~372-~372: There might be a mistake here.
Context: ...ng Strategies**: Adaptive cache policies 4. Batch Sizing: Dynamic batch optimizati...
(QB_NEW_EN)
🔇 Additional comments (13)
README-STATE-MANAGEMENT.md (1)
106-116: Confirmed existence of performance structs
Both OptimizedS3Backend and PerformanceConfig are defined in the performance module. No updates to the README snippet are needed.
• crates/arkflow-core/src/state/performance.rs:31–34 defines PerformanceConfig
• crates/arkflow-core/src/state/performance.rs:72–75 defines OptimizedS3Backend
docs/state-management.md (1)
55-62: Verify maybe_inject_barrier return type
Some injectors return Option or produce a separate barrier batch. Confirm the signature and adjust the example accordingly.
examples/stateful_pipeline.rs (1)
340-344: No type alignment changes required for ExactlyOnceProcessor
The ExactlyOnceProcessor::new constructor is defined as:
pub fn new(
    inner: P,
    state_manager: Arc<tokio::sync::RwLock<EnhancedStateManager>>,
    operator_id: String,
) -> Self { … }
and in examples/stateful_pipeline.rs you pass
let state_manager = if config.state.enable_monitoring {
    let monitored = MonitoredStateManager::new(…).await?;
    Arc::new(RwLock::new(monitored.inner)) // yields EnhancedStateManager
} else {
    Arc::new(RwLock::new(EnhancedStateManager::new(…)?))
};
let exactly_once_processor = ExactlyOnceProcessor::new(
    processor,
    state_manager.clone(),
    "aggregation_pipeline".to_string(),
);
Here, state_manager.clone() is an Arc<RwLock<EnhancedStateManager>>, matching the constructor's signature exactly. The code never passes a MonitoredStateManager itself, only its .inner field, so no change to ExactlyOnceProcessor::new is needed.
Likely an incorrect or invalid review comment.
crates/arkflow-core/src/state/integration_tests.rs (1)
59-74: LGTM: SimpleMemoryState typed put/get happy-paths covered
Good basic coverage for string and numeric types.
crates/arkflow-core/src/lib.rs (3)
38-38: Publicly exposing state is a breaking surface change; confirm semver expectations
Downstream crates will now see arkflow_core::state. Ensure this aligns with your release plan.
98-126: Potential BinaryArray construction issue elsewhere in this impl
Unrelated to this hunk but in the same impl: BinaryArray::from_vec(bytes) where bytes: Vec<&[u8]> may not match Arrow's expected type. Prefer BinaryArray::from(binary_data) or from_iter_values.
If needed, change:
let binary_data: Vec<&[u8]> = content.iter().map(|v| v.as_slice()).collect();
let array = BinaryArray::from(binary_data);
330-352: No duplicates and barrier semantics validated
- Confirmed only one MessageBatch::metadata implementation exists (no duplicates found).
- Verified is_checkpoint() is defined in crates/arkflow-core/src/state/transaction.rs and invoked correctly in enhanced modules; barrier semantics align with the intended BarrierType contract.
docs/state-management-guide.md (1)
76-111: Docs Verification Complete: ExactlyOnceProcessor and TwoPhaseCommitOutput APIs Exist
I've confirmed that both types and their constructors match the documentation:
• ExactlyOnceProcessor::new(inner, state_manager, operator_id: String) is defined in crates/arkflow-core/src/state/enhanced.rs (lines 312–318).
• TwoPhaseCommitOutput::new(inner, state_manager) is defined in the same file (lines 387–394).
No changes to the guide are needed; the examples accurately reflect the public API.
Likely an incorrect or invalid review comment.
crates/arkflow-core/src/state/helper.rs (1)
20-31: Solid typed store abstraction; JSON serde mapping is consistent.
Trait bounds and the SimpleMemoryState implementation look correct. The JSON-to-bytes approach is coherent across get/put. Derives (Clone, Serialize, Deserialize) enable snapshotting and checkpointing cleanly.
Also applies to: 33-37, 39-56, 58-80, 82-113
crates/arkflow-core/src/state/simple.rs (1)
53-70: LGTM! Good defensive programming with optional chaining
The implementation properly handles missing input names and safely extracts/updates counts using and_then and unwrap_or. The state update is atomic within the write lock scope.
crates/arkflow-core/src/state/transaction.rs (1)
49-59: LGTM! Well-designed transaction context initialization
The implementation correctly generates unique transaction IDs using UUIDs and captures Unix timestamps. The structure supports different barrier types effectively.
crates/arkflow-core/src/state/s3_backend.rs (1)
116-121: Well-designed path structure for state organization
The hierarchical path structure with checkpoint/state/operator organization is clean and will scale well. The use of the .json extension makes the storage format explicit.
crates/arkflow-core/src/state/enhanced.rs (1)
291-298: LGTM! Clean shutdown implementation
The shutdown method properly creates a final checkpoint before shutting down, ensuring state is persisted. The conditional logic based on configuration is appropriate.
async fn process_transactional_batch(
    &mut self,
    batch: MessageBatch,
    tx_ctx: TransactionContext,
) -> Result<Vec<MessageBatch>, Error> {
    // Register transaction
    self.register_transaction(&tx_ctx).await?;

    // If this is a checkpoint barrier, trigger checkpoint
    if tx_ctx.is_checkpoint() {
        self.trigger_checkpoint(tx_ctx.checkpoint_id).await?;
    }

    // Process the batch (return as-is for now)
    // In a real implementation, you'd apply transformations here
    Ok(vec![batch])
}
🛠️ Refactor suggestion
Missing error handling for failed transactions
The process_transactional_batch method registers transactions but doesn't handle cleanup if the checkpoint trigger fails.
Add proper error handling and transaction cleanup:
 async fn process_transactional_batch(
     &mut self,
     batch: MessageBatch,
     tx_ctx: TransactionContext,
 ) -> Result<Vec<MessageBatch>, Error> {
     // Register transaction
     self.register_transaction(&tx_ctx).await?;

     // If this is a checkpoint barrier, trigger checkpoint
     if tx_ctx.is_checkpoint() {
-        self.trigger_checkpoint(tx_ctx.checkpoint_id).await?;
+        if let Err(e) = self.trigger_checkpoint(tx_ctx.checkpoint_id).await {
+            // Clean up the failed transaction
+            self.active_transactions.write().await.remove(&tx_ctx.transaction_id);
+            return Err(e);
+        }
     }

     // Process the batch (return as-is for now)
     // In a real implementation, you'd apply transformations here
     Ok(vec![batch])
 }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
async fn process_transactional_batch(
    &mut self,
    batch: MessageBatch,
    tx_ctx: TransactionContext,
) -> Result<Vec<MessageBatch>, Error> {
    // Register transaction
    self.register_transaction(&tx_ctx).await?;
    // If this is a checkpoint barrier, trigger checkpoint
    if tx_ctx.is_checkpoint() {
        if let Err(e) = self.trigger_checkpoint(tx_ctx.checkpoint_id).await {
            // Clean up the failed transaction
            self.active_transactions.write().await.remove(&tx_ctx.transaction_id);
            return Err(e);
        }
    }
    // Process the batch (return as-is for now)
    // In a real implementation, you'd apply transformations here
    Ok(vec![batch])
}
🤖 Prompt for AI Agents
In crates/arkflow-core/src/state/enhanced.rs around lines 149 to 165, the code
registers a transaction then calls trigger_checkpoint but does not clean up the
registered transaction if trigger_checkpoint (or subsequent processing) fails;
update the function so that any error after register_transaction results in
unregistering/rolling back the transaction (call the existing
unregister/rollback method with tx_ctx) before returning the error, and ensure
the transaction is also cleaned up on successful completion (use a
try/finally-style pattern or explicit match to call unregister on both error and
success paths) while preserving and returning the original error when failures
occur.
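The cleanup-on-every-path pattern described in the prompt above can be sketched in isolation. This is a minimal synchronous model, not the PR's code: `Coordinator`, `register`, `unregister`, and the `fail_checkpoint` flag are hypothetical stand-ins, and a plain `HashSet` replaces the async lock-protected map. It assumes, as the prompt suggests, that the transaction should be removed on success as well as on failure; a real two-phase-commit coordinator may instead keep it registered until commit.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the coordinator; the real type is async.
#[derive(Default)]
pub struct Coordinator {
    pub active: HashSet<u64>,
    /// Illustration hook: when true, `trigger_checkpoint` fails.
    pub fail_checkpoint: bool,
}

impl Coordinator {
    fn register(&mut self, tx_id: u64) {
        self.active.insert(tx_id);
    }

    fn unregister(&mut self, tx_id: u64) {
        self.active.remove(&tx_id);
    }

    fn trigger_checkpoint(&mut self, _checkpoint_id: u64) -> Result<(), String> {
        if self.fail_checkpoint {
            Err("checkpoint failed".to_string())
        } else {
            Ok(())
        }
    }

    /// Registers the transaction, runs the fallible step, then always
    /// unregisters, returning the original result unchanged.
    pub fn process(&mut self, tx_id: u64, checkpoint_id: Option<u64>) -> Result<(), String> {
        self.register(tx_id);
        // Capture the result instead of using `?`, so cleanup cannot be skipped.
        let result = match checkpoint_id {
            Some(id) => self.trigger_checkpoint(id),
            None => Ok(()),
        };
        // Runs on both the success path and the error path.
        self.unregister(tx_id);
        result
    }
}
```

Capturing the result before cleanup is what preserves the original error; using `?` directly on `trigger_checkpoint` would return early and leave the entry behind.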
pub mod enhanced;
pub mod helper;
pub mod integration_tests;
pub mod monitoring;
pub mod performance;
pub mod s3_backend;
pub mod simple;
pub mod tests;
pub mod transaction;
🛠️ Refactor suggestion
Gate test modules behind cfg(test).
Exporting tests and integration_tests publicly from the library means they are compiled into every non-test build, which increases the compile surface and can increase binary size.
-pub mod integration_tests;
@@
-pub mod tests;
+#[cfg(test)]
+pub mod integration_tests;
+#[cfg(test)]
+pub mod tests;
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
pub mod enhanced;
pub mod helper;
#[cfg(test)]
pub mod integration_tests;
pub mod monitoring;
pub mod performance;
pub mod s3_backend;
pub mod simple;
#[cfg(test)]
pub mod tests;
pub mod transaction;
🤖 Prompt for AI Agents
In crates/arkflow-core/src/state/mod.rs around lines 23–31, the test modules
(especially tests and integration_tests) are exported publicly which increases
compile surface; add the #[cfg(test)] attribute above the module declarations
for integration_tests and tests (or change them to #[cfg(test)] pub(crate) or
non-pub mod as appropriate) so they are only compiled for test builds, and
remove their public export if it isn’t required for tests to run.
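As a small illustration of the gating described above, here is a sketch with inlined module bodies so that it stands alone. The `simple::answer` function and the `state_tests` module name are hypothetical placeholders, not items from the PR.

```rust
// Sketch of a module file such as state/mod.rs, with bodies inlined.
pub mod simple {
    /// Hypothetical placeholder for a real state API.
    pub fn answer() -> u32 {
        42
    }
}

// Compiled only when building tests (for example under `cargo test`);
// normal library builds do not contain this module at all.
#[cfg(test)]
mod state_tests {
    use super::simple::answer;

    #[test]
    fn answer_is_stable() {
        assert_eq!(answer(), 42);
    }
}
```

The test module is also left non-`pub` here, since unit tests rarely need to be reachable from other crates.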
// Register all metrics
registry.register(Box::new(operations_total.clone()))?;
registry.register(Box::new(operations_success_total.clone()))?;
registry.register(Box::new(operations_failed_total.clone()))?;
registry.register(Box::new(operation_duration_seconds.clone()))?;
registry.register(Box::new(state_size_bytes.clone()))?;
registry.register(Box::new(checkpoint_size_bytes.clone()))?;
registry.register(Box::new(checkpoints_total.clone()))?;
registry.register(Box::new(checkpoint_duration_seconds.clone()))?;
registry.register(Box::new(checkpoint_success_total.clone()))?;
registry.register(Box::new(checkpoint_failed_total.clone()))?;
registry.register(Box::new(active_transactions.clone()))?;
registry.register(Box::new(transaction_duration_seconds.clone()))?;
registry.register(Box::new(cache_hits_total.clone()))?;
registry.register(Box::new(cache_misses_total.clone()))?;
registry.register(Box::new(cache_size_bytes.clone()))?;
registry.register(Box::new(s3_operations_total.clone()))?;
registry.register(Box::new(s3_operation_duration_seconds.clone()))?;
registry.register(Box::new(s3_errors_total.clone()))?;
Map Prometheus register() errors into crate::Error (avoids type-mismatch on '?').
Unless From<prometheus::Error> for crate::Error exists, these ?s won’t compile.
Apply this diff to wrap each register() with map_err:
- registry.register(Box::new(operations_total.clone()))?;
- registry.register(Box::new(operations_success_total.clone()))?;
- registry.register(Box::new(operations_failed_total.clone()))?;
- registry.register(Box::new(operation_duration_seconds.clone()))?;
- registry.register(Box::new(state_size_bytes.clone()))?;
- registry.register(Box::new(checkpoint_size_bytes.clone()))?;
- registry.register(Box::new(checkpoints_total.clone()))?;
- registry.register(Box::new(checkpoint_duration_seconds.clone()))?;
- registry.register(Box::new(checkpoint_success_total.clone()))?;
- registry.register(Box::new(checkpoint_failed_total.clone()))?;
- registry.register(Box::new(active_transactions.clone()))?;
- registry.register(Box::new(transaction_duration_seconds.clone()))?;
- registry.register(Box::new(cache_hits_total.clone()))?;
- registry.register(Box::new(cache_misses_total.clone()))?;
- registry.register(Box::new(cache_size_bytes.clone()))?;
- registry.register(Box::new(s3_operations_total.clone()))?;
- registry.register(Box::new(s3_operation_duration_seconds.clone()))?;
- registry.register(Box::new(s3_errors_total.clone()))?;
+ registry.register(Box::new(operations_total.clone()))
+ .map_err(|e| Error::Process(format!("Failed to register arkflow_state_operations_total: {e}")))?;
+ registry.register(Box::new(operations_success_total.clone()))
+ .map_err(|e| Error::Process(format!("Failed to register arkflow_state_operations_success_total: {e}")))?;
+ registry.register(Box::new(operations_failed_total.clone()))
+ .map_err(|e| Error::Process(format!("Failed to register arkflow_state_operations_failed_total: {e}")))?;
+ registry.register(Box::new(operation_duration_seconds.clone()))
+ .map_err(|e| Error::Process(format!("Failed to register arkflow_state_operation_duration_seconds: {e}")))?;
+ registry.register(Box::new(state_size_bytes.clone()))
+ .map_err(|e| Error::Process(format!("Failed to register arkflow_state_size_bytes: {e}")))?;
+ registry.register(Box::new(checkpoint_size_bytes.clone()))
+ .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoint_size_bytes: {e}")))?;
+ registry.register(Box::new(checkpoints_total.clone()))
+ .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoints_total: {e}")))?;
+ registry.register(Box::new(checkpoint_duration_seconds.clone()))
+ .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoint_duration_seconds: {e}")))?;
+ registry.register(Box::new(checkpoint_success_total.clone()))
+ .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoints_success_total: {e}")))?;
+ registry.register(Box::new(checkpoint_failed_total.clone()))
+ .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoints_failed_total: {e}")))?;
+ registry.register(Box::new(active_transactions.clone()))
+ .map_err(|e| Error::Process(format!("Failed to register arkflow_state_active_transactions: {e}")))?;
+ registry.register(Box::new(transaction_duration_seconds.clone()))
+ .map_err(|e| Error::Process(format!("Failed to register arkflow_state_transaction_duration_seconds: {e}")))?;
+ registry.register(Box::new(cache_hits_total.clone()))
+ .map_err(|e| Error::Process(format!("Failed to register arkflow_state_cache_hits_total: {e}")))?;
+ registry.register(Box::new(cache_misses_total.clone()))
+ .map_err(|e| Error::Process(format!("Failed to register arkflow_state_cache_misses_total: {e}")))?;
+ registry.register(Box::new(cache_size_bytes.clone()))
+ .map_err(|e| Error::Process(format!("Failed to register arkflow_state_cache_size_bytes: {e}")))?;
+ registry.register(Box::new(s3_operations_total.clone()))
+ .map_err(|e| Error::Process(format!("Failed to register arkflow_state_s3_operations_total: {e}")))?;
+ registry.register(Box::new(s3_operation_duration_seconds.clone()))
+ .map_err(|e| Error::Process(format!("Failed to register arkflow_state_s3_operation_duration_seconds: {e}")))?;
+ registry.register(Box::new(s3_errors_total.clone()))
+ .map_err(|e| Error::Process(format!("Failed to register arkflow_state_s3_errors_total: {e}")))?;
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
// Register all metrics
registry.register(Box::new(operations_total.clone()))
    .map_err(|e| Error::Process(format!("Failed to register arkflow_state_operations_total: {e}")))?;
registry.register(Box::new(operations_success_total.clone()))
    .map_err(|e| Error::Process(format!("Failed to register arkflow_state_operations_success_total: {e}")))?;
registry.register(Box::new(operations_failed_total.clone()))
    .map_err(|e| Error::Process(format!("Failed to register arkflow_state_operations_failed_total: {e}")))?;
registry.register(Box::new(operation_duration_seconds.clone()))
    .map_err(|e| Error::Process(format!("Failed to register arkflow_state_operation_duration_seconds: {e}")))?;
registry.register(Box::new(state_size_bytes.clone()))
    .map_err(|e| Error::Process(format!("Failed to register arkflow_state_size_bytes: {e}")))?;
registry.register(Box::new(checkpoint_size_bytes.clone()))
    .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoint_size_bytes: {e}")))?;
registry.register(Box::new(checkpoints_total.clone()))
    .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoints_total: {e}")))?;
registry.register(Box::new(checkpoint_duration_seconds.clone()))
    .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoint_duration_seconds: {e}")))?;
registry.register(Box::new(checkpoint_success_total.clone()))
    .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoints_success_total: {e}")))?;
registry.register(Box::new(checkpoint_failed_total.clone()))
    .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoints_failed_total: {e}")))?;
registry.register(Box::new(active_transactions.clone()))
    .map_err(|e| Error::Process(format!("Failed to register arkflow_state_active_transactions: {e}")))?;
registry.register(Box::new(transaction_duration_seconds.clone()))
    .map_err(|e| Error::Process(format!("Failed to register arkflow_state_transaction_duration_seconds: {e}")))?;
registry.register(Box::new(cache_hits_total.clone()))
    .map_err(|e| Error::Process(format!("Failed to register arkflow_state_cache_hits_total: {e}")))?;
registry.register(Box::new(cache_misses_total.clone()))
    .map_err(|e| Error::Process(format!("Failed to register arkflow_state_cache_misses_total: {e}")))?;
registry.register(Box::new(cache_size_bytes.clone()))
    .map_err(|e| Error::Process(format!("Failed to register arkflow_state_cache_size_bytes: {e}")))?;
registry.register(Box::new(s3_operations_total.clone()))
    .map_err(|e| Error::Process(format!("Failed to register arkflow_state_s3_operations_total: {e}")))?;
registry.register(Box::new(s3_operation_duration_seconds.clone()))
    .map_err(|e| Error::Process(format!("Failed to register arkflow_state_s3_operation_duration_seconds: {e}")))?;
registry.register(Box::new(s3_errors_total.clone()))
    .map_err(|e| Error::Process(format!("Failed to register arkflow_state_s3_errors_total: {e}")))?;
🤖 Prompt for AI Agents
In crates/arkflow-core/src/state/monitoring.rs around lines 179-198, each
registry.register(...) call currently uses ? on a prometheus::Error, which won't
convert to crate::Error unless a From<prometheus::Error> impl exists; wrap each
register(...) call with .map_err(|e| Error::Process(format!(...)))? so the
prometheus::Error is converted to crate::Error explicitly before using ?; apply
this change to every registry.register(...) line in the block.
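To show why the explicit conversion is needed, here is a self-contained sketch that avoids the prometheus crate entirely. `RegistryError`, the single-variant `Error` enum, and the `register` helper are hypothetical stand-ins for `prometheus::Error`, `crate::Error`, and `registry.register(...)`; only the `map_err` shape mirrors the suggestion above.

```rust
use std::fmt;

/// Hypothetical stand-in for a third-party error such as `prometheus::Error`.
#[derive(Debug)]
pub struct RegistryError(pub String);

impl fmt::Display for RegistryError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Stand-in for the crate error; only a `Process` variant is modeled.
#[derive(Debug, PartialEq)]
pub enum Error {
    Process(String),
}

/// Stand-in for `registry.register(...)`; fails on a duplicate name.
fn register(seen: &mut Vec<String>, name: &str) -> Result<(), RegistryError> {
    if seen.iter().any(|n| n == name) {
        return Err(RegistryError(format!("duplicate metric {name}")));
    }
    seen.push(name.to_string());
    Ok(())
}

/// Registers every metric, converting the foreign error explicitly so that
/// `?` type-checks without a `From<RegistryError> for Error` impl.
pub fn register_all(names: &[&str]) -> Result<usize, Error> {
    let mut seen = Vec::new();
    for name in names {
        register(&mut seen, name)
            .map_err(|e| Error::Process(format!("Failed to register {name}: {e}")))?;
    }
    Ok(seen.len())
}
```

Removing the `map_err` call makes `register_all` fail to compile, because `?` can only convert errors through a `From` impl. Looping over the names also keeps the mapping in one place, at the cost of slightly less specific call sites than eighteen hand-written registrations.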
/// Operation timer for measuring duration
pub struct OperationTimer {
    start: Instant,
    metrics: Arc<StateMetrics>,
    operation_type: OperationType,
}
🛠️ Refactor suggestion
OperationTimer lacks Drop semantics; timers dropped without success/failure aren’t recorded.
Tests assume drop==success. Add a guard flag to support Drop-based success.
Apply this diff:
pub struct OperationTimer {
start: Instant,
metrics: Arc<StateMetrics>,
operation_type: OperationType,
+ recorded: bool,
}
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In crates/arkflow-core/src/state/monitoring.rs around lines 222 to 228,
OperationTimer currently has no Drop behavior so timers discarded without an
explicit success/failure call aren't recorded; tests expect drop==success. Add a
boolean guard (e.g., completed: AtomicBool or a plain bool if single-threaded)
to the struct, set it to true whenever success() or failure() is called,
implement Drop to check the guard and if not completed record a success and mark
completed, and ensure any methods that record metrics respect and set the guard
to avoid double-reporting.
| /// Stop timing and record success | ||
| pub fn success(self) { | ||
| let duration = self.start.elapsed().as_secs_f64(); | ||
| self.metrics.operation_duration_seconds.observe(duration); | ||
| self.metrics.operations_success_total.inc(); | ||
|
|
||
| // Record specific metric | ||
| match self.operation_type { | ||
| OperationType::Checkpoint => { | ||
| self.metrics.checkpoint_duration_seconds.observe(duration); | ||
| self.metrics.checkpoint_success_total.inc(); | ||
| } | ||
| OperationType::S3Get | OperationType::S3Put | OperationType::S3Delete => { | ||
| self.metrics.s3_operation_duration_seconds.observe(duration); | ||
| } | ||
| OperationType::Transaction => { | ||
| self.metrics.transaction_duration_seconds.observe(duration); | ||
| } | ||
| _ => {} | ||
| } | ||
| } | ||
|
|
||
| /// Stop timing and record failure | ||
| pub fn failure(self) { | ||
| let duration = self.start.elapsed().as_secs_f64(); | ||
| self.metrics.operation_duration_seconds.observe(duration); | ||
| self.metrics.operations_failed_total.inc(); | ||
|
|
||
| // Record specific metric | ||
| match self.operation_type { | ||
| OperationType::Checkpoint => { | ||
| self.metrics.checkpoint_duration_seconds.observe(duration); | ||
| self.metrics.checkpoint_failed_total.inc(); | ||
| } | ||
| OperationType::S3Get | OperationType::S3Put | OperationType::S3Delete => { | ||
| self.metrics.s3_operation_duration_seconds.observe(duration); | ||
| self.metrics.s3_errors_total.inc(); | ||
| } | ||
| _ => {} | ||
| } | ||
| } |
🛠️ Refactor suggestion
Mark timer as recorded in success/failure and implement Drop default-success.
Prevents leaks/double-counting; aligns with test assumption.
Apply this diff:
- pub fn success(self) {
- let duration = self.start.elapsed().as_secs_f64();
- self.metrics.operation_duration_seconds.observe(duration);
- self.metrics.operations_success_total.inc();
+ pub fn success(mut self) {
+ let duration = self.start.elapsed().as_secs_f64();
+ self.metrics.operation_duration_seconds.observe(duration);
+ self.metrics.operations_success_total.inc();
+ self.recorded = true;
// Record specific metric
match self.operation_type {
OperationType::Checkpoint => {
self.metrics.checkpoint_duration_seconds.observe(duration);
self.metrics.checkpoint_success_total.inc();
}
OperationType::S3Get | OperationType::S3Put | OperationType::S3Delete => {
self.metrics.s3_operation_duration_seconds.observe(duration);
}
OperationType::Transaction => {
self.metrics.transaction_duration_seconds.observe(duration);
}
_ => {}
}
}
/// Stop timing and record failure
- pub fn failure(self) {
- let duration = self.start.elapsed().as_secs_f64();
- self.metrics.operation_duration_seconds.observe(duration);
- self.metrics.operations_failed_total.inc();
+ pub fn failure(mut self) {
+ let duration = self.start.elapsed().as_secs_f64();
+ self.metrics.operation_duration_seconds.observe(duration);
+ self.metrics.operations_failed_total.inc();
+ self.recorded = true;
// Record specific metric
match self.operation_type {
OperationType::Checkpoint => {
self.metrics.checkpoint_duration_seconds.observe(duration);
self.metrics.checkpoint_failed_total.inc();
}
OperationType::S3Get | OperationType::S3Put | OperationType::S3Delete => {
self.metrics.s3_operation_duration_seconds.observe(duration);
self.metrics.s3_errors_total.inc();
}
_ => {}
}
}
Add this new impl (outside the range):
impl Drop for OperationTimer {
fn drop(&mut self) {
if self.recorded {
return;
}
// Default to success on drop
let duration = self.start.elapsed().as_secs_f64();
self.metrics.operation_duration_seconds.observe(duration);
self.metrics.operations_success_total.inc();
match self.operation_type {
OperationType::Checkpoint => {
self.metrics.checkpoint_duration_seconds.observe(duration);
self.metrics.checkpoint_success_total.inc();
}
OperationType::S3Get | OperationType::S3Put | OperationType::S3Delete => {
self.metrics.s3_operation_duration_seconds.observe(duration);
}
OperationType::Transaction => {
self.metrics.transaction_duration_seconds.observe(duration);
}
_ => {}
}
self.recorded = true;
}
}
🤖 Prompt for AI Agents
In crates/arkflow-core/src/state/monitoring.rs around lines 254 to 294, the
OperationTimer methods currently observe metrics but never mark the timer as
recorded, which can cause double-counting or leaks and tests expect a
default-success on Drop; update both success(self) and failure(self) to set
self.recorded = true after recording metrics, and add a Drop impl for
OperationTimer (outside the shown range) that checks if !self.recorded then
records a default success identical to success() (observe duration, increment
operations_success_total, record the specific per-operation metric) and finally
sets self.recorded = true.
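The guard-flag idea can be sketched in isolation with the standard library only. This is a minimal sketch under assumptions: `Counters` is a simplified stand-in for `StateMetrics` (two atomic counters, no histograms), and the private `record` helper is introduced here for illustration so that `success`, `failure`, and `Drop` share one code path.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Simplified stand-in for `StateMetrics`: two plain counters (hypothetical).
#[derive(Default)]
pub struct Counters {
    pub success: AtomicU64,
    pub failed: AtomicU64,
}

pub struct OperationTimer {
    start: Instant,
    counters: Arc<Counters>,
    /// Set once an outcome has been recorded, so Drop does not count it again.
    recorded: bool,
}

impl OperationTimer {
    pub fn start(counters: Arc<Counters>) -> Self {
        Self { start: Instant::now(), counters, recorded: false }
    }

    /// Stop timing and record success.
    pub fn success(mut self) {
        self.record(true);
    }

    /// Stop timing and record failure.
    pub fn failure(mut self) {
        self.record(false);
    }

    /// Shared recording path; a second call is a no-op.
    fn record(&mut self, ok: bool) {
        if self.recorded {
            return;
        }
        self.recorded = true;
        // A real implementation would feed this into a duration histogram.
        let _elapsed = self.start.elapsed().as_secs_f64();
        let counter = if ok { &self.counters.success } else { &self.counters.failed };
        counter.fetch_add(1, Ordering::Relaxed);
    }
}

impl Drop for OperationTimer {
    fn drop(&mut self) {
        // Default to success when the timer is dropped without an explicit outcome.
        self.record(true);
    }
}
```

Routing all three paths through one helper avoids the three near-identical match blocks in the suggested diff. Whether drop should mean success at all is a design choice worth confirming, since an early return on an error path would then be counted as a success.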
| /// Get total word count from state | ||
| pub async fn get_total_words(&self) -> Result<u64, Error> { | ||
| let state_manager = self.state_manager.read().await; | ||
| state_manager | ||
| .get_state_value(&self.operator_id, &"total_words") | ||
| .await | ||
| } |
Bug: return type mismatch (Result<u64, _> vs Result<Option<u64>, _>).
Unwrap with default and specify type.
- pub async fn get_total_words(&self) -> Result<u64, Error> {
- let state_manager = self.state_manager.read().await;
- state_manager
- .get_state_value(&self.operator_id, &"total_words")
- .await
- }
+ pub async fn get_total_words(&self) -> Result<u64, Error> {
+ let state_manager = self.state_manager.read().await;
+ Ok(
+ state_manager
+ .get_state_value::<u64>(&self.operator_id, "total_words")
+ .await?
+ .unwrap_or(0),
+ )
+ }
🤖 Prompt for AI Agents
In examples/word_count.rs around lines 68-74, the function returns Result<u64,
Error> but state_manager.get_state_value(...) returns Result<Option<u64>,
Error>; change the call to unwrap the Option with a default (0) while preserving
the Result error. For example, await the call and then map the successful Option
to a u64 with .map(|opt| opt.unwrap_or(0)), so the function returns Result<u64,
Error> (i.e., replace the final .await with .await.map(|opt| opt.unwrap_or(0))).
| /// Get count for specific word | ||
| pub async fn get_word_count(&self, word: &str) -> Result<u64, Error> { | ||
| let state_manager = self.state_manager.read().await; | ||
| state_manager | ||
| .get_state_value(&self.operator_id, &format!("word_{}", word)) | ||
| .await | ||
| } | ||
|
|
Same bug for get_word_count.
- pub async fn get_word_count(&self, word: &str) -> Result<u64, Error> {
- let state_manager = self.state_manager.read().await;
- state_manager
- .get_state_value(&self.operator_id, &format!("word_{}", word))
- .await
- }
+ pub async fn get_word_count(&self, word: &str) -> Result<u64, Error> {
+ let state_manager = self.state_manager.read().await;
+ Ok(
+ state_manager
+ .get_state_value::<u64>(&self.operator_id, &format!("word_{}", word))
+ .await?
+ .unwrap_or(0),
+ )
+ }
🤖 Prompt for AI Agents
In examples/word_count.rs around lines 76-83, get_word_count currently returns
the raw result of state_manager.get_state_value, which likely yields a
Result<Option<u64>, Error> and therefore does not match the declared return type
Result<u64, Error>; await the call, propagate errors, and map the returned
Option to a u64 (e.g., Ok(value.unwrap_or(0)) after .await?, or
.await.map(|opt| opt.unwrap_or(0))) so missing words return 0 and errors are
forwarded.
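Both fixes above reduce to collapsing a `Result<Option<u64>, E>` into a `Result<u64, E>`. The sketch below shows the two equivalent spellings using only the standard library; `get_state_value` here is a hypothetical synchronous stand-in over a `HashMap`, not the real state manager API.

```rust
use std::collections::HashMap;

/// Hypothetical lookup mirroring the shape `Result<Option<u64>, E>`.
pub fn get_state_value(store: &HashMap<String, u64>, key: &str) -> Result<Option<u64>, String> {
    if key.is_empty() {
        return Err("empty key".to_string());
    }
    Ok(store.get(key).copied())
}

/// Spelling 1: propagate the error with `?`, then default the Option.
pub fn get_total_words(store: &HashMap<String, u64>) -> Result<u64, String> {
    Ok(get_state_value(store, "total_words")?.unwrap_or(0))
}

/// Spelling 2: map over the Ok value and default the Option inside the closure.
pub fn get_word_count(store: &HashMap<String, u64>, word: &str) -> Result<u64, String> {
    get_state_value(store, &format!("word_{word}")).map(|opt| opt.unwrap_or(0))
}
```

Either form treats a missing key as zero while still forwarding real errors, which is the behavior the review asks for.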
| // Print statistics | ||
| let processor_inner = exactly_once_processor; | ||
| let total_words = processor_inner.get_state(&"total_words").await?; | ||
| let hello_count = processor_inner.get_state(&"word_hello").await?; | ||
|
|
||
| println!("\nFinal Statistics:"); | ||
| println!("Total words processed: {:?}", total_words); | ||
| println!("'hello' count: {:?}", hello_count); | ||
|
|
||
| // Create checkpoint | ||
| let mut state_manager_write = state_manager.write().await; | ||
| let checkpoint_id = state_manager_write.create_checkpoint().await?; | ||
| println!("Created checkpoint: {}", checkpoint_id); | ||
|
|
Invalid API usage: ExactlyOnceProcessor likely doesn’t expose get_state; read from the shared state_manager instead and use the processor’s operator_id.
Fix by querying state directly.
- let processor_inner = exactly_once_processor;
- let total_words = processor_inner.get_state(&"total_words").await?;
- let hello_count = processor_inner.get_state(&"word_hello").await?;
-
- println!("\nFinal Statistics:");
- println!("Total words processed: {:?}", total_words);
- println!("'hello' count: {:?}", hello_count);
+ // Query state directly
+ {
+ let sm = state_manager.read().await;
+ let total_words: u64 = sm
+ .get_state_value("word_count_operator", "total_words")
+ .await?
+ .unwrap_or(0);
+ let hello_count: u64 = sm
+ .get_state_value("word_count_operator", "word_hello")
+ .await?
+ .unwrap_or(0);
+ println!("\nFinal Statistics:");
+ println!("Total words processed: {}", total_words);
+ println!("'hello' count: {}", hello_count);
+ }
🤖 Prompt for AI Agents
In examples/word_count.rs around lines 263 to 276, the code incorrectly calls
get_state on exactly_once_processor (which doesn't expose get_state); instead
acquire a read lock on the shared state_manager (state_manager.read().await),
use the processor's operator_id to query the two keys (e.g. "total_words" and
"word_hello") from that shared state, then print those results; keep the
checkpoint creation that follows but obtain a write lock only when creating the
checkpoint (state_manager.write().await) as before.
| use arkflow_core::state::monitoring::{StateMonitor, MonitoredStateManager}; | ||
|
|
||
| let monitor = Arc::new(StateMonitor::new()?); | ||
| let manager = MonitoredStateManager::new(config, monitor).await?; | ||
|
|
||
| // Export Prometheus metrics | ||
| let metrics = manager.export_metrics()?; | ||
| ``` |
StateMonitor::new likely returns (monitor, registry); sample won’t compile as written
Per core snippet, StateMonitor::new() returns (StateMonitor, Registry). Adjust destructuring and remove/replace export_metrics() (not shown in snippets).
Apply:
-use arkflow_core::state::monitoring::{StateMonitor, MonitoredStateManager};
+use arkflow_core::state::monitoring::{StateMonitor, MonitoredStateManager};
+use std::sync::Arc;
-let monitor = Arc::new(StateMonitor::new()?);
-let manager = MonitoredStateManager::new(config, monitor).await?;
-
-// Export Prometheus metrics
-let metrics = manager.export_metrics()?;
+let (monitor, _registry) = StateMonitor::new()?;
+let monitor = Arc::new(monitor);
+let manager = MonitoredStateManager::new(config, monitor).await?;
+// Use the registry for exposition; see Prometheus example below.
🤖 Prompt for AI Agents
In README-STATE-MANAGEMENT.md around lines 121 to 128, StateMonitor::new()
actually returns a tuple (StateMonitor, Registry) so the example must
destructure both values and use the returned registry; change the code to
capture both monitor and registry from StateMonitor::new(), pass the registry to
whatever telemetry setup needs it, and remove or replace the call to
manager.export_metrics() (which doesn’t exist) with the correct metrics
registration using the Registry returned by StateMonitor::new().
| // Start HTTP server for metrics | ||
| let registry = manager.monitor().registry(); | ||
| HttpServer::new(move || { | ||
| App::new().app_data(registry.clone()).route( | ||
| "/metrics", | ||
| web::get().to(|registry: web::Data<Registry>| { | ||
| let encoder = TextEncoder::new(); | ||
| let metric_families = registry.gather(); | ||
| Ok(web::Bytes::from(encoder.encode_to_string(&metric_families).unwrap())) | ||
| }) | ||
| ) | ||
| }) | ||
| .bind("0.0.0.0:9090")? | ||
| .run() | ||
| .await; | ||
| ``` |
🛠️ Refactor suggestion
Prometheus example uses a non-existent encode_to_string; fix encoding flow
The prometheus TextEncoder encodes into a buffer. Replace with the canonical pattern.
- web::get().to(|registry: web::Data<Registry>| {
- let encoder = TextEncoder::new();
- let metric_families = registry.gather();
- Ok(web::Bytes::from(encoder.encode_to_string(&metric_families).unwrap()))
- })
+ web::get().to(|registry: web::Data<Registry>| {
+ use prometheus::{Encoder, TextEncoder};
+ let encoder = TextEncoder::new();
+ let metric_families = registry.gather();
+ let mut buf = Vec::new();
+ encoder.encode(&metric_families, &mut buf).map_err(|e| actix_web::error::ErrorInternalServerError(e))?;
+ Ok::<_, actix_web::Error>(web::Bytes::from(buf))
+ })
🤖 Prompt for AI Agents
In README-STATE-MANAGEMENT.md around lines 222–237 the Prometheus TextEncoder
call uses a non-existent encode_to_string; replace it with the canonical flow:
create a mutable Vec<u8> buffer, call encoder.encode(&metric_families, &mut
buffer)? to write into that buffer, then return web::Bytes::from(buffer) (or
convert to String if you need text). Ensure you propagate or handle the encoder
error (use ? or map_err into an appropriate actix-web error) instead of
unwrap().
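The encode-into-a-buffer flow described above can be illustrated without any external crate. This is only a sketch of the pattern: `Sample`, `encode`, and `render` are hypothetical names, and the output format is a simplified "name value" line, not the real Prometheus exposition format or the prometheus crate's encoder.

```rust
use std::io::{self, Write};

/// Hypothetical metric sample; not a prometheus type.
pub struct Sample {
    pub name: &'static str,
    pub value: f64,
}

/// Writes each sample as a "name value" line into any writer.
pub fn encode<W: Write>(samples: &[Sample], out: &mut W) -> io::Result<()> {
    for s in samples {
        writeln!(out, "{} {}", s.name, s.value)?;
    }
    Ok(())
}

/// Encodes into a fresh byte buffer and propagates the error instead of unwrapping.
pub fn render(samples: &[Sample]) -> io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    encode(samples, &mut buf)?;
    Ok(buf)
}
```

A handler built this way returns the buffer as the response body on success and maps the error into the web framework's error type otherwise.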
Actionable comments posted: 8
🧹 Nitpick comments (8)
crates/arkflow-core/src/config.rs (1)
211-227: Consider adding validation for S3 configuration
The S3 configuration accepts optional credentials. When credentials are not provided, the code will use the default AWS credential chain. Consider documenting this behavior more explicitly or adding validation to ensure that either both access_key_id and secret_access_key are provided or neither.
Add a validation method:
impl S3StateBackendConfig {
    pub fn validate(&self) -> Result<(), Error> {
        match (&self.access_key_id, &self.secret_access_key) {
            (Some(_), None) | (None, Some(_)) => {
                Err(Error::Config("Both access_key_id and secret_access_key must be provided together".to_string()))
            }
            _ => Ok(())
        }
    }
}
examples/run_stateful_example.rs (1)
104-104: Handle join errors properly
handle.await? propagates a JoinError to the caller if a task panics or is cancelled. Consider handling this more gracefully.
- handle.await?;
+ match handle.await {
+     Ok(_) => {},
+     Err(e) if e.is_panic() => {
+         error!("Stream task panicked: {:?}", e);
+     }
+     Err(e) => {
+         error!("Stream task failed: {:?}", e);
+     }
+ }
crates/arkflow-core/src/stream/mod.rs (1)
493-587: Consider extracting state manager creation logic
The build_with_state method contains complex logic for converting between configuration types and creating the state manager. Consider extracting this into a separate helper function for better maintainability and testability.
async fn create_state_manager(
    state_config: &StateManagementConfig,
    stream_state: &crate::config::StreamStateConfig,
) -> Result<Option<Arc<RwLock<EnhancedStateManager>>>, Error> {
    if !state_config.enabled || !stream_state.enabled {
        return Ok(None);
    }
    // Convert state config to enhanced state config
    let enhanced_config = crate::state::enhanced::EnhancedStateConfig {
        // ... configuration conversion logic ...
    };
    let state_manager = EnhancedStateManager::new(enhanced_config).await?;
    Ok(Some(Arc::new(RwLock::new(state_manager))))
}
crates/arkflow-core/src/engine_builder.rs (5)
20-21: Prepare for dedup with HashMap::entry
Import Entry to cleanly deduplicate state managers by operator_id.
-use std::collections::HashMap;
+use std::collections::{HashMap, hash_map::Entry};
51-99: Avoid duplicate state manager creation for the same operator_id
Currently, repeated operator_id values cause multiple managers to be created and all but the last dropped. Deduplicate with entry() and log once.
- // Create a shared state manager for all streams that need it
- for stream_config in &self.config.streams {
-     if let Some(stream_state) = &stream_config.state {
-         if stream_state.enabled {
-             let enhanced_config = crate::state::enhanced::EnhancedStateConfig {
-                 enabled: true,
-                 backend_type: match self.config.state_management.backend_type {
-                     crate::config::StateBackendType::Memory => {
-                         crate::state::enhanced::StateBackendType::Memory
-                     }
-                     crate::config::StateBackendType::S3 => {
-                         crate::state::enhanced::StateBackendType::S3
-                     }
-                     crate::config::StateBackendType::Hybrid => {
-                         crate::state::enhanced::StateBackendType::Hybrid
-                     }
-                 },
-                 s3_config: self.config.state_management.s3_config.as_ref().map(
-                     |config| crate::state::s3_backend::S3StateBackendConfig {
-                         bucket: config.bucket.clone(),
-                         region: config.region.clone(),
-                         endpoint: config.endpoint_url.clone(),
-                         access_key_id: config.access_key_id.clone(),
-                         secret_access_key: config.secret_access_key.clone(),
-                         prefix: Some(config.prefix.clone()),
-                         use_ssl: true,
-                     },
-                 ),
-                 checkpoint_interval_ms: self
-                     .config
-                     .state_management
-                     .checkpoint_interval_ms,
-                 retained_checkpoints: self.config.state_management.retained_checkpoints,
-                 exactly_once: self.config.state_management.exactly_once,
-                 state_timeout_ms: stream_state
-                     .state_timeout_ms
-                     .unwrap_or(self.config.state_management.state_timeout_ms),
-             };
-
-             let state_manager =
-                 crate::state::enhanced::EnhancedStateManager::new(enhanced_config)
-                     .await?;
-             self.state_managers.insert(
-                 stream_state.operator_id.clone(),
-                 Arc::new(RwLock::new(state_manager)),
-             );
-         }
-     }
- }
+ // Create a shared state manager for all streams that need it (deduplicated by operator_id)
+ for stream_config in &self.config.streams {
+     if let Some(stream_state) = &stream_config.state {
+         if stream_state.enabled {
+             match self
+                 .state_managers
+                 .entry(stream_state.operator_id.clone())
+             {
+                 Entry::Occupied(_) => {
+                     warn!(
+                         "Duplicate operator_id detected: {}. Reusing existing state manager.",
+                         stream_state.operator_id
+                     );
+                 }
+                 Entry::Vacant(v) => {
+                     let enhanced_config = crate::state::enhanced::EnhancedStateConfig {
+                         enabled: true,
+                         backend_type: match self.config.state_management.backend_type {
+                             crate::config::StateBackendType::Memory => {
+                                 crate::state::enhanced::StateBackendType::Memory
+                             }
+                             crate::config::StateBackendType::S3 => {
+                                 crate::state::enhanced::StateBackendType::S3
+                             }
+                             crate::config::StateBackendType::Hybrid => {
+                                 crate::state::enhanced::StateBackendType::Hybrid
+                             }
+                         },
+                         s3_config: self.config.state_management.s3_config.as_ref().map(
+                             |config| crate::state::s3_backend::S3StateBackendConfig {
+                                 bucket: config.bucket.clone(),
+                                 region: config.region.clone(),
+                                 endpoint: config.endpoint_url.clone(),
+                                 access_key_id: config.access_key_id.clone(),
+                                 secret_access_key: config.secret_access_key.clone(),
+                                 prefix: Some(config.prefix.clone()),
+                                 use_ssl: true,
+                             },
+                         ),
+                         checkpoint_interval_ms: self
+                             .config
+                             .state_management
+                             .checkpoint_interval_ms,
+                         retained_checkpoints: self.config.state_management.retained_checkpoints,
+                         exactly_once: self.config.state_management.exactly_once,
+                         state_timeout_ms: stream_state
+                             .state_timeout_ms
+                             .unwrap_or(self.config.state_management.state_timeout_ms),
+                     };
+
+                     let state_manager = crate::state::enhanced::EnhancedStateManager::new(
+                         enhanced_config,
+                     )
+                     .await?;
+                     v.insert(Arc::new(RwLock::new(state_manager)));
+                 }
+             }
+         }
+     }
+ }
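The deduplication step in the diff above can be seen in miniature with the standard library alone. In this sketch `build_managers` is a hypothetical helper and a `usize` creation index stands in for the real state manager value; only the `Entry::Occupied` / `Entry::Vacant` branching is the point.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Builds one "manager" per distinct operator id and reports ids seen more than once.
/// The usize value is a creation index standing in for a real manager (hypothetical).
pub fn build_managers(operator_ids: &[&str]) -> (HashMap<String, usize>, Vec<String>) {
    let mut managers = HashMap::new();
    let mut duplicates = Vec::new();
    let mut created = 0usize;
    for id in operator_ids {
        match managers.entry(id.to_string()) {
            // Already built for this id: reuse it and note the duplicate.
            Entry::Occupied(_) => duplicates.push(id.to_string()),
            // First time this id is seen: build and store exactly one manager.
            Entry::Vacant(slot) => {
                created += 1;
                slot.insert(created);
            }
        }
    }
    (managers, duplicates)
}
```

Because the expensive construction only happens in the vacant branch, a repeated id costs one hash lookup instead of building and discarding a manager.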
104-129: Reduce noisy warnings when global state is disabled
If global state is off, per-stream enabled=true will currently warn “No state manager found…”. Gate the warning to avoid noise and add an info explaining the fallback.
- warn!(
-     "No state manager found for operator: {}",
-     stream_state.operator_id
- );
+ if self.config.state_management.enabled {
+     warn!(
+         "No state manager found for operator: {}",
+         stream_state.operator_id
+     );
+ } else {
+     info!(
+         "Global state management disabled; building non-stateful stream: {}",
+         stream_state.operator_id
+     );
+ }
190-197: Drain managers on shutdown to free memory and prevent double-shutdown
Clears the map and ensures dropped Arcs after shutdown. Keeps shutdown idempotent across builder reuse.
- for (_, state_manager) in self.state_managers.iter() {
+ for (_, state_manager) in self.state_managers.drain() {
      let mut manager = state_manager.write().await;
      manager.shutdown().await?;
  }
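A small standard-library sketch of why draining makes shutdown idempotent is given below. `Manager` and `Builder` are hypothetical synchronous stand-ins for the real async types, and the `log` vector exists only so the effect can be observed.

```rust
use std::collections::HashMap;

/// Hypothetical manager whose shutdown must run at most once.
pub struct Manager {
    pub name: String,
}

impl Manager {
    /// Consumes the manager and records that it was shut down.
    pub fn shutdown(self, log: &mut Vec<String>) {
        log.push(self.name);
    }
}

pub struct Builder {
    pub managers: HashMap<String, Manager>,
}

impl Builder {
    /// Drains the map so each manager is shut down once; later calls find it empty.
    pub fn shutdown(&mut self, log: &mut Vec<String>) {
        for (_, manager) in self.managers.drain() {
            manager.shutdown(log);
        }
    }
}
```

One caveat to weigh: if an error exits the loop early, dropping the `drain()` iterator still removes the remaining entries, so those managers would be dropped without an explicit shutdown call.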
68-78: Expose and propagate SSL configuration for the S3 state backend
The field-by-field mapping in engine_builder.rs (and similarly in stream/mod.rs) correctly maps:
- config.bucket: String → bucket: String
- config.region: String → region: String
- config.endpoint_url: Option<String> → endpoint: Option<String>
- config.access_key_id: Option<String> → access_key_id: Option<String>
- config.secret_access_key: Option<String> → secret_access_key: Option<String>
- config.prefix: String → prefix: Some(String)
However, use_ssl is currently hardcoded to true, and the public S3StateBackendConfig in crates/arkflow-core/src/config.rs does not expose a flag to disable TLS for non-AWS (e.g. MinIO) endpoints.
Optional refactoring recommendations:
- In crates/arkflow-core/src/config.rs (the config-file struct around line 213):
  - Add a field:
    #[serde(default = "default_use_ssl")]
    pub use_ssl: bool,
  - Define the default helper:
    fn default_use_ssl() -> bool { true }
- In crates/arkflow-core/src/engine_builder.rs (currently hardcoding use_ssl around lines 75–76):
  - Change
    - use_ssl: true,
    + use_ssl: config.use_ssl,
- In crates/arkflow-core/src/stream/mod.rs (around lines 551–553), make the same update.
- Update any examples or tests (e.g. in examples/word_count.rs, examples/stateful_pipeline.rs) to set the new use_ssl field as appropriate.
This will allow users to opt out of SSL when targeting non-AWS S3-compatible storage.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- crates/arkflow-core/src/config.rs (2 hunks)
- crates/arkflow-core/src/engine_builder.rs (1 hunks)
- crates/arkflow-core/src/lib.rs (4 hunks)
- crates/arkflow-core/src/stream/mod.rs (6 hunks)
- docs/STATE_MANAGEMENT.md (1 hunks)
- examples/run_stateful_example.rs (1 hunks)
- examples/stateful_example.yaml (1 hunks)
- examples/stateful_s3_example.yaml (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- docs/STATE_MANAGEMENT.md
🚧 Files skipped from review as they are similar to previous changes (1)
- crates/arkflow-core/src/lib.rs
🧰 Additional context used
🧬 Code graph analysis (3)
examples/run_stateful_example.rs (2)
- crates/arkflow-core/src/config.rs (1): from_file (92-109)
- crates/arkflow-core/src/engine_builder.rs (1): new (33-38)
crates/arkflow-core/src/engine_builder.rs (2)
- crates/arkflow-core/src/stream/mod.rs (2): new (57-79), output (354-394)
- crates/arkflow-core/src/config.rs (3): default (151-159), default (168-174), default (278-288)
crates/arkflow-core/src/stream/mod.rs (1)
- crates/arkflow-core/src/config.rs (3): default (151-159), default (168-174), default (278-288)
🪛 YAMLlint (1.37.1)
examples/stateful_example.yaml
[error] 16-16: trailing spaces
(trailing-spaces)
[error] 40-40: trailing spaces
(trailing-spaces)
[error] 71-71: trailing spaces
(trailing-spaces)
[error] 76-76: trailing spaces
(trailing-spaces)
[error] 115-115: no new line character at the end of file
(new-line-at-end-of-file)
examples/stateful_s3_example.yaml
[error] 21-21: trailing spaces
(trailing-spaces)
[error] 51-51: trailing spaces
(trailing-spaces)
[error] 70-70: trailing spaces
(trailing-spaces)
[error] 74-74: trailing spaces
(trailing-spaces)
[error] 80-80: trailing spaces
(trailing-spaces)
[error] 152-152: trailing spaces
(trailing-spaces)
[error] 177-177: no new line character at the end of file
(new-line-at-end-of-file)
🔇 Additional comments (2)
crates/arkflow-core/src/config.rs (1)
177-200: Well-structured state management configuration!
The StateManagementConfig struct is well-designed with sensible defaults and proper serde attributes. The configuration options cover all essential aspects of state management including checkpointing, exactly-once semantics, and backend selection.
crates/arkflow-core/src/engine_builder.rs (1)
133-181: Stream assembly looks correct and matches Stream::new signature
Input/pipeline/output/error_output/buffer/resource/thread_num/state_manager wiring aligns with Stream::new. Good separation via build_stream_with_state.
| // Note: Exactly-once processor would need to be integrated differently | ||
| // since it consumes the pipeline. For now, we just create the state manager | ||
| Some(state_manager_arc) |
🛠️ Refactor suggestion
Document the exactly-once processor integration limitation
The comment mentions that exactly-once processor integration would need to be handled differently. This limitation should be documented more prominently, perhaps in the module documentation or as a TODO with an issue reference.
Would you like me to help create a design document or open an issue to track the proper integration of exactly-once processing with the state manager?
🤖 Prompt for AI Agents
In crates/arkflow-core/src/stream/mod.rs around lines 568-570, the inline
comment about exactly-once processor integration being different needs a more
prominent documentation and tracking entry; update the module-level Rust doc
(//! ...) to mention this limitation and constraints, add a TODO comment near
this code that references a new or existing issue number (e.g. TODO: track
exactly-once processor integration - see issue #NNN), and if desired open a
repository issue describing required design changes (consuming pipeline, state
manager ownership, and migration path) and paste that issue number into the TODO
so future readers can find the detailed plan.
| use arkflow::Error; | ||
| use tokio_util::sync::CancellationToken; | ||
| use tracing::{error, info}; | ||
|
|
Add missing import for Duration
The code uses Duration::from_secs on line 66 but doesn't import Duration.
use arkflow::Error;
use tokio_util::sync::CancellationToken;
+use std::time::Duration;
use tracing::{error, info};
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| use arkflow::Error; | |
| use tokio_util::sync::CancellationToken; | |
| use std::time::Duration; | |
| use tracing::{error, info}; |
🤖 Prompt for AI Agents
In examples/run_stateful_example.rs around lines 22 to 22 (usage at line 66),
the code calls Duration::from_secs but never imports Duration; add the missing
import by adding use std::time::Duration; alongside the other use statements at
the top of the file so Duration::from_secs resolves correctly.
| retained_checkpoints: 3 | ||
| exactly_once: true | ||
| state_timeout_ms: 3600000 # 1 hour | ||
|
|
Remove trailing whitespace
There's trailing whitespace on this line that should be removed.
- state_timeout_ms: 3600000 # 1 hour
-
+ state_timeout_ms: 3600000 # 1 hour
+
📝 Committable suggestion
| state_timeout_ms: 3600000 # 1 hour | |
🧰 Tools
🪛 YAMLlint (1.37.1)
[error] 16-16: trailing spaces
(trailing-spaces)
🤖 Prompt for AI Agents
In examples/stateful_example.yaml around line 16, remove the trailing whitespace
characters at the end of that line; edit the file to delete the extra space(s)
so the line ends cleanly, then save the file and optionally run your editor's
"trim trailing whitespace" or a linter/pre-commit hook to ensure no other
trailing whitespace remains.
| script: | | ||
| import json | ||
| import re | ||
Remove trailing whitespace
There's trailing whitespace on this line that should be removed.
-
+
📝 Committable suggestion
🧰 Tools
🪛 YAMLlint (1.37.1)
[error] 40-40: trailing spaces
(trailing-spaces)
🤖 Prompt for AI Agents
In examples/stateful_example.yaml around line 40, remove the trailing whitespace
characters at the end of the line (ensure there are no spaces or tabs after the
last visible character) so the line ends cleanly; save the file with no trailing
whitespace on that line and run a quick whitespace/lint check to confirm
removal.
| SELECT | ||
| user_id, | ||
| event_type, | ||
| COUNT(*) as event_count, | ||
| SUM(value) as total_value | ||
| FROM flow |
Remove trailing whitespace from SQL query
Multiple lines in the SQL query have trailing whitespace that should be cleaned up.
- SELECT
+ SELECT
user_id,
event_type,
COUNT(*) as event_count,
SUM(value) as total_value
- FROM flow
+ FROM flow
📝 Committable suggestion
| SELECT | |
| user_id, | |
| event_type, | |
| COUNT(*) as event_count, | |
| SUM(value) as total_value | |
| FROM flow | |
| SELECT | |
| user_id, | |
| event_type, | |
| COUNT(*) as event_count, | |
| SUM(value) as total_value | |
| FROM flow |
🧰 Tools
🪛 YAMLlint (1.37.1)
[error] 71-71: trailing spaces
(trailing-spaces)
[error] 76-76: trailing spaces
(trailing-spaces)
🤖 Prompt for AI Agents
In examples/stateful_example.yaml around lines 71 to 76, several lines of the
embedded SQL (SELECT user_id, event_type, COUNT(*) as event_count, SUM(value) as
total_value, FROM flow) contain trailing whitespace; remove any trailing spaces
at the end of each SQL line (and any extra blank-space-only characters), ensure
lines end cleanly with no trailing whitespace, and save the file so the SQL
block has no trailing spaces.
| path: "./errors/temperature_errors.txt" | ||
| state: | ||
| operator_id: "temperature-processor" | ||
| enabled: true No newline at end of file |
Add newline at end of file
YAML files should end with a newline character.
state:
operator_id: "temperature-processor"
enabled: true
+
📝 Committable suggestion
| enabled: true | |
| state: | |
| operator_id: "temperature-processor" | |
| enabled: true | |
🧰 Tools
🪛 YAMLlint (1.37.1)
[error] 115-115: no new line character at the end of file
(new-line-at-end-of-file)
🤖 Prompt for AI Agents
In examples/stateful_example.yaml around line 115, the file is missing a
trailing newline; edit the file to add a single newline character at the end of
the file and save so the file ends with a newline (ensure LF newline if your
editor has options).
| retained_checkpoints: 10 # Keep last 10 checkpoints | ||
| exactly_once: true # Enable exactly-once processing | ||
| state_timeout_ms: 2592000000 # 30 days | ||
|
|
Remove trailing whitespace from multiple lines
Several lines contain trailing whitespace that should be removed for consistency.
Apply these changes:
- Line 21: Remove trailing space after comment
- Line 51: Remove trailing space after import
- Line 70: Remove trailing space after script block
- Line 74: Remove trailing space in SQL query
- Line 80: Remove trailing space after SQL query
- Line 152: Remove trailing space after VRL script comment
Also applies to: 51-51, 70-70, 74-74, 80-80, 152-152
🧰 Tools
🪛 YAMLlint (1.37.1)
[error] 21-21: trailing spaces
(trailing-spaces)
🤖 Prompt for AI Agents
In examples/stateful_s3_example.yaml around lines 21, 51, 70, 74, 80 and 152
there are trailing whitespace characters on those lines; remove the trailing
space at the end of each specified line (line 21: comment, line 51: after
import, line 70: after script block, line 74: inside SQL query end, line 80:
after SQL query, line 152: after VRL script comment) so each line ends cleanly
with no extra spaces.
| state: | ||
| operator_id: "iot-monitor" | ||
| enabled: true | ||
| state_timeout_ms: 604800000 # 7 days No newline at end of file |
Add newline at end of file
YAML files should end with a newline character.
operator_id: "iot-monitor"
enabled: true
state_timeout_ms: 604800000 # 7 days
+
📝 Committable suggestion
| state_timeout_ms: 604800000 # 7 days | |
| operator_id: "iot-monitor" | |
| enabled: true | |
| state_timeout_ms: 604800000 # 7 days | |
🧰 Tools
🪛 YAMLlint (1.37.1)
[error] 177-177: no new line character at the end of file
(new-line-at-end-of-file)
🤖 Prompt for AI Agents
In examples/stateful_s3_example.yaml around line 177, the file is missing a
trailing newline; add a single newline character at the end of the file so the
last line ("state_timeout_ms: 604800000 # 7 days") is terminated by a newline
to comply with POSIX/YAML conventions.
Actionable comments posted: 13
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/arkflow-core/src/stream/mod.rs (1)
495-505: Respect stateful wrapper during stream close.
Currently only pipeline.close() is called; call stateful_pipeline.close() when present.
-        info!("pipeline close...");
-        if let Err(e) = self.pipeline.close().await {
+        info!("pipeline close...");
+        let close_res = if let Some(sp) = &self.stateful_pipeline {
+            sp.close().await
+        } else {
+            self.pipeline.close().await
+        };
+        if let Err(e) = close_res {
             error!("Failed to close pipeline: {}", e);
         }
♻️ Duplicate comments (6)
crates/arkflow-core/src/state/s3_backend.rs (1)
409-413: Checkpoint ID generation is racy across concurrent starters
Computing next = max(existing)+1 is not atomic; can collide under concurrency or multi-process. Use an atomic counter initialized from S3, or serialize allocation. This was raised previously.
+use std::sync::atomic::{AtomicU64, Ordering};
@@
 pub struct S3CheckpointCoordinator {
@@
     checkpoint_timeout: std::time::Duration,
+    next_checkpoint_id: Arc<AtomicU64>,
 }
@@
     pub fn new(backend: Arc<S3StateBackend>) -> Self {
         Self {
             backend,
             active_checkpoints: HashMap::new(),
             checkpoint_timeout: std::time::Duration::from_secs(300), // 5 分钟
+            next_checkpoint_id: Arc::new(AtomicU64::new(0)),
         }
     }
+
+    async fn initialize_checkpoint_id(&self) -> Result<(), Error> {
+        let checkpoints = self.backend.list_checkpoints().await?;
+        let max_id = checkpoints.first().copied().unwrap_or(0);
+        self.next_checkpoint_id.store(max_id + 1, Ordering::SeqCst);
+        Ok(())
+    }
@@
     pub async fn start_checkpoint(&mut self) -> Result<u64, Error> {
-        // Get next checkpoint ID
-        let checkpoints = self.backend.list_checkpoints().await?;
-        let checkpoint_id = checkpoints.first().map_or(1, |id| id + 1);
+        // Thread-safe ID generation
+        let current = self.next_checkpoint_id.load(Ordering::SeqCst);
+        if current == 0 {
+            self.initialize_checkpoint_id().await?;
+        }
+        let checkpoint_id = self.next_checkpoint_id.fetch_add(1, Ordering::SeqCst);
Note: For multi-process safety, consider a conditional-create on a “counter” object or fencing via an S3-based CAS; otherwise add retry if metadata already exists.
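To illustrate the atomic-counter idea in isolation, here is a minimal std-only sketch. `CheckpointIdAllocator` and `allocate_concurrently` are hypothetical names invented for this example, not part of arkflow-core, and `max_existing` merely stands in for the highest ID that would be read from storage. It covers only in-process concurrency; the multi-process case from the note above is not addressed.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical allocator: hands out checkpoint IDs from a shared atomic counter.
pub struct CheckpointIdAllocator {
    next: AtomicU64,
}

impl CheckpointIdAllocator {
    /// `max_existing` stands in for the highest ID found in storage (0 if none).
    pub fn new(max_existing: u64) -> Self {
        Self { next: AtomicU64::new(max_existing + 1) }
    }

    /// `fetch_add` is one atomic read-modify-write, so two callers
    /// can never receive the same ID.
    pub fn allocate(&self) -> u64 {
        self.next.fetch_add(1, Ordering::SeqCst)
    }
}

/// Spawns `threads` workers that each allocate `per_thread` IDs and
/// returns every ID that was handed out.
pub fn allocate_concurrently(max_existing: u64, threads: usize, per_thread: usize) -> Vec<u64> {
    let allocator = Arc::new(CheckpointIdAllocator::new(max_existing));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let allocator = Arc::clone(&allocator);
            thread::spawn(move || {
                (0..per_thread)
                    .map(|_| allocator.allocate())
                    .collect::<Vec<u64>>()
            })
        })
        .collect();
    handles
        .into_iter()
        .flat_map(|h| h.join().unwrap())
        .collect()
}
```

Seeding the counter once at construction, rather than lazily treating 0 as "uninitialized", avoids a second race on the initialization path; whether that fits the coordinator's async constructor is a design choice for the PR author.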
crates/arkflow-core/src/stream/mod.rs (1)
651-653: Document exactly-once processor integration limitation more prominently.
Add a module-level note and a TODO with an issue reference here.
crates/arkflow-core/src/state/monitoring.rs (4)
191-209: Map Prometheus register() errors into crate::Error.
-        registry.register(Box::new(operations_total.clone()))?;
+        registry.register(Box::new(operations_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_operations_total: {e}")))?;
-        registry.register(Box::new(operations_success_total.clone()))?;
+        registry.register(Box::new(operations_success_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_operations_success_total: {e}")))?;
-        registry.register(Box::new(operations_failed_total.clone()))?;
+        registry.register(Box::new(operations_failed_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_operations_failed_total: {e}")))?;
-        registry.register(Box::new(operation_duration_seconds.clone()))?;
+        registry.register(Box::new(operation_duration_seconds.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_operation_duration_seconds: {e}")))?;
-        registry.register(Box::new(state_size_bytes.clone()))?;
+        registry.register(Box::new(state_size_bytes.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_size_bytes: {e}")))?;
-        registry.register(Box::new(checkpoint_size_bytes.clone()))?;
+        registry.register(Box::new(checkpoint_size_bytes.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoint_size_bytes: {e}")))?;
-        registry.register(Box::new(checkpoints_total.clone()))?;
+        registry.register(Box::new(checkpoints_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoints_total: {e}")))?;
-        registry.register(Box::new(checkpoint_duration_seconds.clone()))?;
+        registry.register(Box::new(checkpoint_duration_seconds.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoint_duration_seconds: {e}")))?;
-        registry.register(Box::new(checkpoint_success_total.clone()))?;
+        registry.register(Box::new(checkpoint_success_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoints_success_total: {e}")))?;
-        registry.register(Box::new(checkpoint_failed_total.clone()))?;
+        registry.register(Box::new(checkpoint_failed_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_checkpoints_failed_total: {e}")))?;
-        registry.register(Box::new(active_transactions.clone()))?;
+        registry.register(Box::new(active_transactions.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_active_transactions: {e}")))?;
-        registry.register(Box::new(transaction_duration_seconds.clone()))?;
+        registry.register(Box::new(transaction_duration_seconds.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_transaction_duration_seconds: {e}")))?;
-        registry.register(Box::new(cache_hits_total.clone()))?;
+        registry.register(Box::new(cache_hits_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_cache_hits_total: {e}")))?;
-        registry.register(Box::new(cache_misses_total.clone()))?;
+        registry.register(Box::new(cache_misses_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_cache_misses_total: {e}")))?;
-        registry.register(Box::new(cache_size_bytes.clone()))?;
+        registry.register(Box::new(cache_size_bytes.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_cache_size_bytes: {e}")))?;
-        registry.register(Box::new(s3_operations_total.clone()))?;
+        registry.register(Box::new(s3_operations_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_s3_operations_total: {e}")))?;
-        registry.register(Box::new(s3_operation_duration_seconds.clone()))?;
+        registry.register(Box::new(s3_operation_duration_seconds.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_s3_operation_duration_seconds: {e}")))?;
-        registry.register(Box::new(s3_errors_total.clone()))?;
+        registry.register(Box::new(s3_errors_total.clone()))
+            .map_err(|e| Error::Process(format!("Failed to register arkflow_state_s3_errors_total: {e}")))?;
233-239: Add Drop semantics to OperationTimer to avoid leaks/double-counting.
 pub struct OperationTimer {
     start: Instant,
     metrics: Arc<StateMetrics>,
     operation_type: OperationType,
+    recorded: bool,
 }
@@
-    pub fn success(self) {
+    pub fn success(mut self) {
         let duration = self.start.elapsed().as_secs_f64();
         self.metrics.operation_duration_seconds.observe(duration);
         self.metrics.operations_success_total.inc();
+        self.recorded = true;
@@
-    pub fn failure(self) {
+    pub fn failure(mut self) {
         let duration = self.start.elapsed().as_secs_f64();
         self.metrics.operation_duration_seconds.observe(duration);
         self.metrics.operations_failed_total.inc();
+        self.recorded = true;
@@
     }
Add:
impl Drop for OperationTimer {
    fn drop(&mut self) {
        if self.recorded {
            return;
        }
        let duration = self.start.elapsed().as_secs_f64();
        self.metrics.operation_duration_seconds.observe(duration);
        self.metrics.operations_success_total.inc();
        match self.operation_type {
            OperationType::Checkpoint => {
                self.metrics.checkpoint_duration_seconds.observe(duration);
                self.metrics.checkpoint_success_total.inc();
            }
            OperationType::S3Get | OperationType::S3Put | OperationType::S3Delete => {
                self.metrics.s3_operation_duration_seconds.observe(duration);
            }
            OperationType::Transaction => {
                self.metrics.transaction_duration_seconds.observe(duration);
            }
            _ => {}
        }
        self.recorded = true;
    }
}
Also applies to: 265-305
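The record-once pattern behind this suggestion can be sketched with the standard library alone. `Counters`, `TimerGuard`, and `demo_counts` are hypothetical stand-ins invented for this example (plain atomics instead of Prometheus counters, no duration tracking); the sketch only shows that an explicit outcome sets a flag which keeps `Drop` from counting a second time.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the metrics struct: two plain counters.
#[derive(Default)]
pub struct Counters {
    pub success: AtomicU64,
    pub failed: AtomicU64,
}

/// Guard that records exactly one outcome, either explicitly or on drop.
pub struct TimerGuard {
    counters: Arc<Counters>,
    recorded: bool,
}

impl TimerGuard {
    pub fn start(counters: Arc<Counters>) -> Self {
        Self { counters, recorded: false }
    }

    /// Explicit failure: count it and mark the guard so `Drop` stays silent.
    pub fn failure(mut self) {
        self.counters.failed.fetch_add(1, Ordering::Relaxed);
        self.recorded = true;
        // `self` is dropped at the end of this method; `drop` sees the flag.
    }
}

impl Drop for TimerGuard {
    fn drop(&mut self) {
        if self.recorded {
            return;
        }
        // No explicit outcome was reported: default to success.
        self.counters.success.fetch_add(1, Ordering::Relaxed);
        self.recorded = true;
    }
}

/// Runs one implicit-success guard and one explicit-failure guard and
/// returns `(success, failed)`.
pub fn demo_counts() -> (u64, u64) {
    let counters = Arc::new(Counters::default());
    drop(TimerGuard::start(Arc::clone(&counters)));
    TimerGuard::start(Arc::clone(&counters)).failure();
    (
        counters.success.load(Ordering::Relaxed),
        counters.failed.load(Ordering::Relaxed),
    )
}
```

Defaulting to success on drop is the behaviour the review proposes; treating an unreported guard as a failure (or as a separate "abandoned" outcome) would be an equally valid convention, since early returns via `?` also drop the guard.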
401-407: Use Encoder::encode for broader Prometheus compatibility.
-        let encoder = TextEncoder::new();
-        let metric_families = self.registry.gather();
-        encoder
-            .encode_to_string(&metric_families)
-            .map_err(|e| Error::Process(format!("Failed to encode metrics: {}", e)))
+        let encoder = TextEncoder::new();
+        let metric_families = self.registry.gather();
+        let mut buf = Vec::new();
+        prometheus::Encoder::encode(&encoder, &metric_families, &mut buf)
+            .map_err(|e| Error::Process(format!("Failed to encode metrics: {e}")))?;
+        String::from_utf8(buf).map_err(|e| Error::Process(format!("Metrics not UTF-8: {e}")))
583-593: Strengthen timer test with assertions.
 #[test]
 fn test_operation_timer() {
     let (metrics, _) = StateMetrics::new().unwrap();
     let metrics = Arc::new(metrics);
     let timer = OperationTimer::start(metrics.clone(), OperationType::Get);
-    drop(timer); // This should record success
+    drop(timer); // default success via Drop
+    assert_eq!(metrics.operations_total.get() as u64, 1);
+    assert_eq!(metrics.operations_success_total.get() as u64, 1);
     let timer = OperationTimer::start(metrics, OperationType::Get);
     timer.failure();
+    assert_eq!(metrics.operations_total.get() as u64, 2);
+    assert_eq!(metrics.operations_failed_total.get() as u64, 1);
 }
🧹 Nitpick comments (16)
examples/verify_state_integration.rs (4)
80-83: Suppress unused variable warning for streams.
streamsis built but never used. This will emit a warning and might fail CI if warnings are denied.Use placeholder or underscore binding:
- let streams = engine_builder.build_streams().await?; + let _streams = engine_builder.build_streams().await?;Alternatively, if not needed at all:
- let streams = engine_builder.build_streams().await?; + let _ = engine_builder.build_streams().await?;
88-90: Iterate by reference to avoid moving the map and to match the other example.Safer and consistent whether
get_state_managers()returns an owned map or a reference.- for (operator_id, _) in state_managers { + for (operator_id, _) in state_managers.iter() { println!("State manager created for operator: {}", operator_id); }
6-7: Use HTTPS for Apache License URL.Minor polish and best practice.
- * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0
26-29: Optional: switch println! to tracing for consistency with other examples.The other example initializes
tracing_subscriberand usesinfo!. Consider aligning for consistent diagnostics.Possible change (imports and init required at top):
- println!("Testing state management integration..."); + tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init(); + tracing::info!("Testing state management integration..."); @@ - println!("Building streams with state management..."); + tracing::info!("Building streams with state management..."); @@ - println!("Created {} state managers", state_managers.len()); + tracing::info!("Created {} state managers", state_managers.len());Also applies to: 81-87
examples/test_state_management.rs (2)
27-27: Use tracing consistently for errors in spawned task
Import error to replace eprintln! below.
-use tracing::{info, Level};
+use tracing::{info, error, Level};
101-103: Replace eprintln! with tracing::error!
Keep logging consistent and structured.
-            eprintln!("流 {} 失败: {}", i, e);
+            error!("流 {} 失败: {}", i, e);
crates/arkflow-core/src/state/s3_backend.rs (2)
67-70: Unused local_cache field
local_cache is never read. Remove it or wire it into read-through/write-back paths.
-    /// 本地缓存
-    local_cache: HashMap<String, SimpleMemoryState>,
And corresponding initializer removal.
500-506: Metadata ‘operators’ never populated
Consider recording operator/state sizes in metadata on finalize to aid observability and cleanup decisions.
examples/test_state_config.yaml (1)
29-29: Add trailing newline
Comply with linters.
-      - "processed_count"
+      - "processed_count"
+
crates/arkflow-core/tests/state_management_integration.rs (2)
25-25: Remove unused imports.
sleep and Duration are unused in this test.
-use tokio::time::{sleep, Duration};
+// use tokio::time::{sleep, Duration};
29-38: Prefer unique temp dirs to avoid flakiness and ensure cleanup.
Use tempfile::tempdir() (dev-dependency) to avoid collisions when tests run in parallel.
-    // Create a temporary input file
-    let temp_dir = "/tmp/arkflow_test";
-    fs::create_dir_all(temp_dir).unwrap();
-    let input_path = format!("{}/test_input.txt", temp_dir);
+    // Create a unique temporary input file
+    let tmp = tempfile::tempdir().unwrap();
+    let input_path = tmp.path().join("test_input.txt");
-    fs::write(
-        &input_path,
+    fs::write(
+        &input_path,
         "Hello World\nThis is a test\nState management test\n",
     )
     .unwrap();
@@
-    // Clean up
-    fs::remove_dir_all(temp_dir).ok();
+    // TempDir auto-cleans on drop
Also applies to: 93-95
crates/arkflow-core/tests/comprehensive_state_test.rs (1)
171-179: Minor: avoid unnecessary type annotation.
Let type inference handle Option<String>.
-    let value: Option<String> = manager
-        .get_state_value(operator_id, &"test_key")
+    let value = manager
+        .get_state_value::<_, String>(operator_id, &"test_key")
         .await
         .unwrap();
crates/arkflow-core/src/stream/mod.rs (2)
32-41: operator_id in StatefulPipeline is unused. Remove or wire it.
Dead field adds noise; if needed for future state updates, pass it into state ops.
 pub struct StatefulPipeline {
     /// 内部 pipeline
     inner: Arc<Pipeline>,
     /// 状态管理器
     state_manager: Option<Arc<RwLock<EnhancedStateManager>>>,
-    /// 操作符 ID
-    operator_id: Option<String>,
 }
@@
-        operator_id: Option<String>,
     ) -> Self {
         Self {
             inner: pipeline,
             state_manager,
-            operator_id,
         }
     }
And adjust the single call site:
-    StatefulPipeline::new(pipeline_arc.clone(), Some(sm), None)
+    StatefulPipeline::new(pipeline_arc.clone(), Some(sm))
338-371: Avoid extra clone of msg unless required.
If error path doesn’t need original msg, remove .clone() to reduce allocations.
-    let processed = if let Some(ref stateful_pipe) = stateful_pipeline {
-        stateful_pipe.process(msg.clone()).await
+    let processed = if let Some(ref stateful_pipe) = stateful_pipeline {
+        stateful_pipe.process(msg).await
     } else {
-        pipeline.process(msg.clone()).await
+        pipeline.process(msg).await
     };
If you need msg for the error branch, keep as-is.
crates/arkflow-core/src/state/enhanced.rs (2)
75-79: Unused config knobs (exactly_once) not enforced.
They’re defined but not used to change behavior (e.g., guarding barriers/2PC).
Consider gating 2PC/barrier paths on exactly_once.
Also applies to: 300-304
417-428: pending_transactions is unused; either use it or remove.
Currently 2PC logs but doesn’t stage data.
Also applies to: 455-497
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (20)
- crates/arkflow-core/src/cli/mod.rs (1 hunks)
- crates/arkflow-core/src/engine/mod.rs (3 hunks)
- crates/arkflow-core/src/state/enhanced.rs (1 hunks)
- crates/arkflow-core/src/state/example.rs (1 hunks)
- crates/arkflow-core/src/state/helper.rs (1 hunks)
- crates/arkflow-core/src/state/integration_tests.rs (1 hunks)
- crates/arkflow-core/src/state/mod.rs (1 hunks)
- crates/arkflow-core/src/state/monitoring.rs (1 hunks)
- crates/arkflow-core/src/state/performance.rs (1 hunks)
- crates/arkflow-core/src/state/s3_backend.rs (1 hunks)
- crates/arkflow-core/src/state/simple.rs (1 hunks)
- crates/arkflow-core/src/state/tests.rs (1 hunks)
- crates/arkflow-core/src/state/transaction.rs (1 hunks)
- crates/arkflow-core/src/stream/mod.rs (8 hunks)
- crates/arkflow-core/tests/comprehensive_state_test.rs (1 hunks)
- crates/arkflow-core/tests/state_management_integration.rs (1 hunks)
- examples/data/test.txt (1 hunks)
- examples/test_state_config.yaml (1 hunks)
- examples/test_state_management.rs (1 hunks)
- examples/verify_state_integration.rs (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- examples/data/test.txt
🚧 Files skipped from review as they are similar to previous changes (8)
- crates/arkflow-core/src/state/integration_tests.rs
- crates/arkflow-core/src/state/tests.rs
- crates/arkflow-core/src/state/mod.rs
- crates/arkflow-core/src/state/example.rs
- crates/arkflow-core/src/state/performance.rs
- crates/arkflow-core/src/state/transaction.rs
- crates/arkflow-core/src/state/simple.rs
- crates/arkflow-core/src/state/helper.rs
🧰 Additional context used
🧬 Code graph analysis (10)
crates/arkflow-core/tests/state_management_integration.rs (3)
  crates/arkflow-core/src/state/enhanced.rs (5)
    write(432-453), default(82-92), new(108-139), new(342-352), new(422-429)
  crates/arkflow-core/src/engine/mod.rs (1)
    new(82-90)
  crates/arkflow-core/src/state/mod.rs (1)
    new(63-68)
crates/arkflow-core/src/cli/mod.rs (1)
  crates/arkflow-core/src/state/example.rs (1)
    config(169-171)
examples/test_state_management.rs (3)
  crates/arkflow-core/src/stream/mod.rs (2)
    output(437-477), state_manager(97-99)
  examples/verify_state_integration.rs (1)
    main(27-97)
  crates/arkflow-core/src/state/enhanced.rs (4)
    default(82-92), new(108-139), new(342-352), new(422-429)
crates/arkflow-core/tests/comprehensive_state_test.rs (5)
  crates/arkflow-core/src/stream/mod.rs (3)
    output(437-477), build(533-574), state_manager(97-99)
  crates/arkflow-core/src/lib.rs (2)
    from_string(214-216), len(222-224)
  crates/arkflow-core/src/state/enhanced.rs (5)
    new(108-139), new(342-352), new(422-429), write(432-453), default(82-92)
  crates/arkflow-core/src/input/mod.rs (1)
    register_input_builder(120-133)
  crates/arkflow-core/src/output/mod.rs (1)
    register_output_builder(77-90)
examples/verify_state_integration.rs (2)
  examples/test_state_management.rs (1)
    main(30-171)
  crates/arkflow-core/src/engine/mod.rs (1)
    new(82-90)
crates/arkflow-core/src/engine/mod.rs (3)
  crates/arkflow-core/src/stream/mod.rs (5)
    new(45-55), new(128-156), process(58-84), process(689-691), process(724-726)
  crates/arkflow-core/src/state/enhanced.rs (1)
    process(355-383)
  crates/arkflow-core/src/pipeline/mod.rs (1)
    process(35-48)
crates/arkflow-core/src/state/s3_backend.rs (3)
  crates/arkflow-core/src/state/enhanced.rs (3)
    new(108-139), new(342-352), new(422-429)
  crates/arkflow-core/src/state/helper.rs (5)
    new(42-46), get_typed(23-25), get_typed(60-72), put_typed(28-30), put_typed(74-82)
  crates/arkflow-core/src/state/performance.rs (6)
    new(126-131), new(164-189), new(454-461), local_state(498-500), get_typed(509-514), put_typed(516-521)
crates/arkflow-core/src/stream/mod.rs (3)
  crates/arkflow-core/src/pipeline/mod.rs (3)
    new(30-32), process(35-48), close(51-56)
  crates/arkflow-core/src/state/enhanced.rs (2)
    process(355-383), default(82-92)
  crates/arkflow-core/src/config.rs (3)
    default(151-159), default(168-174), default(278-288)
crates/arkflow-core/src/state/monitoring.rs (1)
  crates/arkflow-core/src/state/enhanced.rs (4)
    new(108-139), new(342-352), new(422-429), default(82-92)
crates/arkflow-core/src/state/enhanced.rs (5)
  crates/arkflow-core/src/state/helper.rs (1)
    new(42-46)
  crates/arkflow-core/src/state/monitoring.rs (4)
    new(68-72), new(369-377), new(497-509), inner(512-514)
  crates/arkflow-core/src/state/s3_backend.rs (3)
    new(74-112), new(313-321), new(400-406)
  crates/arkflow-core/src/state/simple.rs (5)
    new(36-41), new(99-104), new(157-163), process(44-74), write(107-137)
  crates/arkflow-core/src/stream/mod.rs (5)
    inner(92-94), state_manager(97-99), process(58-84), process(689-691), process(724-726)
🪛 YAMLlint (1.37.1)
examples/test_state_config.yaml
[error] 29-29: no new line character at the end of file
(new-line-at-end-of-file)
🔇 Additional comments (4)
examples/verify_state_integration.rs (1)
33-41: Drop the typed-field suggestion: InputConfig in arkflow-core is defined with input_type, name, and a generic config field only; it has no path or format members. The example’s JSON blob under config matches the API, so the proposed diff should be discarded.
Likely an incorrect or invalid review comment.
examples/test_state_config.yaml (1)
8-8: Drop backend_type casing warning
The StateBackendType enum in crates/arkflow-core/src/config.rs has #[serde(rename_all = "lowercase")], so "memory" deserializes to Memory without issue.
Likely an incorrect or invalid review comment.
crates/arkflow-core/tests/comprehensive_state_test.rs (1)
204-207: Apply the same guarded registration in the second test.
-    // Register mock components
-    let _ = arkflow_core::input::register_input_builder("mock", Arc::new(MockInputBuilder));
-    let _ = arkflow_core::output::register_output_builder("mock", Arc::new(MockOutputBuilder));
+    register_mock_components();
crates/arkflow-core/src/state/enhanced.rs (1)
268-273: current_checkpoint_id.fetch_add(1) returns the previous ID. Confirm intent.
If you want monotonically increasing IDs starting at 1, this returns 1,2,…; OK. If you expect the new ID, use fetch_add(1)+1.
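The distinction can be checked with a small std-only sketch. `previous_id` and `new_id` are hypothetical helper names for this example; only `AtomicU64::fetch_add` itself comes from the standard library, and it is documented to return the value held before the addition.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Returns the value held *before* the increment, which is what
/// `fetch_add` itself yields.
pub fn previous_id(counter: &AtomicU64) -> u64 {
    counter.fetch_add(1, Ordering::SeqCst)
}

/// Returns the value held *after* the increment.
pub fn new_id(counter: &AtomicU64) -> u64 {
    counter.fetch_add(1, Ordering::SeqCst) + 1
}
```

With a counter initialized to 1, successive `previous_id` calls yield 1, 2, 3, while `new_id` on the same starting value would yield 2, 3, 4; which sequence is intended depends on whether the stored value means "next ID to hand out" or "last ID handed out".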
| pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> { | ||
| // Initialize the logging system | ||
| let config = self.config.clone().unwrap(); | ||
| let config = self.config.clone().ok_or("No configuration loaded")?; |
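Prefer a concrete error over a bare &str in ok_or
ok_or("...")? does compile here, because the standard library implements From<&str> for Box<dyn Error>, but the resulting error carries only a string. Consider returning a concrete error.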
- let config = self.config.clone().ok_or("No configuration loaded")?;
+ let config = self
+ .config
+ .clone()
+ .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "No configuration loaded"))?;
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let config = self.config.clone().ok_or("No configuration loaded")?; | |
| let config = self | |
| .config | |
| .clone() | |
| .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "No configuration loaded"))?; |
🤖 Prompt for AI Agents
In crates/arkflow-core/src/cli/mod.rs around line 76, the expression uses
ok_or("No configuration loaded")?, which relies on an implicit conversion from
&str into Box<dyn Error>; make the error explicit with a value matching the
function's error type, e.g., return a boxed error via ok_or_else(|| Box::<dyn
std::error::Error>::from("No configuration loaded"))? or, if using anyhow in the
crate, use ok_or_else(|| anyhow::anyhow!("No configuration loaded"))? so the
Result's Err variant is an explicit error value rather than a plain &str.
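A minimal sketch of the two forms discussed in this comment, assuming a hypothetical Cli struct with an optional String config (the real type in arkflow-core is not reproduced here). The standard library implements From<&str> for Box<dyn Error>, so both methods below should build and produce the same message; it may be worth checking the actual compiler output before treating the original line as a build failure.

```rust
use std::error::Error;

/// Hypothetical stand-in for the CLI state that holds an optional configuration.
pub struct Cli {
    pub config: Option<String>,
}

impl Cli {
    /// `?` converts the `&str` error through `From<&str> for Box<dyn Error>`.
    pub fn config_via_str(&self) -> Result<String, Box<dyn Error>> {
        let config = self.config.clone().ok_or("No configuration loaded")?;
        Ok(config)
    }

    /// Builds the boxed error explicitly; the resulting error message is the same.
    pub fn config_via_boxed(&self) -> Result<String, Box<dyn Error>> {
        let config = self
            .config
            .clone()
            .ok_or_else(|| Box::<dyn Error>::from("No configuration loaded"))?;
        Ok(config)
    }
}
```

The explicit form mainly helps readability; a dedicated error enum would go further by letting callers match on the missing-configuration case.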
| let mut streams = match engine_builder.build_streams().await { | ||
| Ok(streams) => streams, | ||
| Err(e) => { | ||
| error!("Failed to build streams: {}", e); | ||
| process::exit(1); | ||
| } | ||
| } | ||
| }; |
🛠️ Refactor suggestion
Avoid process::exit in library method; propagate error instead
Engine::run should not terminate the process. Return an error and let the caller decide.
- let mut streams = match engine_builder.build_streams().await {
- Ok(streams) => streams,
- Err(e) => {
- error!("Failed to build streams: {}", e);
- process::exit(1);
- }
- };
+ let mut streams = engine_builder
+ .build_streams()
+ .await
+ .map_err(|e| {
+ error!("Failed to build streams: {}", e);
+ std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
+ })?;
📝 Committable suggestion
| let mut streams = match engine_builder.build_streams().await { | |
| Ok(streams) => streams, | |
| Err(e) => { | |
| error!("Failed to build streams: {}", e); | |
| process::exit(1); | |
| } | |
| } | |
| }; | |
| let mut streams = engine_builder | |
| .build_streams() | |
| .await | |
| .map_err(|e| { | |
| error!("Failed to build streams: {}", e); | |
| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) | |
| })?; |
🤖 Prompt for AI Agents
In crates/arkflow-core/src/engine/mod.rs around lines 231 to 237, do not call
process::exit when build_streams fails; instead propagate the error to the
caller. Replace the match that logs and calls process::exit with code that
returns an Err (after logging or by using ? to propagate), and update
Engine::run's signature to return a Result type (e.g., Result<(), EngineError>
or anyhow::Error) so callers decide how to handle termination; ensure any
necessary error type conversions are applied where you return the build_streams
error.
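The propagation pattern can be sketched in isolation as follows. BuildError, build_streams, and run here are hypothetical stand-ins, not the PR's actual types; the point is only that the library function logs and returns the error, leaving the exit decision to the caller.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical error type standing in for the crate's real stream-build error.
#[derive(Debug)]
pub struct BuildError(pub String);

impl fmt::Display for BuildError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl Error for BuildError {}

/// Hypothetical stand-in for `build_streams`; fails when `fail` is true.
pub fn build_streams(fail: bool) -> Result<Vec<String>, BuildError> {
    if fail {
        Err(BuildError("invalid stream config".to_string()))
    } else {
        Ok(vec!["stream-0".to_string()])
    }
}

/// Library-style entry point: logs the failure and propagates it instead of exiting.
pub fn run(fail: bool) -> Result<usize, Box<dyn Error>> {
    let streams = build_streams(fail).map_err(|e| {
        eprintln!("Failed to build streams: {e}");
        e
    })?;
    Ok(streams.len())
}
```

Unlike wrapping e.to_string() in an io::Error, this sketch boxes the original error, so its concrete type remains available to callers that want to downcast. A binary's main can still call process::exit(1) when run returns Err.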
| /// Process a transactional batch | ||
| async fn process_transactional_batch( | ||
| &mut self, | ||
| batch: MessageBatch, | ||
| tx_ctx: TransactionContext, | ||
| ) -> Result<Vec<MessageBatch>, Error> { | ||
| // Register the transaction | ||
| self.register_transaction(&tx_ctx).await?; | ||
|
|
||
| // If this is a checkpoint barrier, trigger a checkpoint | ||
| if tx_ctx.is_checkpoint() { | ||
| self.trigger_checkpoint(tx_ctx.checkpoint_id).await?; | ||
| } | ||
|
|
||
| // Process the batch (currently returned as-is) | ||
| // In a real implementation, state transformations would be applied here | ||
| Ok(vec![batch]) | ||
| } |
🛠️ Refactor suggestion
Leak: transactions are never cleaned up on error/success.
If trigger_checkpoint fails (or after success), the transaction remains in active_transactions.
async fn process_transactional_batch(
&mut self,
batch: MessageBatch,
tx_ctx: TransactionContext,
) -> Result<Vec<MessageBatch>, Error> {
// Register the transaction
self.register_transaction(&tx_ctx).await?;
// If this is a checkpoint barrier, trigger a checkpoint
if tx_ctx.is_checkpoint() {
- self.trigger_checkpoint(tx_ctx.checkpoint_id).await?;
+ if let Err(e) = self.trigger_checkpoint(tx_ctx.checkpoint_id).await {
+ // Clean up the failed transaction
+ self.unregister_transaction(&tx_ctx.transaction_id).await;
+ return Err(e);
+ }
}
// Process the batch (currently returned as-is)
// In a real implementation, state transformations would be applied here
- Ok(vec![batch])
+ let result = vec![batch];
+ // Clean up the successful transaction
+ self.unregister_transaction(&tx_ctx.transaction_id).await;
+ Ok(result)
}
Add outside this hunk:
impl EnhancedStateManager {
async fn unregister_transaction(&self, tx_id: &str) {
let mut transactions = self.active_transactions.write().await;
transactions.remove(tx_id);
}
}
🤖 Prompt for AI Agents
In crates/arkflow-core/src/state/enhanced.rs around lines 162 to 179, the code
registers a transaction but never unregisters it, causing active_transactions to
leak if processing succeeds or if trigger_checkpoint fails; add the suggested
impl EnhancedStateManager::unregister_transaction(&self, tx_id: &str) (placed
outside this hunk) and call self.unregister_transaction(&tx_ctx.transaction_id).await in all
exit paths of process_transactional_batch — i.e., ensure you remove the
transaction after successful completion and also in a catch/fallback when
trigger_checkpoint or any subsequent step returns an Err (use match or a
finally-like pattern such as storing the tx_id and calling unregister in both Ok
and Err branches or via a scope guard).
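The scope-guard option mentioned above could look roughly like the following. This is a simplified, synchronous sketch: Registry, TxGuard, and process are hypothetical names, and a std Mutex is swapped in for the async lock used in the PR because Drop cannot await. An async version would need a different cleanup strategy, such as explicit unregister calls on every exit path.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Hypothetical registry standing in for `active_transactions`.
#[derive(Clone, Default)]
pub struct Registry {
    active: Arc<Mutex<HashSet<String>>>,
}

/// Guard that removes its transaction from the registry when dropped.
pub struct TxGuard {
    registry: Registry,
    tx_id: String,
}

impl Registry {
    /// Registers a transaction and returns a guard tied to its lifetime.
    pub fn register(&self, tx_id: &str) -> TxGuard {
        self.active.lock().unwrap().insert(tx_id.to_string());
        TxGuard {
            registry: self.clone(),
            tx_id: tx_id.to_string(),
        }
    }

    /// Number of transactions currently registered.
    pub fn active_count(&self) -> usize {
        self.active.lock().unwrap().len()
    }
}

impl Drop for TxGuard {
    fn drop(&mut self) {
        self.registry.active.lock().unwrap().remove(&self.tx_id);
    }
}

/// Processes one batch; the guard unregisters the transaction on both Ok and Err.
pub fn process(registry: &Registry, tx_id: &str, checkpoint_fails: bool) -> Result<(), String> {
    let _guard = registry.register(tx_id);
    if checkpoint_fails {
        return Err("checkpoint failed".to_string());
    }
    Ok(())
}
```

The design choice here is that cleanup is attached to scope exit rather than to individual return statements, so a newly added early return cannot reintroduce the leak.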
| async fn list_checkpoints(&self) -> Result<Vec<u64>, Error> { | ||
| let mut checkpoints = Vec::new(); | ||
|
|
||
| // List objects in the checkpoint directory | ||
| let mut stream = self.client.list(Some(&self.checkpoint_base_path)); | ||
|
|
||
| while let Some(object) = stream.try_next().await? { | ||
| // Extract the checkpoint ID from the path | ||
| if let Some(name) = object.location.filename() { | ||
| if let Some(rest) = name.strip_prefix("chk-") { | ||
| if let Ok(id) = rest.parse::<u64>() { | ||
| checkpoints.push(id); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Sort in descending order (newest first) | ||
| checkpoints.sort_by(|a: &u64, b: &u64| b.cmp(a)); | ||
| Ok(checkpoints) | ||
| } |
Bug: list_checkpoints never finds IDs (parsing filename instead of directory segment)
filename() returns file names like _metadata.json, not chk-<id>. Result: always empty list. Parse path segments and de-duplicate IDs.
- // List objects in the checkpoint directory
- let mut stream = self.client.list(Some(&self.checkpoint_base_path));
-
- while let Some(object) = stream.try_next().await? {
- // Extract the checkpoint ID from the path
- if let Some(name) = object.location.filename() {
- if let Some(rest) = name.strip_prefix("chk-") {
- if let Ok(id) = rest.parse::<u64>() {
- checkpoints.push(id);
- }
- }
- }
- }
+ // List all objects under the checkpoint directory and parse chk-<id> from path segments
+ let mut stream = self.client.list(Some(&self.checkpoint_base_path));
+ let mut seen = std::collections::HashSet::new();
+ while let Some(object) = stream
+ .try_next()
+ .await
+ .map_err(|e| Error::Process(format!("Failed to list checkpoints: {}", e)))?
+ {
+ let path_str = object.location.to_string();
+ for seg in path_str.split('/') {
+ if let Some(rest) = seg.strip_prefix("chk-") {
+ if let Ok(id) = rest.parse::<u64>() {
+ if seen.insert(id) {
+ checkpoints.push(id);
+ }
+ }
+ }
+ }
+ }
📝 Committable suggestion
| async fn list_checkpoints(&self) -> Result<Vec<u64>, Error> { | |
| let mut checkpoints = Vec::new(); | |
| // List objects in the checkpoint directory | |
| let mut stream = self.client.list(Some(&self.checkpoint_base_path)); | |
| while let Some(object) = stream.try_next().await? { | |
| // Extract the checkpoint ID from the path | |
| if let Some(name) = object.location.filename() { | |
| if let Some(rest) = name.strip_prefix("chk-") { | |
| if let Ok(id) = rest.parse::<u64>() { | |
| checkpoints.push(id); | |
| } | |
| } | |
| } | |
| } | |
| // Sort in descending order (newest first) | |
| checkpoints.sort_by(|a: &u64, b: &u64| b.cmp(a)); | |
| Ok(checkpoints) | |
| } | |
| async fn list_checkpoints(&self) -> Result<Vec<u64>, Error> { | |
| let mut checkpoints = Vec::new(); | |
| // List all objects under the checkpoint directory and parse chk-<id> from path segments | |
| let mut stream = self.client.list(Some(&self.checkpoint_base_path)); | |
| let mut seen = std::collections::HashSet::new(); | |
| while let Some(object) = stream | |
| .try_next() | |
| .await | |
| .map_err(|e| Error::Process(format!("Failed to list checkpoints: {}", e)))? | |
| { | |
| let path_str = object.location.to_string(); | |
| for seg in path_str.split('/') { | |
| if let Some(rest) = seg.strip_prefix("chk-") { | |
| if let Ok(id) = rest.parse::<u64>() { | |
| if seen.insert(id) { | |
| checkpoints.push(id); | |
| } | |
| } | |
| } | |
| } | |
| } | |
| // Sort in descending order (newest first) | |
| checkpoints.sort_by(|a: &u64, b: &u64| b.cmp(a)); | |
| Ok(checkpoints) | |
| } |
🤖 Prompt for AI Agents
In crates/arkflow-core/src/state/s3_backend.rs around lines 134 to 154,
list_checkpoints currently calls object.location.filename() which yields
filenames like "_metadata.json" so no "chk-<id>" is ever found; change the logic
to iterate over the path segments of object.location (splitting by '/'), for
each segment check for the "chk-" prefix, parse the trailing part as u64,
collect parsed ids into a HashSet to de-duplicate, then convert to a Vec, sort
in descending order and return; ensure you still propagate parsing/listing
errors as before.
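The segment-parsing logic can be exercised without an object store. The sketch below is a standalone illustration, assuming a hypothetical layout such as checkpoints/chk-12/_metadata.json; checkpoint_ids is not a function from the PR, and it uses a BTreeSet for de-duplication and ordering in place of the HashSet plus sort in the suggested diff.

```rust
use std::collections::BTreeSet;

/// Extracts checkpoint IDs from object paths by scanning every path segment
/// for a `chk-<id>` directory name. Returns unique IDs, newest (largest) first.
pub fn checkpoint_ids<'a, I>(paths: I) -> Vec<u64>
where
    I: IntoIterator<Item = &'a str>,
{
    let ids: BTreeSet<u64> = paths
        .into_iter()
        // Look at each segment, not only the final file name.
        .flat_map(|path| path.split('/'))
        .filter_map(|segment| segment.strip_prefix("chk-"))
        // Segments such as `chk-abc` are skipped rather than treated as errors.
        .filter_map(|rest| rest.parse::<u64>().ok())
        .collect();

    // BTreeSet iterates in ascending order; reverse for newest-first.
    ids.into_iter().rev().collect()
}
```

Because several objects usually live under one checkpoint directory, de-duplication matters: without it, each checkpoint ID would appear once per stored object.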
| Ok(result) => { | ||
| let bytes = result.bytes().await?; | ||
|
|
🛠️ Refactor suggestion
Propagate object_store bytes() error with context
Current ? assumes a From<object_store::Error> for your Error. Be explicit for consistency.
- let bytes = result.bytes().await?;
+ let bytes = result
+ .bytes()
+ .await
+ .map_err(|e| Error::Process(format!("Failed to read S3 object: {}", e)))?;
📝 Committable suggestion
| Ok(result) => { | |
| let bytes = result.bytes().await?; | |
| Ok(result) => { | |
| let bytes = result | |
| .bytes() | |
| .await | |
| .map_err(|e| Error::Process(format!("Failed to read S3 object: {}", e)))?; |
🤖 Prompt for AI Agents
In crates/arkflow-core/src/state/s3_backend.rs around lines 189–191, the call
`result.bytes().await?` currently uses `?` which assumes a
From<object_store::Error> impl; change it to explicitly map the object_store
error into your crate's Error with context (e.g.,
`result.bytes().await.map_err(|e| /* convert to your Error with context, e.g.
Error::ObjectStore{ source: e, context: "..."} */ )?`) so the
object_store::Error is propagated with clear, explicit conversion and helpful
context for debugging.
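If the same explicit mapping is needed in several places, a small helper can keep it uniform. The sketch below assumes only that the crate's error has a Process(String) variant, as the suggested diff implies; the Error enum and with_context function shown here are hypothetical and redefined locally so the example is self-contained.

```rust
use std::fmt;

/// Hypothetical error enum; only a `Process(String)` variant is assumed.
#[derive(Debug, PartialEq)]
pub enum Error {
    Process(String),
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::Process(msg) => write!(f, "process error: {msg}"),
        }
    }
}

impl std::error::Error for Error {}

/// Maps any displayable error into `Error::Process`, prefixed with a context string.
pub fn with_context<T, E: fmt::Display>(result: Result<T, E>, context: &str) -> Result<T, Error> {
    result.map_err(|e| Error::Process(format!("{context}: {e}")))
}
```

A call site would then read something like with_context(result.bytes().await, "failed to read S3 object")?, keeping the context string next to the operation it describes.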
| // Register mock components | ||
| let _ = arkflow_core::input::register_input_builder("mock", Arc::new(MockInputBuilder)); | ||
| let _ = arkflow_core::output::register_output_builder("mock", Arc::new(MockOutputBuilder)); | ||
|
|
🛠️ Refactor suggestion
Don’t ignore registration errors; only tolerate “already registered”.
Swallowing all errors can hide real failures.
- // Register mock components
- let _ = arkflow_core::input::register_input_builder("mock", Arc::new(MockInputBuilder));
- let _ = arkflow_core::output::register_output_builder("mock", Arc::new(MockOutputBuilder));
+ // Register mock components (idempotent)
+ register_mock_components();
Add once per file:
fn register_mock_components() {
if let Err(e) = arkflow_core::input::register_input_builder("mock", Arc::new(MockInputBuilder)) {
if !format!("{e}").contains("already registered") {
panic!("register_input_builder failed: {e}");
}
}
if let Err(e) = arkflow_core::output::register_output_builder("mock", Arc::new(MockOutputBuilder)) {
if !format!("{e}").contains("already registered") {
panic!("register_output_builder failed: {e}");
}
}
}
🤖 Prompt for AI Agents
In crates/arkflow-core/tests/comprehensive_state_test.rs around lines 97 to 100,
replace the current blind ignores of
register_input_builder/register_output_builder with a small helper function
(defined once in the file) named register_mock_components that calls both
registration functions, checks their Result, and panics on any Err unless the
error string contains "already registered"; then call register_mock_components()
where the registrations currently occur so real registration failures are not
swallowed.
| fs::create_dir_all(temp_dir).unwrap(); | ||
| let input_path = format!("{}/test_input.txt", temp_dir); | ||
|
|
||
| fs::write( | ||
| &input_path, | ||
| "Hello World\nThis is a test\nState management test\n", | ||
| ) | ||
| .unwrap(); |
Missing std::fs import causes compile error.
fs::create_dir_all, fs::write, and fs::remove_dir_all require use std::fs;.
use arkflow_core::stream::StreamConfig;
-use tokio::time::{sleep, Duration};
+use tokio::time::{sleep, Duration};
+use std::fs;
📝 Committable suggestion
| fs::create_dir_all(temp_dir).unwrap(); | |
| let input_path = format!("{}/test_input.txt", temp_dir); | |
| fs::write( | |
| &input_path, | |
| "Hello World\nThis is a test\nState management test\n", | |
| ) | |
| .unwrap(); | |
| // At the top of crates/arkflow-core/tests/state_management_integration.rs | |
| use arkflow_core::stream::StreamConfig; | |
| use tokio::time::{sleep, Duration}; | |
| use std::fs; |
🤖 Prompt for AI Agents
In crates/arkflow-core/tests/state_management_integration.rs around lines 31 to
38, the test uses fs::create_dir_all, fs::write, and fs::remove_dir_all but
never imports std::fs; add a top-of-file import "use std::fs;" (or prefix calls
with std::fs::) so the filesystem functions resolve and the test compiles.
| input: InputConfig { | ||
| r#type: "file".to_string(), | ||
| path: Some("./examples/data/test.txt".to_string()), | ||
| format: Some("text".to_string()), | ||
| ..Default::default() | ||
| }, |
💡 Verification agent
🧩 Analysis chain
Potential API mismatch: InputConfig fields likely invalid
This example uses r#type, path, and format, whereas other examples use input_type and config (serde_json). This may not compile if InputConfig doesn’t expose these fields.
Run to verify current InputConfig shape and update the example accordingly:
🏁 Script executed:
#!/bin/bash
rg -nP --type=rust -C3 'struct\s+InputConfig\b'
rg -nP --type=rust -C2 '\bInputConfig\s*\{'
rg -nP --type=rust -C2 '\binput_type\b|\br#?type\b|\bpath\b|\bformat\b|\bconfig\b'
Length of output: 263655
Match InputConfig fields to core definition
Replace the use of r#type, path, and format with input_type, name, and config per arkflow-core’s InputConfig (in crates/arkflow-core/src/input/mod.rs), for example in examples/test_state_management.rs:39–44:
- input: InputConfig {
- r#type: "file".to_string(),
- path: Some("./examples/data/test.txt".to_string()),
- format: Some("text".to_string()),
- ..Default::default()
- },
+ input: InputConfig {
+ input_type: "file".to_string(),
+ name: None,
+ config: Some(serde_json::json!({
+ "path": "./examples/data/test.txt",
+ "format": "text"
+ })),
+ ..Default::default()
+ },
📝 Committable suggestion
| input: InputConfig { | |
| r#type: "file".to_string(), | |
| path: Some("./examples/data/test.txt".to_string()), | |
| format: Some("text".to_string()), | |
| ..Default::default() | |
| }, | |
| input: InputConfig { | |
| input_type: "file".to_string(), | |
| name: None, | |
| config: Some(serde_json::json!({ | |
| "path": "./examples/data/test.txt", | |
| "format": "text" | |
| })), | |
| ..Default::default() | |
| }, |
🤖 Prompt for AI Agents
In examples/test_state_management.rs around lines 39 to 44, the InputConfig
struct uses outdated field names (r#type, path, format); update to match
arkflow-core's InputConfig by replacing r#type with input_type, path with name,
and format moved into the config field (e.g., set config to a map/JSON
containing format=>"text" or equivalent), and keep the rest of the defaults via
..Default::default().
| for handle in handles { | ||
| handle.await?; | ||
| } |
JoinHandle error not convertible to arkflow_core::Error
handle.await? requires From<JoinError> for Error; likely missing. Map join errors explicitly.
- for handle in handles {
- handle.await?;
- }
+ for handle in handles {
+ if let Err(e) = handle.await {
+ return Err(Error::Process(format!("Stream task join failed: {}", e)));
+ }
+ }
🤖 Prompt for AI Agents
In examples/test_state_management.rs around lines 157 to 159, awaiting the
JoinHandle uses the `?` operator but `tokio::task::JoinError` does not
automatically convert into `arkflow_core::Error`; change to explicitly map the
join error into an arkflow_core::Error (for example using `.await.map_err(|e|
arkflow_core::Error::... )?`) or match the result and return/convert a
descriptive arkflow_core::Error variant so the compiler has a concrete
conversion path.
| output: OutputConfig { | ||
| output_type: "stdout".to_string(), | ||
| name: None, | ||
| config: None, | ||
| }, |
OutputConfig field name likely mismatched (output_type vs r#type).
For consistency with the other example and likely with the current type, prefer r#type and default-initialize the rest.
Apply:
output: OutputConfig {
- output_type: "stdout".to_string(),
- name: None,
- config: None,
+ r#type: "stdout".to_string(),
+ ..Default::default()
},
📝 Committable suggestion
| output: OutputConfig { | |
| output_type: "stdout".to_string(), | |
| name: None, | |
| config: None, | |
| }, | |
| output: OutputConfig { | |
| r#type: "stdout".to_string(), | |
| ..Default::default() | |
| }, |
🤖 Prompt for AI Agents
In examples/verify_state_integration.rs around lines 45 to 49 the OutputConfig
struct uses output_type which likely mismatches the struct field r#type used
elsewhere; change the field name to r#type and set the other fields to default
(e.g., name: None, config: None) or use Default::default() for the whole
OutputConfig as appropriate so the field names align with the type definition
and other examples.
Summary by CodeRabbit