Skip to content

Conversation

@chenquan
Copy link
Collaborator

@chenquan chenquan commented Jul 7, 2025

Summary by CodeRabbit

  • New Features

    • Introduced support for processing message batches using custom WebAssembly modules.
    • Added configuration options to specify WASM modules and functions for processing.
    • Enabled asynchronous execution of WASM-based processors within the plugin framework.
  • Bug Fixes

    • Improved error handling for invalid or missing WASM processor configurations.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jul 7, 2025

Walkthrough

A new WebAssembly (WASM) processor has been introduced to the arkflow-plugin crate. This includes adding the wasmtime dependency, integrating a new wasm module into the processor initialization, and implementing the WASM processor logic to enable asynchronous processing of message batches via user-specified WASM modules.

Changes

File(s) Change Summary
crates/arkflow-plugin/Cargo.toml Added wasmtime crate dependency for WASM support.
crates/arkflow-plugin/src/processor/mod.rs Declared new public wasm module; updated init to initialize the wasm module.
crates/arkflow-plugin/src/processor/wasm.rs Implemented WASM processor: config struct, processor logic, builder, error handling, and tests.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant ArkflowProcessor
    participant WasmProcessor
    participant WasmtimeEngine
    participant WasmModule

    User->>ArkflowProcessor: Initialize
    ArkflowProcessor->>WasmProcessor: init()
    WasmProcessor->>WasmtimeEngine: Load WASM module (file or base64)
    WasmtimeEngine->>WasmModule: Instantiate

    User->>ArkflowProcessor: process(MessageBatch)
    ArkflowProcessor->>WasmProcessor: process(MessageBatch)
    WasmProcessor->>WasmModule: Call function with input data
    WasmModule-->>WasmProcessor: Return output pointer
    WasmProcessor->>WasmModule: Read output bytes
    WasmProcessor-->>ArkflowProcessor: Return processed MessageBatch
    ArkflowProcessor-->>User: Return processed result
Loading

Possibly related PRs

  • feat(processor): supports python #409: Adds a Python-based processor module and integrates it similarly to this WASM processor, modifying the same processor module and initialization function but implementing a different processor type.

Poem

In the land of Arkflow, a new path appears,
WASM hops in, greeted with cheers!
Now plugins can run, bytecode in tow,
Processing messages with a swift, nimble flow.
With Wasmtime’s might and a bunny’s delight,
The future of plugins is shiny and bright! 🐇✨

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 Clippy (1.86.0)
Updating crates.io index

warning: failed to write cache, path: /usr/local/registry/index/index.crates.io-1949cf8c6b5b557f/.cache/ar/ro/arrow-json, error: Permission denied (os error 13)
Downloading crates ...
Downloaded aes-siv v0.7.0
error: failed to create directory /usr/local/registry/cache/index.crates.io-1949cf8c6b5b557f

Caused by:
Permission denied (os error 13)

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🧹 Nitpick comments (1)
crates/arkflow-plugin/src/processor/wasm.rs (1)

55-58: Consider security and performance implications of WASM execution.

Creating a new Store for each execution provides good isolation but may have performance implications for high-throughput scenarios. Additionally, running untrusted WASM code requires careful consideration of security boundaries.

Consider:

  1. Store Pooling: For better performance, implement a pool of pre-initialized Stores that can be reused.
  2. Fuel Metering: Use Wasmtime's fuel consumption feature to prevent infinite loops.
  3. Resource Limits: Configure memory and table limits in the Engine.
  4. Sandboxing: Ensure WASM modules cannot access host filesystem or network unless explicitly allowed.

Example configuration:

let mut config = wasmtime::Config::new();
config.consume_fuel(true);
config.memory_maximum_size(16 * 1024 * 1024); // 16MB max
let engine = Engine::new(&config)?;
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8c4ce15 and f1e3eb8.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
  • crates/arkflow-plugin/Cargo.toml (1 hunks)
  • crates/arkflow-plugin/src/processor/mod.rs (2 hunks)
  • crates/arkflow-plugin/src/processor/wasm.rs (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
crates/arkflow-plugin/src/processor/mod.rs (6)
crates/arkflow-plugin/src/processor/wasm.rs (1)
  • init (27-30)
crates/arkflow-plugin/src/processor/sql.rs (1)
  • init (247-249)
crates/arkflow-plugin/src/processor/protobuf.rs (1)
  • init (230-240)
crates/arkflow-plugin/src/processor/python.rs (1)
  • init (166-168)
crates/arkflow-plugin/src/processor/json.rs (1)
  • init (147-151)
crates/arkflow-plugin/src/processor/vrl.rs (1)
  • init (24-27)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: build
  • GitHub Check: build
  • GitHub Check: Build Docusaurus
🔇 Additional comments (2)
crates/arkflow-plugin/Cargo.toml (1)

89-91: Confirm wasmtime 34.0.1 is up to date and secure

Version 34.0.1 is the latest stable wasmtime release (June 24, 2025) and includes a fix for a host-defined tables/globals panic. No other security vulnerabilities are reported—no changes needed.

crates/arkflow-plugin/src/processor/mod.rs (1)

27-27: LGTM!

The WASM module declaration and initialization follow the established pattern consistently.

Also applies to: 36-36

Comment on lines +223 to +257
#[cfg(test)]
mod tests {
use super::*;
use std::cell::RefCell;

#[tokio::test]
async fn test_wasm_processor_missing_config() {
let result = WasmProcessorBuilder.build(
None,
&None,
&Resource {
temporary: Default::default(),
input_names: RefCell::new(Default::default()),
},
);
assert!(result.is_err());
}

#[tokio::test]
async fn test_wasm_processor_invalid_config() {
let config = serde_json::json!({
"invalid": "config"
});

let result = WasmProcessorBuilder.build(
None,
&Some(config),
&Resource {
temporary: Default::default(),
input_names: RefCell::new(Default::default()),
},
);
assert!(result.is_err());
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add comprehensive tests for WASM processing functionality.

The current tests only cover configuration errors. Consider adding tests for actual WASM execution, memory management, and edge cases.

Would you like me to generate comprehensive test cases including:

  • A simple WASM module for testing
  • Tests for successful processing
  • Tests for WASM execution errors
  • Tests for memory allocation/deallocation
  • Tests for timeout handling (if implemented)
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/processor/wasm.rs between lines 223 and 257, the
existing tests only check for configuration errors. To improve coverage, add
tests that include a simple WASM module to verify successful processing, tests
that simulate WASM execution errors, tests that validate proper memory
allocation and deallocation, and if applicable, tests that check timeout
handling during WASM execution. This will ensure the WASM processor behaves
correctly under various scenarios beyond just config validation.

Comment on lines 32 to 41
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WasmProcessorConfig {
/// WASM module path or inline WASM bytes (base64 encoded)
module: String,
/// Function name to call in the WASM module
function: String,
/// Whether the module field contains a file path or base64 encoded bytes
#[serde(default = "default_is_file")]
is_file: bool,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding security and resource limit configurations.

The current configuration lacks important security and resource management options for running untrusted WASM code safely.

 #[derive(Debug, Clone, Serialize, Deserialize)]
 struct WasmProcessorConfig {
     /// WASM module path or inline WASM bytes (base64 encoded)
     module: String,
     /// Function name to call in the WASM module
     function: String,
     /// Whether the module field contains a file path or base64 encoded bytes
     #[serde(default = "default_is_file")]
     is_file: bool,
+    /// Maximum memory pages (64KB each) the WASM module can use
+    #[serde(default = "default_max_memory_pages")]
+    max_memory_pages: u32,
+    /// Timeout for WASM execution in milliseconds
+    #[serde(default = "default_timeout_ms")]
+    timeout_ms: u64,
 }

Add corresponding default functions:

fn default_max_memory_pages() -> u32 {
    256 // 16MB default limit
}

fn default_timeout_ms() -> u64 {
    5000 // 5 seconds default timeout
}
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/processor/wasm.rs around lines 32 to 41, the
WasmProcessorConfig struct lacks security and resource limit fields for safely
running untrusted WASM code. Add new fields max_memory_pages of type u32 and
timeout_ms of type u64 with serde default attributes pointing to new default
functions default_max_memory_pages and default_timeout_ms respectively.
Implement these default functions to return 256 for max_memory_pages (16MB
limit) and 5000 for timeout_ms (5 seconds timeout) as specified.

Comment on lines +64 to +71
let func = instance
.get_typed_func::<(i32, i32), i32>(&mut store, &self.config.function)
.map_err(|e| {
Error::Process(format!(
"Failed to get function '{}': {}",
self.config.function, e
))
})?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Document WASM module interface requirements.

The hardcoded function signature and required exports should be clearly documented for users implementing WASM modules.

Add comprehensive documentation above the struct:

/// WasmProcessor executes WebAssembly modules for message processing.
/// 
/// # WASM Module Requirements
/// 
/// The WASM module must export:
/// - `memory`: WebAssembly memory for data exchange
/// - `alloc(size: i32) -> i32`: Allocate memory and return pointer
/// - `dealloc(ptr: i32, size: i32)`: Free allocated memory (optional but recommended)
/// - Processing function with signature `(ptr: i32, len: i32) -> i32`
/// 
/// # Data Format
/// 
/// Input: MessageBatch serialized as JSON bytes
/// Output: Length-prefixed byte array (4-byte LE length + data)
/// 
/// # Example WASM Module (Rust)
/// ```ignore
/// #[no_mangle]
/// pub extern "C" fn alloc(size: i32) -> i32 {
///     // Allocate memory
/// }
/// 
/// #[no_mangle]
/// pub extern "C" fn process(ptr: i32, len: i32) -> i32 {
///     // Process data and return output pointer
/// }
/// ```
struct WasmProcessor {
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/processor/wasm.rs around lines 64 to 71, add
detailed documentation above the WasmProcessor struct to clearly specify the
WASM module interface requirements. Document the expected exports such as
memory, alloc, dealloc, and the processing function with the signature (ptr:
i32, len: i32) -> i32. Also include the data format for input and output, and
provide an example snippet illustrating the required functions in Rust. This
will help users understand how to implement compatible WASM modules.

Comment on lines +149 to +154
fn message_batch_to_bytes(&self, msg_batch: &MessageBatch) -> Result<Vec<u8>, Error> {
// Convert MessageBatch to JSON for WASM processing
// This is a simplified approach - in practice you might want to use a more efficient format
let json_data = msg_batch.to_binary("value")?;
Ok(json_data.join(&b'\n'))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve data serialization for production use.

The current implementation uses a simplified approach that may not preserve all MessageBatch properties effectively.

Consider using a more robust serialization format:

     fn message_batch_to_bytes(&self, msg_batch: &MessageBatch) -> Result<Vec<u8>, Error> {
-        // Convert MessageBatch to JSON for WASM processing
-        // This is a simplified approach - in practice you might want to use a more efficient format
-        let json_data = msg_batch.to_binary("value")?;
-        Ok(json_data.join(&b'\n'))
+        // Serialize MessageBatch to JSON, preserving schema and metadata
+        let json_value = serde_json::to_value(msg_batch)
+            .map_err(|e| Error::Process(format!("Failed to serialize MessageBatch: {}", e)))?;
+        serde_json::to_vec(&json_value)
+            .map_err(|e| Error::Process(format!("Failed to convert to bytes: {}", e)))
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn message_batch_to_bytes(&self, msg_batch: &MessageBatch) -> Result<Vec<u8>, Error> {
// Convert MessageBatch to JSON for WASM processing
// This is a simplified approach - in practice you might want to use a more efficient format
let json_data = msg_batch.to_binary("value")?;
Ok(json_data.join(&b'\n'))
}
fn message_batch_to_bytes(&self, msg_batch: &MessageBatch) -> Result<Vec<u8>, Error> {
// Serialize MessageBatch to JSON, preserving schema and metadata
let json_value = serde_json::to_value(msg_batch)
.map_err(|e| Error::Process(format!("Failed to serialize MessageBatch: {}", e)))?;
serde_json::to_vec(&json_value)
.map_err(|e| Error::Process(format!("Failed to convert to bytes: {}", e)))
}
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/processor/wasm.rs around lines 149 to 154, the
current method serializes MessageBatch to JSON and joins with newline
characters, which may not preserve all properties or be efficient for
production. Replace this with a more robust serialization format such as
MessagePack, Protobuf, or CBOR that fully captures the MessageBatch structure
and is optimized for performance. Update the method to serialize using the
chosen format and return the resulting byte vector accordingly.

Comment on lines +81 to +91
// Allocate function to get memory for input
let alloc_func = instance
.get_typed_func::<i32, i32>(&mut store, "alloc")
.map_err(|e| {
Error::Process(format!("WASM module must export 'alloc' function: {}", e))
})?;

// Allocate memory for input data
let input_ptr = alloc_func
.call(&mut store, input_data.len() as i32)
.map_err(|e| Error::Process(format!("Failed to allocate memory: {}", e)))?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Critical: Memory leak - allocated memory is never freed.

The code allocates memory in the WASM module but never deallocates it, leading to memory leaks with repeated invocations.

After processing, you should call a corresponding dealloc or free function:

         // Write input data to WASM memory
         memory
             .write(&mut store, input_ptr as usize, &input_data)
             .map_err(|e| Error::Process(format!("Failed to write input data: {}", e)))?;
 
         // Call the processing function
         let output_ptr = func
             .call(&mut store, (input_ptr, input_data.len() as i32))
             .map_err(|e| Error::Process(format!("WASM function call failed: {}", e)))?;
 
         // Read output data from WASM memory
         let output_data = self.read_output_from_memory(&mut store, &memory, output_ptr)?;
+
+        // Free allocated input memory
+        if let Ok(dealloc_func) = instance.get_typed_func::<(i32, i32), ()>(&mut store, "dealloc") {
+            dealloc_func
+                .call(&mut store, (input_ptr, input_data.len() as i32))
+                .map_err(|e| Error::Process(format!("Failed to deallocate memory: {}", e)))?;
+        }
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/processor/wasm.rs around lines 81 to 91, the code
allocates memory in the WASM module using the 'alloc' function but does not free
it afterward, causing a memory leak. To fix this, retrieve the corresponding
'dealloc' or 'free' function from the WASM instance, then after processing the
input data, call this function with the pointer and length of the allocated
memory to properly release it.

Comment on lines +179 to +197
fn bytes_to_message_batch(
&self,
data: Vec<u8>,
original: &MessageBatch,
) -> Result<MessageBatch, Error> {
// Convert processed bytes back to MessageBatch
// This assumes the WASM function returns JSON data
if data.is_empty() {
return Ok(MessageBatch::new_binary(vec![]).unwrap());
}

// Try to parse as JSON and create new MessageBatch
let json_str = String::from_utf8(data)
.map_err(|e| Error::Process(format!("Invalid UTF-8 in WASM output: {}", e)))?;

// For simplicity, return as binary data
// In practice, you might want to parse JSON and convert back to Arrow format
Ok(original.new_binary_with_origin(vec![json_str.into_bytes()])?)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Complete the data deserialization implementation.

The current implementation doesn't properly deserialize the processed data back to MessageBatch format.

Implement proper deserialization:

     fn bytes_to_message_batch(
         &self,
         data: Vec<u8>,
         original: &MessageBatch,
     ) -> Result<MessageBatch, Error> {
-        // Convert processed bytes back to MessageBatch
-        // This assumes the WASM function returns JSON data
-        if data.is_empty() {
-            return Ok(MessageBatch::new_binary(vec![]).unwrap());
-        }
-
-        // Try to parse as JSON and create new MessageBatch
-        let json_str = String::from_utf8(data)
-            .map_err(|e| Error::Process(format!("Invalid UTF-8 in WASM output: {}", e)))?;
-
-        // For simplicity, return as binary data
-        // In practice, you might want to parse JSON and convert back to Arrow format
-        Ok(original.new_binary_with_origin(vec![json_str.into_bytes()])?)
+        if data.is_empty() {
+            return Ok(MessageBatch::new_binary(vec![]).unwrap());
+        }
+
+        // Deserialize JSON data back to MessageBatch
+        let output_batch: MessageBatch = serde_json::from_slice(&data)
+            .map_err(|e| Error::Process(format!("Failed to deserialize WASM output: {}", e)))?;
+        
+        Ok(output_batch)
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn bytes_to_message_batch(
&self,
data: Vec<u8>,
original: &MessageBatch,
) -> Result<MessageBatch, Error> {
// Convert processed bytes back to MessageBatch
// This assumes the WASM function returns JSON data
if data.is_empty() {
return Ok(MessageBatch::new_binary(vec![]).unwrap());
}
// Try to parse as JSON and create new MessageBatch
let json_str = String::from_utf8(data)
.map_err(|e| Error::Process(format!("Invalid UTF-8 in WASM output: {}", e)))?;
// For simplicity, return as binary data
// In practice, you might want to parse JSON and convert back to Arrow format
Ok(original.new_binary_with_origin(vec![json_str.into_bytes()])?)
}
fn bytes_to_message_batch(
&self,
data: Vec<u8>,
original: &MessageBatch,
) -> Result<MessageBatch, Error> {
if data.is_empty() {
return Ok(MessageBatch::new_binary(vec![]).unwrap());
}
// Deserialize JSON data back to MessageBatch
let output_batch: MessageBatch = serde_json::from_slice(&data)
.map_err(|e| Error::Process(format!("Failed to deserialize WASM output: {}", e)))?;
Ok(output_batch)
}
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/processor/wasm.rs between lines 179 and 197, the
bytes_to_message_batch function currently does not deserialize the processed
data correctly back into a MessageBatch. To fix this, replace the placeholder
JSON string handling with actual deserialization logic that converts the input
bytes into the appropriate MessageBatch structure, likely by parsing the JSON or
binary format expected and reconstructing the MessageBatch accordingly, ensuring
the returned MessageBatch accurately represents the processed data.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (5)
crates/arkflow-plugin/src/processor/wasm.rs (5)

35-39: Document WASM module interface requirements.


69-79: Critical: Memory leak - allocated memory is never freed.


130-135: Improve data serialization for production use.


160-178: Complete the data deserialization implementation.


209-243: Add comprehensive tests for WASM processing functionality.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f1e3eb8 and d02c375.

📒 Files selected for processing (1)
  • crates/arkflow-plugin/src/processor/wasm.rs (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
crates/arkflow-plugin/src/processor/wasm.rs (3)
crates/arkflow-core/src/processor/mod.rs (1)
  • register_processor_builder (75-88)
crates/arkflow-core/src/lib.rs (2)
  • len (203-205)
  • new_binary (95-97)
crates/arkflow-plugin/src/processor/mod.rs (1)
  • init (29-38)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: build
  • GitHub Check: build
  • GitHub Check: Build Docusaurus
🔇 Additional comments (1)
crates/arkflow-plugin/src/processor/wasm.rs (1)

181-202: LGTM - Well-implemented builder pattern.

The builder correctly validates configuration presence, deserializes the config, and handles errors appropriately.

Comment on lines +111 to +117
let module_data = // Load from file
std::fs::read(&config.module).map_err(|e| {
Error::Config(format!(
"Failed to read WASM file '{}': {}",
config.module, e
))
})?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implement base64 decoding support or update documentation.

The constructor only handles file reading, but the configuration documentation suggests support for base64-encoded WASM bytes. Either implement the base64 decoding functionality or update the documentation to reflect current capabilities.

To support base64 decoding, update the constructor:

         // Load WASM module
-        let module_data =   // Load from file
-            std::fs::read(&config.module).map_err(|e| {
-                Error::Config(format!(
-                    "Failed to read WASM file '{}': {}",
-                    config.module, e
-                ))
-            })?;
+        let module_data = if config.is_file {
+            // Load from file
+            std::fs::read(&config.module).map_err(|e| {
+                Error::Config(format!(
+                    "Failed to read WASM file '{}': {}",
+                    config.module, e
+                ))
+            })?
+        } else {
+            // Decode from base64
+            base64::engine::general_purpose::STANDARD
+                .decode(&config.module)
+                .map_err(|e| {
+                    Error::Config(format!(
+                        "Failed to decode base64 WASM module: {}",
+                        e
+                    ))
+                })?
+        };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let module_data = // Load from file
std::fs::read(&config.module).map_err(|e| {
Error::Config(format!(
"Failed to read WASM file '{}': {}",
config.module, e
))
})?;
// Load WASM module
let module_data = if config.is_file {
// Load from file
std::fs::read(&config.module).map_err(|e| {
Error::Config(format!(
"Failed to read WASM file '{}': {}",
config.module, e
))
})?
} else {
// Decode from base64
base64::engine::general_purpose::STANDARD
.decode(&config.module)
.map_err(|e| {
Error::Config(format!(
"Failed to decode base64 WASM module: {}",
e
))
})?
};
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/processor/wasm.rs around lines 111 to 117, the code
only reads the WASM module from a file, but the documentation suggests it should
also support base64-encoded WASM bytes. To fix this, modify the constructor to
detect if the input is base64-encoded and decode it accordingly before use.
Alternatively, if base64 support is not intended, update the documentation to
remove or clarify the base64 support claim.

Comment on lines +27 to +33
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WasmProcessorConfig {
/// WASM module path or inline WASM bytes (base64 encoded)
module: String,
/// Function name to call in the WASM module
function: String,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix configuration inconsistency between documentation and implementation.

The documentation mentions "inline WASM bytes (base64 encoded)" but the struct lacks the is_file field to distinguish between file paths and base64-encoded content. The constructor only handles file reading.

Either add the missing is_file field or update the documentation to match the current implementation:

 #[derive(Debug, Clone, Serialize, Deserialize)]
 struct WasmProcessorConfig {
     /// WASM module path or inline WASM bytes (base64 encoded)
     module: String,
     /// Function name to call in the WASM module
     function: String,
+    /// Whether the module field contains a file path or base64 encoded bytes
+    #[serde(default = "default_is_file")]
+    is_file: bool,
 }
+
+fn default_is_file() -> bool {
+    true
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WasmProcessorConfig {
/// WASM module path or inline WASM bytes (base64 encoded)
module: String,
/// Function name to call in the WASM module
function: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WasmProcessorConfig {
/// WASM module path or inline WASM bytes (base64 encoded)
module: String,
/// Function name to call in the WASM module
function: String,
/// Whether the module field contains a file path or base64 encoded bytes
#[serde(default = "default_is_file")]
is_file: bool,
}
fn default_is_file() -> bool {
true
}
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/processor/wasm.rs around lines 27 to 33, the
WasmProcessorConfig struct documentation mentions supporting both file paths and
inline base64-encoded WASM bytes, but the struct lacks an is_file field to
distinguish these cases and the constructor only handles file reading. To fix
this, either add a boolean is_file field to the struct and update the
constructor to handle both file reading and base64 decoding based on this flag,
or revise the documentation comment to only mention file paths to align with the
current implementation.

@chenquan chenquan self-assigned this Jul 7, 2025
@chenquan chenquan marked this pull request as draft July 7, 2025 13:47
@chenquan chenquan added the hold Pause, for some reason label Jul 11, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

hold Pause, for some reason

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants