|
33 | 33 | import java.io.File; |
34 | 34 | import java.io.FileInputStream; |
35 | 35 | import java.io.IOException; |
| 36 | +import java.lang.reflect.Field; |
36 | 37 | import java.nio.ByteBuffer; |
37 | 38 | import java.util.List; |
| 39 | +import java.util.Set; |
| 40 | + |
38 | 41 | import org.apache.bookkeeper.bookie.Bookie; |
39 | 42 | import org.apache.bookkeeper.bookie.Bookie.NoEntryException; |
40 | 43 | import org.apache.bookkeeper.bookie.BookieException; |
|
52 | 55 | import org.apache.bookkeeper.conf.TestBKConfiguration; |
53 | 56 | import org.apache.bookkeeper.proto.BookieProtocol; |
54 | 57 | import org.junit.After; |
| 58 | +import org.junit.Assert; |
55 | 59 | import org.junit.Before; |
56 | 60 | import org.junit.Test; |
57 | 61 | import org.slf4j.Logger; |
@@ -822,4 +826,60 @@ public void testSingleLedgerDirectoryCheckpoint() throws Exception { |
822 | 826 | assertEquals(7, logMark.getLogFileId()); |
823 | 827 | assertEquals(8, logMark.getLogFileOffset()); |
824 | 828 | } |
| 829 | + |
| 830 | + @Test |
| 831 | + public void testSingleLedgerDirectoryCheckpointTriggerRemovePendingDeletedLedgers() |
| 832 | + throws Exception { |
| 833 | + int gcWaitTime = 1000; |
| 834 | + File ledgerDir = new File(tmpDir, "dir"); |
| 835 | + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| 836 | + conf.setGcWaitTime(gcWaitTime); |
| 837 | + conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4); |
| 838 | + conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4); |
| 839 | + conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); |
| 840 | + conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() }); |
| 841 | + |
| 842 | + BookieImpl bookie = new TestBookieImpl(conf); |
| 843 | + ByteBuf entry1 = Unpooled.buffer(1024); |
| 844 | + entry1.writeLong(1); // ledger id |
| 845 | + entry1.writeLong(2); // entry id |
| 846 | + entry1.writeBytes("entry-1".getBytes()); |
| 847 | + bookie.getLedgerStorage().addEntry(entry1); |
| 848 | + |
| 849 | + bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2); |
| 850 | + ((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush(); |
| 851 | + |
| 852 | + File ledgerDirMark = new File(ledgerDir + "/current", "lastMark"); |
| 853 | + try { |
| 854 | + LogMark logMark = readLogMark(ledgerDirMark); |
| 855 | + assertEquals(1, logMark.getLogFileId()); |
| 856 | + assertEquals(2, logMark.getLogFileOffset()); |
| 857 | + } catch (Exception e) { |
| 858 | + fail(); |
| 859 | + } |
| 860 | + |
| 861 | + ByteBuf entry2 = Unpooled.buffer(1024); |
| 862 | + entry2.writeLong(2); // ledger id |
| 863 | + entry2.writeLong(1); // entry id |
| 864 | + entry2.writeBytes("entry-2".getBytes()); |
| 865 | + |
| 866 | + bookie.getLedgerStorage().addEntry(entry2); |
| 867 | + // write one entry to first ledger directory and flush with logMark(1, 2), |
| 868 | + // only the first ledger directory should have lastMark |
| 869 | + bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5); |
| 870 | + |
| 871 | + SingleDirectoryDbLedgerStorage storage1 = |
| 872 | + ((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(0); |
| 873 | + Field field = SingleDirectoryDbLedgerStorage.class.getDeclaredField("ledgerIndex"); |
| 874 | + field.setAccessible(true); |
| 875 | + LedgerMetadataIndex ledgerMetadataIndex = (LedgerMetadataIndex) field.get(storage1); |
| 876 | + Field field1 = LedgerMetadataIndex.class.getDeclaredField("pendingDeletedLedgers"); |
| 877 | + field1.setAccessible(true); |
| 878 | + Set<Long> pendingDeletedLedgers = (Set<Long>) field1.get(ledgerMetadataIndex); |
| 879 | + |
| 880 | + Assert.assertEquals(pendingDeletedLedgers.size(), 0); |
| 881 | + pendingDeletedLedgers.add(2L); |
| 882 | + bookie.getLedgerStorage().flush(); |
| 883 | + Assert.assertEquals(pendingDeletedLedgers.size(), 0); |
| 884 | + } |
825 | 885 | } |
0 commit comments