Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 91 additions & 38 deletions src/main/java/com/aws/greengrass/logmanager/LogManagerService.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.aws.greengrass.logmanager.model.LogFileInformation;
import com.aws.greengrass.logmanager.model.ProcessingFiles;
import com.aws.greengrass.logmanager.services.DiskSpaceManagementService;
import com.aws.greengrass.logmanager.util.PeriodicBuffer;
import com.aws.greengrass.util.Coerce;
import com.aws.greengrass.util.NucleusPaths;
import com.aws.greengrass.util.Utils;
Expand Down Expand Up @@ -104,8 +105,9 @@ public class LogManagerService extends PluginService {
public static final String DELETE_LOG_FILES_AFTER_UPLOAD_CONFIG_TOPIC_NAME = "deleteLogFileAfterCloudUpload";
public static final String UPLOAD_TO_CW_CONFIG_TOPIC_NAME = "uploadToCloudWatch";
public static final String MULTILINE_PATTERN_CONFIG_TOPIC_NAME = "multiLineStartPattern";
public static final String BUFFER_INTERVAL_SEC = "bufferIntervalSec";
public static final double DEFAULT_PERIODIC_UPDATE_INTERVAL_SEC = 300;
public static final int MAX_CACHE_INACTIVE_TIME_SECONDS = 60 * 60 * 24; // 1 day
public static final int MAX_CACHE_INACTIVE_TIME_SECONDS = (int) TimeUnit.DAYS.toSeconds(1);

private final List<Consumer<EventType>> serviceStatusListeners = new ArrayList<>();

Expand All @@ -117,6 +119,9 @@ public class LogManagerService extends PluginService {
new ConcurrentHashMap<>();
private final DiskSpaceManagementService diskSpaceManagementService;

private PeriodicBuffer<String, ProcessingFiles> processingFilesBuffer;
private PeriodicBuffer<String, Instant> lastUploadedFileBuffer;


@Getter
Map<String, ComponentLogConfiguration> componentLogConfigurations = new ConcurrentHashMap<>();
Expand All @@ -143,7 +148,6 @@ public class LogManagerService extends PluginService {
this.logsProcessor = logProcessor;
this.diskSpaceManagementService = new DiskSpaceManagementService();
this.workDir = nucleusPaths.workPath(LOGS_UPLOADER_SERVICE_TOPICS);

topics.lookupTopics(CONFIGURATION_CONFIG_KEY).subscribe((why, newv) -> {
if (why == WhatHappened.timestampUpdated) {
return;
Expand All @@ -152,8 +156,36 @@ public class LogManagerService extends PluginService {
handlePeriodicUploadIntervalSecConfig(topics);
handleLogsUploaderConfig(topics);
});

this.uploader.registerAttemptStatus(LOGS_UPLOADER_SERVICE_TOPICS, this::handleCloudWatchAttemptStatus);
initializeConfigUpdateBuffers(topics);
}

private void initializeConfigUpdateBuffers(Topics topics) {
long bufferIntervalSeconds =
Coerce.toLong(topics.findOrDefault(
LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, CONFIGURATION_CONFIG_KEY, BUFFER_INTERVAL_SEC));

// Ensure buffer interval is at least as large as periodicUpdateIntervalSec
bufferIntervalSeconds = Math.max(bufferIntervalSeconds, (long) periodicUpdateIntervalSec);

logger.atInfo().log("Initializing config update buffers with interval: {} seconds", bufferIntervalSeconds);

// Buffer for processing files information
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need separate buffers?

processingFilesBuffer = new PeriodicBuffer<>(
"ProcessingFilesBuffer",
bufferIntervalSeconds,
this::flushProcessingFilesUpdates
);

// Buffer for last uploaded file timestamps
lastUploadedFileBuffer = new PeriodicBuffer<>(
"LastUploadedFileBuffer",
bufferIntervalSeconds,
this::flushLastUploadedFileUpdates
);

processingFilesBuffer.start();
lastUploadedFileBuffer.start();
}

private void handlePeriodicUploadIntervalSecConfig(Topics topics) {
Expand Down Expand Up @@ -404,7 +436,6 @@ private void loadProcessingFilesConfigDeprecated(String componentName) {
Topics currentProcessingComponentTopicsDeprecated = getRuntimeConfig()
.lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, componentName);


if (isDeprecatedVersionSupported()
&& currentProcessingComponentTopicsDeprecated != null
&& !currentProcessingComponentTopicsDeprecated.isEmpty()) {
Expand Down Expand Up @@ -499,7 +530,7 @@ private void handleCloudWatchAttemptStatus(CloudWatchAttempt cloudWatchAttempt)

// Record the last processed file timestamp
completedFiles.forEach(file -> {
updatelastComponentUploadedLogFile(lastComponentUploadedLogFileInstantMap, componentName, file);
updateLastComponentUploadedLogFile(lastComponentUploadedLogFileInstantMap, componentName, file);
});

if (!componentLogConfigurations.containsKey(componentName)) {
Expand All @@ -519,41 +550,14 @@ private void handleCloudWatchAttemptStatus(CloudWatchAttempt cloudWatchAttempt)
completedFiles.forEach(file -> this.deleteFile(componentLogConfiguration, file));
});

// Update the runtime configuration and store the last processed file information
context.runOnPublishQueueAndWait(() -> {
processingFilesInformation.forEach((componentName, processingFiles) -> {
if (isDeprecatedVersionSupported()) {
// Update old config value to handle downgrade from v 2.3.1 to older ones
Topics componentTopicsDeprecated =
getRuntimeConfig().lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION,
componentName);

if (processingFiles.getMostRecentlyUsed() != null) {
updateFromMapWhenChanged(componentTopicsDeprecated,
processingFiles.getMostRecentlyUsed().convertToMapOfObjects(),
new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE,
System.currentTimeMillis()));
}
}

// Handle version 2.3.1 and above

Topics componentTopics =
getRuntimeConfig().lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2,
componentName);

updateFromMapWhenChanged(componentTopics, processingFiles.toMap(),
new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, System.currentTimeMillis()));
});
processingFilesInformation.forEach((componentName, processingFiles) -> {
processingFilesBuffer.put(componentName, processingFiles);
});
context.waitForPublishQueueToClear();

lastComponentUploadedLogFileInstantMap.forEach((componentName, instant) -> {
Topics componentTopics = getRuntimeConfig()
.lookupTopics(PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, componentName);
Topic lastFileProcessedTimeStamp = componentTopics.createLeafChild(PERSISTED_LAST_FILE_PROCESSED_TIMESTAMP);
lastFileProcessedTimeStamp.withValue(instant.toEpochMilli());
lastUploadedFileBuffer.put(componentName, instant);
});

isCurrentlyUploading.set(false);
}

Expand Down Expand Up @@ -657,7 +661,7 @@ private void processCloudWatchAttemptLogInformation(Map<String, Set<LogFile>> co
* @param componentName componentName.
* @param logFile the logFile that is going to be recorded.
*/
private void updatelastComponentUploadedLogFile(Map<String, Instant> lastComponentUploadedLogFileInstantMap,
private void updateLastComponentUploadedLogFile(Map<String, Instant> lastComponentUploadedLogFileInstantMap,
String componentName,
LogFile logFile) {
if (!lastComponentUploadedLogFileInstantMap.containsKey(componentName)
Expand Down Expand Up @@ -700,6 +704,7 @@ private void processLogsAndUpload() throws InterruptedException {
Instant.EPOCH);

try {
// list of component files that needs to be uploaded to cloud
LogFileGroup logFileGroup = LogFileGroup.create(componentLogConfiguration,
lastUploadedLogFileTimeMs, workDir);
if (Thread.currentThread().isInterrupted()) {
Expand Down Expand Up @@ -744,7 +749,7 @@ private void processLogsAndUpload() throws InterruptedException {
if (startPosition < file.length()) {
logFileInfo.getLogFileInformationList().add(logFileInformation);
} else if (startPosition == file.length() && !logFileGroup.isActiveFile(file)) {
updatelastComponentUploadedLogFile(lastComponentUploadedLogFileInstantMap,
updateLastComponentUploadedLogFile(lastComponentUploadedLogFileInstantMap,
componentName, file);

// NOTE: This handles the scenario where we are uploading the active file constantly and
Expand Down Expand Up @@ -782,6 +787,46 @@ private void processLogsAndUpload() throws InterruptedException {
}
}

// Handler for flushing processing files updates
private void flushProcessingFilesUpdates(Map<String, ProcessingFiles> updates) {
context.runOnPublishQueueAndWait(() -> {
updates.forEach((componentName, processingFiles) -> {
if (isDeprecatedVersionSupported()) {
// Update old config value to handle downgrade from v 2.3.1 to older ones
Topics componentTopicsDeprecated =
getRuntimeConfig().lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION,
componentName);

if (processingFiles.getMostRecentlyUsed() != null) {
updateFromMapWhenChanged(componentTopicsDeprecated,
processingFiles.getMostRecentlyUsed().convertToMapOfObjects(),
new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE,
System.currentTimeMillis()));
}
}

// Handle version 2.3.1 and above
Topics componentTopics =
getRuntimeConfig().lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2,
componentName);

updateFromMapWhenChanged(componentTopics, processingFiles.toMap(),
new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, System.currentTimeMillis()));
});
});
context.waitForPublishQueueToClear();
}

// Handler for flushing last uploaded file updates
private void flushLastUploadedFileUpdates(Map<String, Instant> updates) {
updates.forEach((componentName, instant) -> {
Topics componentTopics = getRuntimeConfig()
.lookupTopics(PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, componentName);
Topic lastFileProcessedTimeStamp = componentTopics.createLeafChild(PERSISTED_LAST_FILE_PROCESSED_TIMESTAMP);
lastFileProcessedTimeStamp.withValue(instant.toEpochMilli());
});
}

private void sleepFractionalSeconds(double timeToSleep) throws InterruptedException {
TimeUnit.MICROSECONDS.sleep(Math.round(timeToSleep * TimeUnit.SECONDS.toMicros(1)));
}
Expand All @@ -805,6 +850,14 @@ public void startup() throws InterruptedException {
public void shutdown() throws InterruptedException {
super.shutdown();
isCurrentlyUploading.set(false);

// Shutdown the buffers (this will flush any pending updates)
if (processingFilesBuffer != null) {
processingFilesBuffer.shutdown();
}
if (lastUploadedFileBuffer != null) {
lastUploadedFileBuffer.shutdown();
}
}

@Builder
Expand Down
Loading
Loading