Skip to content

Commit ee7bf89

Browse files
committed
fix rebase from kafka trunk
1 parent 2a40a55 commit ee7bf89

File tree

3 files changed

+79
-55
lines changed

3 files changed

+79
-55
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.common.record;
18+
19+
import org.junit.jupiter.api.Test;
20+
import static org.junit.jupiter.api.Assertions.assertEquals;
21+
22+
import java.io.File;
23+
import java.io.IOException;
24+
25+
import static org.apache.kafka.test.TestUtils.tempFile;
26+
27+
public class FileRecordsPMemTest extends FileRecordsTest {
28+
29+
@Override
30+
protected FileRecords createFileRecords(byte[][] values) throws IOException {
31+
String pmemDir = "/tmp/pmem";
32+
File directory = new File(pmemDir);
33+
if (directory.exists()) {
34+
String[] entries = directory.list();
35+
for (String s : entries) {
36+
File currentFile = new File(directory.getPath(), s);
37+
currentFile.delete();
38+
}
39+
directory.delete();
40+
}
41+
directory.mkdirs();
42+
43+
String path = pmemDir + "/heap";
44+
long size = 1024L * 1024 * 1024 * 10;
45+
int initSize = 10 * 1024 * 1024;
46+
PMemChannel.initHeap(path, size, initSize, 0.9);
47+
48+
FileRecords fileRecords = FileRecords.open(tempFile(), true, false, initSize, true, FileRecords.FileChannelType.PMEM);
49+
return fileRecords;
50+
}
51+
52+
/**
53+
* Test that the cached size variable matches the actual file size as we append messages
54+
*/
55+
@Test
56+
@Override
57+
public void testFileSize() throws IOException {
58+
assertEquals(fileRecords.channel().position(), fileRecords.sizeInBytes());
59+
for (int i = 0; i < 20; i++) {
60+
fileRecords.append(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("abcd".getBytes())));
61+
assertEquals(fileRecords.channel().position(), fileRecords.sizeInBytes());
62+
}
63+
}
64+
65+
/**
66+
* disable this test as it will fail when preallocate == True
67+
*/
68+
@Test
69+
@Override
70+
public void testBytesLengthOfWriteTo() throws IOException {
71+
}
72+
}

clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java

Lines changed: 6 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
import org.junit.jupiter.api.BeforeEach;
2929
import org.junit.jupiter.api.Test;
3030
import org.mockito.Mockito;
31-
import org.junit.runner.RunWith;
32-
import org.junit.runners.Parameterized;
3331

3432
import java.io.File;
3533
import java.io.IOException;
@@ -41,8 +39,6 @@
4139
import java.util.List;
4240
import java.util.Optional;
4341
import java.util.Random;
44-
import java.util.Collection;
45-
import java.util.Arrays;
4642
import java.util.concurrent.ExecutorService;
4743
import java.util.concurrent.Executors;
4844
import java.util.concurrent.Future;
@@ -67,50 +63,20 @@
6763
import static org.mockito.Mockito.verify;
6864
import static org.mockito.Mockito.when;
6965

70-
@RunWith(Parameterized.class)
7166
public class FileRecordsTest {
7267

7368
private byte[][] values = new byte[][] {
7469
"abcd".getBytes(),
7570
"efgh".getBytes(),
7671
"ijkl".getBytes()
7772
};
78-
private FileRecords fileRecords;
73+
protected FileRecords fileRecords;
7974
private Time time;
80-
private int initSize;
81-
private boolean usePMem;
82-
83-
@Parameterized.Parameters
84-
public static Collection<Boolean> testCases() {
85-
return Arrays.asList(true, false);
86-
}
87-
88-
public FileRecordsTest(boolean usePMem) {
89-
this.usePMem = usePMem;
90-
}
9175

9276
@BeforeEach
9377
public void setup() throws IOException {
94-
if (usePMem) {
95-
String pmemDir = "/tmp/pmem";
96-
File directory = new File(pmemDir);
97-
if (directory.exists()) {
98-
String[] entries = directory.list();
99-
for (String s : entries) {
100-
File currentFile = new File(directory.getPath(), s);
101-
currentFile.delete();
102-
}
103-
directory.delete();
104-
}
105-
directory.mkdirs();
106-
107-
String path = pmemDir + "/heap";
108-
long size = 1024L * 1024 * 1024 * 10;
109-
this.initSize = 10 * 1024 * 1024;
110-
PMemChannel.initHeap(path, size, initSize, 0.9);
111-
}
112-
11378
this.fileRecords = createFileRecords(values);
79+
append(fileRecords, values);
11480
this.time = new MockTime();
11581
}
11682

@@ -149,18 +115,10 @@ public void testOutOfRangeSlice() {
149115
*/
150116
@Test
151117
public void testFileSize() throws IOException {
152-
if (usePMem) {
153-
assertEquals(fileRecords.channel().position(), fileRecords.sizeInBytes());
154-
} else {
155-
assertEquals(fileRecords.channel().size(), fileRecords.sizeInBytes());
156-
}
118+
assertEquals(fileRecords.channel().size(), fileRecords.sizeInBytes());
157119
for (int i = 0; i < 20; i++) {
158120
fileRecords.append(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("abcd".getBytes())));
159-
if (usePMem) {
160-
assertEquals(fileRecords.channel().position(), fileRecords.sizeInBytes());
161-
} else {
162-
assertEquals(fileRecords.channel().size(), fileRecords.sizeInBytes());
163-
}
121+
assertEquals(fileRecords.channel().size(), fileRecords.sizeInBytes());
164122
}
165123
}
166124

@@ -732,14 +690,8 @@ private static List<RecordBatch> batches(Records buffer) {
732690
return TestUtils.toList(buffer.batches());
733691
}
734692

735-
private FileRecords createFileRecords(byte[][] values) throws IOException {
736-
FileRecords fileRecords = null;
737-
if (usePMem) {
738-
fileRecords = FileRecords.open(tempFile(), true, false, initSize, true, FileRecords.FileChannelType.PMEM);
739-
} else {
740-
fileRecords = FileRecords.open(tempFile());
741-
}
742-
append(fileRecords, values);
693+
protected FileRecords createFileRecords(byte[][] values) throws IOException {
694+
FileRecords fileRecords = FileRecords.open(tempFile());
743695
return fileRecords;
744696
}
745697

core/src/main/scala/kafka/server/KafkaConfig.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1044,7 +1044,7 @@ object KafkaConfig {
10441044
val PMemSizeDoc = s"PMem capacity. Only used if $LogChannelTypeProp == pmem"
10451045
val LogChannelTypeDoc = "Log channel type (e.g., file, pmem)"
10461046

1047-
private val configDef = {
1047+
private[server] val configDef = {
10481048
import ConfigDef.Importance._
10491049
import ConfigDef.Range._
10501050
import ConfigDef.Type._

0 commit comments

Comments
 (0)