feat: Add JNI-based Hadoop FileSystem support for S3 and other Hadoop-compatible stores #1992
Conversation
Codecov Report
Attention: Patch coverage is

    @@             Coverage Diff              @@
    ##               main    #1992      +/-   ##
    ============================================
    + Coverage     56.12%   58.01%    +1.88%
    - Complexity      976     1152      +176
    ============================================
      Files           119      134       +15
      Lines        11743    13095     +1352
      Branches      2251     2432      +181
    ============================================
    + Hits          6591     7597     +1006
    - Misses        4012     4264      +252
    - Partials      1140     1234       +94

View full report in Codecov by Sentry.
Thanks @drexler-sky for this PR! I've been working on something similar, and there is overlap as well as some differences in our respective methods. In particular, I am looking at how to integrate with the `native_iceberg_compat` mode, as well as reusing the `SeekableInputStream` that `native_iceberg_compat` already has.
I'll put my code on GitHub in a day or so and we can discuss how to get to an implementation that works for both paths.
    @@ -292,4 +299,104 @@ public static native void currentColumnBatch(
       * @param handle
       */
      public static native void closeRecordBatchReader(long handle);
Can we move these to a new file, say `JniHDFSBridge`?
Yes, fixed.
native/core/Cargo.toml (Outdated)

    @@ -50,6 +50,7 @@ lazy_static = "1.4.0"
     prost = "0.13.5"
     jni = "0.21"
     snap = "1.1"
    +chrono = { version = "0.4", default-features = false, features = ["clock"] }
Maybe use the dependency defined in the main Cargo.toml (i.e. make this a workspace dependency)?
Fixed.
    /// # Returns
    /// Returns `Ok(usize)` with the file size in bytes on success, or an `ObjectStoreError`
    /// if the operation fails.
    pub fn call_get_length(
Just `get_length`, perhaps?
Changed to `get_length`.
    /// # Returns
    /// Returns `Ok(Vec<u8>)` containing the requested bytes on success, or an
    /// `ObjectStoreError` if the operation fails.
    pub fn call_read(
Just `read`, or maybe `read_range`?
Changed to `read`.
        &self,
        _prefix: Option<&Path>,
    ) -> BoxStream<'static, Result<ObjectMeta, ObjectStoreError>> {
        futures::stream::empty().boxed()
Did you intend to return an empty stream here?
Changed to `todo!()`.
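For illustration, here is a minimal sketch of a third option for a not-yet-supported `list`: return a stream that yields an explicit `NotImplemented` error, instead of an empty listing (which silently looks like an empty directory) or `todo!()` (which panics). The helper name is hypothetical and this is not the PR's actual code; it only assumes the `object_store` and `futures` crates already used in this file.

```rust
use futures::stream::{self, BoxStream, StreamExt};
use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta};

// Hypothetical helper (not the PR's code): a listing stream that surfaces an
// explicit error rather than pretending the store is empty.
fn not_implemented_listing(
    _prefix: Option<&Path>,
) -> BoxStream<'static, Result<ObjectMeta, ObjectStoreError>> {
    stream::once(async { Err(ObjectStoreError::NotImplemented) }).boxed()
}
```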
    @@ -384,6 +392,17 @@ pub(crate) fn prepare_object_store_with_configs(

        let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if scheme == "hdfs" {
            parse_hdfs_url(&url)
        } else if scheme == "s3" && use_jni {
Ah, this is a tricky bit. If `use_jni` is true we should be able to use any file system available to Hadoop. We really need to scan the config for all `spark.hadoop.fs.customfs.impl` entries and register the JNI-based object store for every `customfs`.
Thanks! I updated the code to scan for `spark.hadoop.fs.<scheme>.impl` configs and use `JniObjectStore` for any matching scheme when `use_jni` is true.
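For illustration, a minimal sketch of the kind of config scan described here, assuming a plain key/value view of the Spark configuration (the map and function names are hypothetical, not the PR's actual code):

```rust
use std::collections::{HashMap, HashSet};

/// Collect every scheme that has a `spark.hadoop.fs.<scheme>.impl` entry,
/// e.g. "spark.hadoop.fs.customfs.impl" -> "customfs".
fn hadoop_schemes(configs: &HashMap<String, String>) -> HashSet<String> {
    configs
        .keys()
        .filter_map(|key| {
            key.strip_prefix("spark.hadoop.fs.")
                .and_then(|rest| rest.strip_suffix(".impl"))
                // Skip nested keys such as "fs.s3a.aws.credentials.provider".
                .filter(|scheme| !scheme.contains('.'))
                .map(str::to_owned)
        })
        .collect()
}
```

Any scheme in the resulting set would then be routed to the JNI-backed object store when `use_jni` is true.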
    @@ -0,0 +1,332 @@
    // Licensed to the Apache Software Foundation (ASF) under one
Can we name this `jni_hdfs.rs`, since this is a JNI-based implementation of an HDFS object store?
Changed
Thanks for making the changes @drexler-sky. I did a more detailed pass and have some more comments.
    static JVM: OnceCell<JavaVM> = OnceCell::new();

    pub fn init_jvm(env: &JNIEnv) {
        let _ = JVM.set(env.get_java_vm().expect("Failed to get JavaVM"));
    }

    fn get_jni_env<'a>() -> jni::AttachGuard<'a> {
        JVM.get()
            .expect("JVM not initialized")
            .attach_current_thread()
            .expect("Failed to attach thread")
    }
We are already executing this code in a JVM, and the `JNIEnv` is available to us in `Java_org_apache_comet_Native_executePlan` (jni_api.rs). We could probably pass that in all the way here.
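Roughly, the suggestion could look like the sketch below: obtain the `JavaVM` from the `JNIEnv` the JVM already passes to the native entry point and hand it down to the object store, instead of initializing a global `OnceCell`. The entry-point name and struct are hypothetical; only `JNIEnv::get_java_vm` and `JavaVM::attach_current_thread` from the `jni` 0.21 crate are assumed.

```rust
use jni::objects::JClass;
use jni::{JNIEnv, JavaVM};

// Hypothetical store that owns the JavaVM handed to it by the caller.
struct JniHdfsStore {
    jvm: JavaVM,
}

// Hypothetical entry point (the real one is Java_org_apache_comet_Native_executePlan
// in jni_api.rs): the JNIEnv is already available here, so the JavaVM can be
// obtained once and threaded down to wherever the object store is constructed.
#[no_mangle]
pub extern "system" fn Java_org_apache_comet_Native_exampleEntryPoint(
    env: JNIEnv,
    _class: JClass,
) {
    let jvm = env.get_java_vm().expect("Failed to get JavaVM");
    let _store = JniHdfsStore { jvm };
    // Later, inside the store's read/length methods, the store would call
    // _store.jvm.attach_current_thread() to get a JNIEnv for that thread.
}
```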
        Ok(jmap)
    }

    pub fn jni_error(e: jni::errors::Error) -> ObjectStoreError {
`jvm_bridge/mod.rs` has a `jni_map_error` macro. It also has the very useful `jni_call` and `jni_static_call`. Perhaps we can use those and make the code consistent?
    let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if scheme == "hdfs" {
        parse_hdfs_url(&url)
    } else if use_jni && hadoop_schemes.contains(scheme) {
I think this check should come first, even before the s3a scheme has been renamed to s3. Note that a user could override the implementation to be used for the `hdfs` scheme.
Either way, if the `use_jni` flag is set, we should use JNI if the scheme is `hdfs` or if the config specifies an implementation for the scheme.
Also, we can rename s3a to s3 only if `use_jni` is false; this way we won't be renaming s3a back and forth.
    test("Comet uses JNI object store when use_jni is true") {
      spark.conf.set("spark.comet.use_jni_object_store", "true")
      runParquetScanAndAssert()
Is there a way we can verify the jni implementation did get called?
@comphead @Kontinuation you might be interested in looking at this PR.
Thanks @drexler-sky for the PR.
Please help me to understand how this object store is different from https://github.com/apache/datafusion-comet/blob/main/native/hdfs/src/object_store/hdfs.rs ?
I took a look at the
BTW, is there any concern with enabling hdfs support by default and switching the default fs-hdfs dependency to
datafusion-comet/native/hdfs/Cargo.toml, lines 34 to 37 in d885f4a
Thanks @Kontinuation, I was about to create a PR to enable hdfs support by default. cc @kazuyukitanimura.
Regarding configuring the Hadoop client from the Rust side, I used the command line
So Rust was able to get the Hadoop client configurations. But if it could be improved, feel free to extend the
Thank you for the comments. That all makes sense to me. Here's the plan I propose:
While it's currently possible to set configurations via environment variables or spark.hadoop.*, I believe enabling a more explicit and programmatic approach will improve flexibility and user experience. Does this approach sound reasonable to everyone? If so, I'll start with step 1 and submit a PR to arrow-rs once it's ready.
I did not realize that
I have a couple of use cases in mind that I'm hoping this will cover.
I'm assuming that as long as we are using libhdfs, these cases are covered?
Thanks @drexler-sky, I think this approach makes a lot of sense. You probably also want to cover the case where multiple config sources are set (env var, Spark Hadoop config, programmatic): which one should override the others?
@parthchandra I'm not sure if with
without
Hope so, my understanding is
@comphead were you able to run this on a system with no hadoop/hdfs installed? I have an old version of hadoop on my mac and
Update on this:
With the above addressed, and with appropriate modifications to Comet to always use the hdfs object store, I was able to access an
I'll verify with a custom credentials provider as well. So it appears that using the
There are some problems with the approach of using fs-hdfs (libhdfs).

Problems

1. Linking against libjvm.so

As we discovered before, using fs-hdfs makes
Here are the problems with linking against libjvm.so:
Linux:
macOS:
We have to point

2. Problem with
I did not run into this issue when trying to use fs-hdfs with S3. I did have to add S3 to fs-hdfs (I'll submit a PR for this) and hit a few minor issues, but nothing major. I do recall we had the requirement to have libjvm in the library load path early on, but I don't recall what changed to remove the requirement.
With @drexler-sky and @Kontinuation's changes we might be in a better position to evaluate this approach in a production environment.
+1
I found another problem when setting up the GitHub Workflow for fs-hdfs:
The
UPDATE: The CI failure is not caused by incomplete reads, but by hardcoding the value of O_APPEND on a specific platform instead of defining it in a portable way. But my point still holds.
Yes, I saw this too. I will have a PR ready today.
Should we set this to draft as we discuss the design going forward?
Which issue does this PR close?
Closes #.

Rationale for this change
This PR adds support for Approach 2 (JNI-based Hadoop FileSystem access) to enable S3 reads via the native DataFusion Parquet scanner. The original discussion of both approaches can be found in issue #1766.

What changes are included in this PR?
- Added JNI-based integration for accessing the Hadoop FileSystem in Comet
- Introduced a new config flag, `spark.comet.use_jni_object_store`, to toggle this feature

How are these changes tested?
New test.