Skip to content

8329829: HttpClient: Add a BodyPublishers.ofFileChannel method #26155

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions src/java.net.http/share/classes/java/net/http/HttpRequest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -29,10 +29,9 @@
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Iterator;
Expand Down Expand Up @@ -720,6 +719,34 @@ public static BodyPublisher ofFile(Path path) throws FileNotFoundException {
return RequestPublishers.FilePublisher.create(path);
}

/**
* {@return a request body publisher whose body is the {@code length}
* content bytes read from the provided file {@code channel} starting
* from the specified {@code offset}}
* <p>
* The {@linkplain FileChannel file channel} will be read using
* {@link FileChannel#read(ByteBuffer, long) FileChannel.read(ByteBuffer buffer, long position)},
* which does not modify the channel's position. Thus, the same file
* channel may be shared between several publishers passed to
* concurrent requests.
* <p>
* The file channel will not be closed upon completion. The caller is
* expected to manage the life cycle of the channel, and close it
* appropriately when not needed anymore.
*
* @param channel a file channel
* @param offset the offset of the first byte
* @param length the number of bytes to read from the file channel
*
* @throws IndexOutOfBoundsException if the specified byte range is
* found to be {@linkplain Objects.checkFromIndexSize(long, long, long) out of bounds}
* compared with the size of the file referred by the channel
*/
public static BodyPublisher ofFileChannel(FileChannel channel, long offset, long length) {
Objects.requireNonNull(channel, "channel");
return new RequestPublishers.FileChannelPublisher(channel, offset, length);
}

/**
* A request body publisher that takes data from an {@code Iterable}
* of byte arrays. An {@link Iterable} is provided which supplies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.lang.reflect.UndeclaredThrowableException;
import java.net.http.HttpRequest.BodyPublisher;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
Expand Down Expand Up @@ -418,6 +419,90 @@ public long contentLength() {
}
}

public static final class FileChannelPublisher implements BodyPublisher {

private final FileChannel channel;

private final long position;

private final long limit;

public FileChannelPublisher(FileChannel channel, long offset, long length) {
this.channel = Objects.requireNonNull(channel, "channel");
long fileSize = fileSize(channel);
Objects.checkFromIndexSize(offset, length, fileSize);
this.position = offset;
this.limit = offset + length;
}

private static long fileSize(FileChannel channel) {
try {
return channel.size();
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
}
}

@Override
public long contentLength() {
return limit - position;
}

@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
Iterable<ByteBuffer> iterable = () -> new FileChannelIterator(channel, position, limit);
new PullPublisher<>(iterable).subscribe(subscriber);
}

}

private static final class FileChannelIterator implements Iterator<ByteBuffer> {

private final FileChannel channel;

private final long limit;

private long position;

private boolean terminated;

private FileChannelIterator(FileChannel channel, long position, long limit) {
this.channel = channel;
this.position = position;
this.limit = limit;
}

@Override
public synchronized boolean hasNext() {
return position < limit && !terminated;
}

@Override
public synchronized ByteBuffer next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
long remaining = limit - position;
ByteBuffer buffer = Utils.getBufferWithAtMost(remaining);
try {
int readLength = channel.read(buffer, position);
// Short-circuit if `read()` has failed, e.g., due to file content being changed in the meantime
if (readLength < 0) {
// We *must* throw to signal that the request needs to be cancelled.
// Otherwise, the server will continue waiting data.
throw new IOException("Unexpected EOF (position=%s)".formatted(position));
} else {
position += readLength;
}
} catch (IOException ioe) {
terminated = true;
throw new UncheckedIOException(ioe);
}
return buffer.flip();
}

}

public static final class PublisherAdapter implements BodyPublisher {

private final Publisher<? extends ByteBuffer> publisher;
Expand All @@ -430,12 +515,12 @@ public PublisherAdapter(Publisher<? extends ByteBuffer> publisher,
}

@Override
public final long contentLength() {
public long contentLength() {
return contentLength;
}

@Override
public final void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
publisher.subscribe(subscriber);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,32 @@ public static String describeOps(int interestOps) {
public static IllegalArgumentException newIAE(String message, Object... args) {
return new IllegalArgumentException(format(message, args));
}

/**
* {@return a new {@link ByteBuffer} instance of configured capacity for the HTTP Client}
*/
public static ByteBuffer getBuffer() {
return ByteBuffer.allocate(BUFSIZE);
}

/**
* {@return a new {@link ByteBuffer} instance whose capacity is set to the
* smaller of the specified {@code maxCapacity} and the default
* ({@value BUFSIZE})}
*
* @param maxCapacity a buffer capacity, in bytes
* @throws IllegalArgumentException if {@code capacity < 0}
*/
public static ByteBuffer getBufferWithAtMost(long maxCapacity) {
if (maxCapacity < 0) {
throw new IllegalArgumentException(
// Match the message produced by `ByteBuffer::createCapacityException`
"capacity < 0: (%s < 0)".formatted(maxCapacity));
}
int effectiveCapacity = (int) Math.min(maxCapacity, BUFSIZE);
return ByteBuffer.allocate(effectiveCapacity);
}

public static Throwable getCompletionCause(Throwable x) {
Throwable cause = x;
while ((cause instanceof CompletionException)
Expand Down
Loading