Skip to content

Commit 090ed7f

Browse files
authored
Merge pull request #473 from jfzunigac/multidirectory_support
Allow multiple directories per log config
2 parents 35fc7ec + aba727e commit 090ed7f

File tree

14 files changed

+843
-135
lines changed

14 files changed

+843
-135
lines changed

singer/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@
9595
<artifactId>commons-text</artifactId>
9696
<version>1.10.0</version>
9797
</dependency>
98+
<dependency>
99+
<groupId>org.apache.commons</groupId>
100+
<artifactId>commons-lang3</artifactId>
101+
<version>3.12.0</version>
102+
</dependency>
98103
<dependency>
99104
<groupId>io.netty</groupId>
100105
<artifactId>netty-all</artifactId>

singer/src/main/java/com/pinterest/singer/common/LogStream.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.io.File;
4141
import java.io.FileFilter;
4242
import java.io.IOException;
43+
import java.nio.file.Path;
4344
import java.util.ArrayList;
4445
import java.util.Arrays;
4546
import java.util.HashMap;
@@ -94,7 +95,7 @@ public class LogStream {
9495

9596
// The descriptor of the LogStream which consists of SingleLog name and LogStream name.
9697
// It is globally unique descriptor of the LogStream.
97-
private final String logStreamDescriptor;
98+
private String logStreamDescriptor;
9899

99100
private Object logFilesInfoLock = new Object();
100101

@@ -117,8 +118,25 @@ public class LogStream {
117118

118119
private long lastCompleteCycleTime;
119120

121+
// the directory that this logStream pertains to
122+
private String dir;
123+
120124
private FileNameMatchMode fileNameMatchMode;
121125

126+
/**
127+
* New constructor if there are multiple directories in a config because we need to know the directory through the
128+
* full added path to derive the directory, we can no longer use the log config to derive the directory
129+
*
130+
* Example full added path: "/var/log/singer/singer.log.0"
131+
* With the Multi-directory feature, the directory can be "/var/log/*" and the above full added path can be used.
132+
* */
133+
public LogStream(SingerLog singerLog, Path fullAddedPath) {
134+
this(singerLog, fullAddedPath.toFile().getName());
135+
this.dir = SingerUtils.extractDirectoryPath(fullAddedPath.toString());
136+
this.logStreamDescriptor = String.format("%s:%s:%s", singerLog.getLogName(), fileNamePrefix, this.dir);
137+
this.fullPathPrefix = FilenameUtils.concat(dir, fileNamePrefix);
138+
}
139+
122140
public LogStream(SingerLog singerLog, String fileNamePrefix) {
123141
Preconditions.checkArgument(!Strings.isNullOrEmpty(fileNamePrefix));
124142

@@ -129,7 +147,8 @@ public LogStream(SingerLog singerLog, String fileNamePrefix) {
129147
this.logFilePathsIndex = new HashMap<>();
130148
this.lastStreamModificationTime = -1L;
131149

132-
String dir = this.singerLog.getSingerLogConfig().getLogDir();
150+
this.dir = this.singerLog.getSingerLogConfig().getLogDir();
151+
this.logStreamDescriptor = String.format("%s:%s:%s", singerLog.getLogName(), fileNamePrefix, this.dir);
133152
this.fileNameMatchMode = singerLog.getSingerLogConfig().getFilenameMatchMode();
134153
this.fileNamePrefix = fileNamePrefix;
135154
this.fullPathPrefix = FilenameUtils.concat(dir, fileNamePrefix);
@@ -147,7 +166,7 @@ public int size() {
147166
public void initialize() throws IOException {
148167
SingerLogConfig singerLogConfig = singerLog.getSingerLogConfig();
149168
String regexStr = fileNamePrefix;
150-
File logDir = new File(singerLogConfig.getLogDir());
169+
File logDir = new File(dir);
151170

152171
if (singerLogConfig.getFilenameMatchMode() == FileNameMatchMode.PREFIX) {
153172
regexStr += ".*";
@@ -283,7 +302,7 @@ public void setLatestProcessedMessageTime(long timestamp) {
283302
}
284303

285304
public String getLogDir() {
286-
return this.singerLog.getSingerLogConfig().getLogDir();
305+
return this.dir;
287306
}
288307

289308
/**

singer/src/main/java/com/pinterest/singer/common/SingerSettings.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@
2727
import com.pinterest.singer.monitor.LogStreamManager;
2828
import com.pinterest.singer.thrift.configuration.SingerConfig;
2929
import com.pinterest.singer.thrift.configuration.SingerLogConfig;
30+
import com.pinterest.singer.utils.SingerUtils;
3031
import com.pinterest.singer.writer.KafkaProducerMetricsMonitor;
3132
import com.twitter.ostrich.stats.Stats;
3233
import com.google.common.annotations.VisibleForTesting;
3334
import com.google.common.util.concurrent.ThreadFactoryBuilder;
35+
import org.apache.commons.lang3.tuple.Pair;
3436
import org.slf4j.Logger;
3537
import org.slf4j.LoggerFactory;
3638

@@ -41,6 +43,7 @@
4143
import java.util.ArrayList;
4244
import java.util.Collection;
4345
import java.util.HashMap;
46+
import java.util.List;
4447
import java.util.Map;
4548
import java.util.Map.Entry;
4649
import java.util.SortedMap;
@@ -92,7 +95,7 @@ public final class SingerSettings {
9295
public static HeartbeatGenerator heartbeatGenerator;
9396

9497
// initialized here so that unit tests aren't required to call the initialize method below
95-
private static SortedMap<String, Collection<SingerLogConfig>> logConfigMap = new TreeMap<>();
98+
private static SortedMap<Pair<Integer, String>, Collection<SingerLogConfig>> logConfigMap = new TreeMap<>();
9699

97100
//environment is production by default
98101
private static Environment environment = new Environment();
@@ -249,17 +252,18 @@ public static Method getLogMonitorStaticInstanceMethod(String monitorClassName)
249252
return getInstance;
250253
}
251254

252-
public static SortedMap<String, Collection<SingerLogConfig>> loadLogConfigMap(SingerConfig config) {
253-
SortedMap<String, Collection<SingerLogConfig>> logConfigMap = new TreeMap<>();
254-
if(config.getLogConfigs()!=null) {
255-
for (SingerLogConfig singerLogConfig : config.getLogConfigs()) {
256-
Collection<SingerLogConfig> collection = logConfigMap.get(singerLogConfig.getLogDir());
257-
if (collection == null) {
258-
collection = new ArrayList<>();
259-
logConfigMap.put(singerLogConfig.getLogDir(), collection);
260-
}
261-
collection.add(singerLogConfig);
262-
}
255+
public static SortedMap<Pair<Integer, String>, Collection<SingerLogConfig>> loadLogConfigMap(SingerConfig config) {
256+
SortedMap<Pair<Integer, String>, Collection<SingerLogConfig>> logConfigMap = new TreeMap<>();
257+
List<SingerLogConfig> logConfigs = config.getLogConfigs();
258+
if (logConfigs == null) {
259+
return logConfigMap;
260+
}
261+
for (SingerLogConfig singerLogConfig : logConfigs) {
262+
List<String> directories = SingerUtils.splitString(singerLogConfig.getLogDir());
263+
for (String logDir : directories) {
264+
logConfigMap.computeIfAbsent(Pair.of(logDir.split("/").length - 1, logDir),
265+
k -> new ArrayList<>()).add(singerLogConfig);
266+
}
263267
}
264268
return logConfigMap;
265269
}
@@ -334,7 +338,7 @@ public static void reset() {
334338
fsMonitorMap.clear();
335339
}
336340

337-
public static SortedMap<String, Collection<SingerLogConfig>> getLogConfigMap() {
341+
public static SortedMap<Pair<Integer, String>, Collection<SingerLogConfig>> getLogConfigMap() {
338342
return logConfigMap;
339343
}
340344

0 commit comments

Comments
 (0)