Skip to content

Commit 7cc0a31

Browse files
committed
调整
1 parent 5e9ef28 commit 7cc0a31

File tree

12 files changed

+298
-183
lines changed

12 files changed

+298
-183
lines changed

pom.xml

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,6 @@
9797
<scope>test</scope>
9898
</dependency>
9999

100-
<dependency>
101-
<groupId>org.testng</groupId>
102-
<artifactId>testng</artifactId>
103-
<version>${testng.version}</version>
104-
<scope>test</scope>
105-
</dependency>
106-
107100
<dependency>
108101
<groupId>org.projectlombok</groupId>
109102
<artifactId>lombok</artifactId>
@@ -130,14 +123,15 @@
130123
<artifactId>maven-surefire-plugin</artifactId>
131124
<version>${maven-surefire-plugin.version}</version>
132125
<configuration>
133-
<parallel>methods</parallel>
134-
<threadCount>10</threadCount>
135-
<suiteXmlFiles>
136-
<file>testng.xml</file>
137-
</suiteXmlFiles>
126+
<forkCount>3</forkCount>
127+
<reuseForks>true</reuseForks>
128+
<excludes>
129+
<exclude>**/*ManualTest.java</exclude>
130+
<exclude>**/*IntegrationTest.java</exclude>
131+
</excludes>
138132
</configuration>
139133
</plugin>
140-
134+
141135
<plugin>
142136
<groupId>org.apache.maven.plugins</groupId>
143137
<artifactId>maven-compiler-plugin</artifactId>
@@ -150,9 +144,9 @@
150144
<version>${nexus-staging-maven-plugin.version}</version>
151145
<extensions>true</extensions>
152146
<configuration>
153-
<serverId>ossrh</serverId>
154-
<nexusUrl>https://s01.oss.sonatype.org/</nexusUrl>
155-
<autoReleaseAfterClose>true</autoReleaseAfterClose>
147+
<serverId>ossrh</serverId>
148+
<nexusUrl>https://s01.oss.sonatype.org/</nexusUrl>
149+
<autoReleaseAfterClose>true</autoReleaseAfterClose>
156150
</configuration>
157151
</plugin>
158152
</plugins>
@@ -172,7 +166,16 @@
172166
<url>https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/</url>
173167
</repository>
174168
</distributionManagement>
175-
169+
170+
<dependencies>
171+
<dependency>
172+
<groupId>org.testng</groupId>
173+
<artifactId>testng</artifactId>
174+
<version>${testng.version}</version>
175+
<scope>test</scope>
176+
</dependency>
177+
</dependencies>
178+
176179
<build>
177180
<plugins>
178181
<plugin>

src/main/java/io/github/kavahub/file/reader/FileByteReaderQuery.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,22 +33,48 @@ public CompletableFuture<Void> subscribe(BiConsumer<? super byte[], ? super Thro
3333
AsynchronousFileChannel channel = AsynchronousFileChannel.open(file, StandardOpenOption.READ);
3434
ReadFile reader = ReadFile.of(channel, bufferSize)
3535
// 读取文件异常时
36-
.whenError(throwable -> consumer.accept(null, throwable))
36+
.whenError(throwable -> {
37+
if (log.isErrorEnabled()) {
38+
log.error("Failed while file reading", throwable);
39+
}
40+
41+
consumer.accept(null, throwable);
42+
})
3743
// 读取文件完成时
38-
.whenComplete(size -> {
44+
.whenFinish(size -> {
45+
if (log.isDebugEnabled()) {
46+
log.debug("File read complete", size);
47+
}
48+
3949
if (!future.isDone()) {
4050
future.complete(null);
4151
}
4252
})
4353
// 读取到文件数据时
44-
.whenRead(bytes -> consumer.accept(bytes, null));
54+
.whenCancel(size -> {
55+
if (log.isDebugEnabled()) {
56+
log.debug("Cancel file reading");
57+
}
58+
})
59+
// 读取到文件数据时
60+
.whenRead((bytes, size) -> {
61+
if (log.isDebugEnabled()) {
62+
log.debug("[{} bytes] has been readed", size);
63+
}
64+
65+
consumer.accept(bytes, null);
66+
});
67+
4568

4669
// 开始读
4770
reader.read();
4871
future.whenComplete((data, err) -> {
4972
if (data == null || err != null) {
5073
// 取消读操作
5174
reader.cancel();
75+
if (log.isDebugEnabled()) {
76+
log.debug("The signal to cancel file reading has been sent");
77+
}
5278

5379
// 关闭channel
5480
ChannelHelper.close(channel);

src/main/java/io/github/kavahub/file/reader/ReadFile.java

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@
44
import java.nio.channels.AsynchronousFileChannel;
55
import java.nio.channels.CompletionHandler;
66
import java.util.concurrent.atomic.AtomicInteger;
7+
import java.util.function.BiConsumer;
78
import java.util.function.Consumer;
89

9-
import lombok.extern.slf4j.Slf4j;
1010

1111
/**
1212
*
1313
*/
14-
@Slf4j
1514
public final class ReadFile {
1615
/**
1716
*
@@ -21,12 +20,17 @@ public final class ReadFile {
2120
/**
2221
*
2322
*/
24-
private Consumer<byte[]> whenRead;
23+
private BiConsumer<byte[], Integer> whenRead;
2524

2625
/**
2726
*
2827
*/
29-
private Consumer<Integer> whenComplete;
28+
private Consumer<Integer> whenFinish;
29+
30+
/**
31+
*
32+
*/
33+
private Consumer<Integer> whenCancel;
3034

3135
private final AsynchronousFileChannel channel;
3236
private final int bufferSize;
@@ -46,23 +50,25 @@ public ReadFile whenError(Consumer<Throwable> whenError) {
4650
return this;
4751
}
4852

49-
public ReadFile whenRead(Consumer<byte[]> whenRead) {
53+
public ReadFile whenRead(BiConsumer<byte[], Integer> whenRead) {
5054
this.whenRead = whenRead;
5155
return this;
5256
}
5357

54-
public ReadFile whenComplete(Consumer<Integer> whenComplete) {
55-
this.whenComplete = whenComplete;
58+
public ReadFile whenFinish(Consumer<Integer> whenFinish) {
59+
this.whenFinish = whenFinish;
60+
return this;
61+
}
62+
63+
public ReadFile whenCancel(Consumer<Integer> whenCancel) {
64+
this.whenCancel = whenCancel;
5665
return this;
5766
}
5867

5968
/**
6069
*
6170
*/
6271
public void cancel() {
63-
if (log.isDebugEnabled()) {
64-
log.debug("Cancel signal received");
65-
}
6672
cancelled = true;
6773
}
6874

@@ -85,29 +91,21 @@ public void completed(Integer result, ByteBuffer buffer) {
8591

8692
byte[] data = new byte[result];
8793
buffer.get(data);
88-
whenRead.accept(data);
94+
whenRead.accept(data, position.get());
8995

96+
9097
if (!isCancelled()) {
9198
read0(channel, position.get(), this);
9299
} else {
93-
if (log.isDebugEnabled()) {
94-
log.debug("Cancel file reading. [{} bytes] has been readed", position.get());
95-
}
96-
whenComplete.accept(position.get());
100+
whenCancel.accept(position.get());
97101
}
98102
} else {
99-
if (log.isDebugEnabled()) {
100-
log.debug("File read complete. [{} bytes] has been readed", position.get());
101-
}
102-
whenComplete.accept(position.get());
103+
whenFinish.accept(position.get());
103104
}
104105
}
105106

106107
@Override
107108
public void failed(Throwable exc, ByteBuffer buffer) {
108-
if (log.isErrorEnabled()) {
109-
log.error("Failed while reading", exc);
110-
}
111109
whenError.accept(exc);
112110
}
113111
};

src/main/java/io/github/kavahub/file/writer/AIOFileWriter.java

Lines changed: 1 addition & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,9 @@
55
import java.nio.file.StandardOpenOption;
66
import java.util.concurrent.CompletableFuture;
77

8-
import io.github.kavahub.file.query.Query;
98
import lombok.experimental.UtilityClass;
10-
import lombok.extern.slf4j.Slf4j;
119

1210
@UtilityClass
13-
@Slf4j
1411
public class AIOFileWriter {
1512
public CompletableFileWriter of(Path file) throws IOException {
1613
return of(file, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
@@ -27,8 +24,6 @@ public CompletableFuture<Integer> write(Path file, byte[] bytes) {
2724
public CompletableFuture<Integer> write(Path file, byte[] bytes, StandardOpenOption... options) {
2825
try (CompletableFileWriter writer = CompletableFileWriter.of(file, options)) {
2926
return writer.write(bytes);
30-
} catch (Exception e) {
31-
return CompletableFuture.failedFuture(e);
3227
}
3328
}
3429

@@ -38,68 +33,10 @@ public CompletableFuture<Integer> write(Path file, String line) {
3833

3934
public CompletableFuture<Integer> write(Path file, String line,
4035
StandardOpenOption... options) {
41-
if (log.isDebugEnabled()) {
42-
log.debug("Begin to write file: {}", file.toString());
43-
}
44-
try (CompletableFileWriter writer = CompletableFileWriter.of(file, options)) {
45-
CompletableFuture<Integer> future = writer.write(line);
46-
future.whenComplete((size, error) -> {
47-
if (log.isDebugEnabled()) {
48-
log.debug("Total [{} bytes] has been writed to file: {}", size, file.toString());
49-
}
50-
});
51-
return future;
52-
} catch (Exception e) {
53-
return CompletableFuture.failedFuture(e);
54-
}
55-
}
56-
57-
public CompletableFuture<Integer> write(Path file, Query<String> lines) {
58-
return write(file, lines, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
59-
}
6036

61-
public CompletableFuture<Integer> write(Path file, Query<String> lines, StandardOpenOption... options) {
62-
if (log.isDebugEnabled()) {
63-
log.debug("Begin to write file: {}", file.toString());
64-
}
6537
try (CompletableFileWriter writer = CompletableFileWriter.of(file, options)) {
66-
67-
lines.subscribe((data, error) -> {
68-
writer.write(data);
69-
}).join();
70-
71-
writer.getPosition().whenComplete((size, error) -> {
72-
if (log.isDebugEnabled()) {
73-
log.debug("Total [{} bytes] has been writed to file: {}", size, file.toString());
74-
}
75-
});
76-
return writer.getPosition();
77-
} catch (Exception e) {
78-
return CompletableFuture.failedFuture(e);
38+
return writer.write(line);
7939
}
8040
}
8141

82-
83-
public CompletableFuture<Integer> write(Path file, Iterable<String> lines) {
84-
return write(file, lines, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
85-
}
86-
87-
public CompletableFuture<Integer> write(Path file, Iterable<String> lines,
88-
StandardOpenOption... options) {
89-
if (log.isDebugEnabled()) {
90-
log.debug("Begin to write file: {}", file.toString());
91-
}
92-
try (CompletableFileWriter writer = CompletableFileWriter.of(file, options)) {
93-
lines.forEach(writer::write);
94-
95-
writer.getPosition().whenComplete((size, error) -> {
96-
if (log.isDebugEnabled()) {
97-
log.debug("Total [{} bytes] has been writed to file: {}", size, file.toString());
98-
}
99-
});
100-
return writer.getPosition();
101-
} catch (Exception e) {
102-
return CompletableFuture.failedFuture(e);
103-
}
104-
}
10542
}

0 commit comments

Comments
 (0)