
feat: Add JNI-based Hadoop FileSystem support for S3 and other Hadoop-compatible stores #1992


Draft · wants to merge 7 commits into main
131 changes: 131 additions & 0 deletions common/src/main/java/org/apache/comet/parquet/JniHDFSBridge.java
@@ -0,0 +1,131 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.comet.parquet;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class JniHDFSBridge {
  /**
   * Reads a byte range from a file using the Hadoop FileSystem API.
   *
   * @param path The file path to read from
   * @param configs Configuration properties for the filesystem
   * @param offset Starting byte position (0-based)
   * @param len Number of bytes to read
   * @return Byte array containing the read data, or {@code null} if an error occurs
   * @throws IllegalArgumentException If path is null or empty, or if offset or len is negative
   */
  public static byte[] read(String path, Map<String, String> configs, long offset, int len) {
    if (path == null || path.isEmpty()) {
      throw new IllegalArgumentException("Path cannot be null or empty");
    }
    if (offset < 0) {
      throw new IllegalArgumentException("Offset cannot be negative");
    }
    if (len < 0) {
      throw new IllegalArgumentException("Length cannot be negative");
    }

    try {
      Path p = new Path(path);
      Configuration conf = new Configuration();

      // Apply caller-supplied configuration, if any
      if (configs != null) {
        for (Map.Entry<String, String> entry : configs.entrySet()) {
          conf.set(entry.getKey(), entry.getValue());
        }
      }
      FileSystem fs = p.getFileSystem(conf);

      long fileLen = fs.getFileStatus(p).getLen();

      if (offset > fileLen) {
        throw new IOException(
            "Offset beyond file length: offset=" + offset + ", fileLen=" + fileLen);
      }

      if (len == 0) {
        return new byte[0];
      }

      // Clamp the length if it exceeds the remaining bytes
      if (offset + len > fileLen) {
        len = (int) (fileLen - offset);
        if (len <= 0) {
          return new byte[0];
        }
      }

      byte[] buffer = new byte[len];
      int totalBytesRead = 0;
      // try-with-resources ensures the stream is closed even if a read fails
      try (FSDataInputStream inputStream = fs.open(p)) {
        inputStream.seek(offset);
        while (totalBytesRead < len) {
          int read = inputStream.read(buffer, totalBytesRead, len - totalBytesRead);
          if (read == -1) break;
          totalBytesRead += read;
        }
      }

      return totalBytesRead < len ? Arrays.copyOf(buffer, totalBytesRead) : buffer;
    } catch (Exception e) {
      System.err.println("Native.read failed: " + e);
      return null;
    }
  }

  /**
   * Gets the length of a file using the Hadoop FileSystem API.
   *
   * @param path The file path to check
   * @param configs Configuration properties for the filesystem
   * @return File length in bytes, or -1 if the file cannot be accessed
   * @throws IllegalArgumentException If path is null or empty
   */
  public static long getLength(String path, Map<String, String> configs) {
    if (path == null || path.isEmpty()) {
      throw new IllegalArgumentException("Path cannot be null or empty");
    }

    try {
      Path p = new Path(path);
      Configuration conf = new Configuration();
      if (configs != null) {
        for (Map.Entry<String, String> entry : configs.entrySet()) {
          conf.set(entry.getKey(), entry.getValue());
        }
      }

      FileSystem fs = p.getFileSystem(conf);
      return fs.getFileStatus(p).getLen();
    } catch (Exception e) {
      System.err.println("Native.getLength failed: " + e);
      return -1;
    }
  }
}
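
A minimal caller-side sketch of the bridge above. The class name, bucket, path, and S3A endpoint here are illustrative only, not part of this PR:

import java.util.HashMap;
import java.util.Map;

import org.apache.comet.parquet.JniHDFSBridge;

public class JniHDFSBridgeDemo {
  public static void main(String[] args) {
    // Hypothetical S3A settings; real jobs would pass their Hadoop configuration here.
    Map<String, String> configs = new HashMap<>();
    configs.put("fs.s3a.endpoint", "s3.us-east-1.amazonaws.com");

    String path = "s3a://example-bucket/data/part-00000.parquet";
    long fileLen = JniHDFSBridge.getLength(path, configs);
    if (fileLen >= 0) {
      // Read up to the first 4 KiB; read() clamps len to the bytes remaining.
      byte[] bytes = JniHDFSBridge.read(path, configs, 0, 4096);
      System.out.println("Read " + (bytes == null ? 0 : bytes.length) + " of " + fileLen + " bytes");
    }
  }
}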
9 changes: 9 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -65,6 +65,15 @@ object CometConf extends ShimCometConf

val COMET_EXEC_CONFIG_PREFIX = "spark.comet.exec";

val COMET_USE_JNI_OBJECT_STORE: ConfigEntry[Boolean] =
  conf("spark.comet.use_jni_object_store")
    .doc(
      "If enabled, Comet will access Hadoop-compatible file systems using the Hadoop FileSystem" +
        " API via JNI, bypassing the native Rust object store implementations.")
    .internal()
    .booleanConf
    .createWithDefault(false)

val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled")
.doc(
"Whether to enable Comet extension for Spark. When this is turned on, Spark will use " +
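
A hedged sketch of how the new flag would be turned on from a Spark job. The session setup and paths are illustrative; per the diff, the flag is internal and defaults to false:

import org.apache.spark.sql.SparkSession;

public class CometJniObjectStoreDemo {
  public static void main(String[] args) {
    // Illustrative only: routes Comet's Parquet reads through the Hadoop
    // FileSystem API via JNI instead of the native Rust object stores.
    SparkSession spark = SparkSession.builder()
        .appName("comet-jni-object-store-demo")
        .config("spark.comet.use_jni_object_store", "true")
        .getOrCreate();

    spark.read().parquet("s3a://example-bucket/data").show();
    spark.stop();
  }
}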
1 change: 1 addition & 0 deletions native/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions native/core/Cargo.toml
@@ -36,6 +36,7 @@ publish = false

[dependencies]
arrow = { workspace = true }
chrono = { workspace = true }
parquet = { workspace = true, default-features = false, features = ["experimental"] }
futures = { workspace = true }
mimalloc = { version = "*", default-features = false, optional = true }
3 changes: 3 additions & 0 deletions native/core/src/execution/jni_api.rs
@@ -72,6 +72,7 @@ use crate::execution::spark_plan::SparkPlan;

use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_trace};

use crate::parquet::objectstore::jni_hdfs::init_jvm;
use datafusion_comet_proto::spark_operator::operator::OpStruct;
use log::info;
use once_cell::sync::Lazy;
@@ -166,6 +167,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
) -> jlong {
    try_unwrap_or_throw(&e, |mut env| {
        with_trace("createPlan", tracing_enabled != JNI_FALSE, || {
            init_jvm(&env);

            // Init JVM classes
            JVMClasses::init(&mut env);

2 changes: 1 addition & 1 deletion native/core/src/parquet/mod.rs
@@ -26,7 +26,7 @@ pub mod parquet_support;
pub mod read;
pub mod schema_adapter;

mod objectstore;
pub mod objectstore;

use std::collections::HashMap;
use std::task::Poll;