Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 85 additions & 34 deletions src/main/java/com/aws/greengrass/logmanager/LogManagerService.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.aws.greengrass.logmanager.model.LogFileInformation;
import com.aws.greengrass.logmanager.model.ProcessingFiles;
import com.aws.greengrass.logmanager.services.DiskSpaceManagementService;
import com.aws.greengrass.logmanager.util.PeriodicBuffer;
import com.aws.greengrass.util.Coerce;
import com.aws.greengrass.util.NucleusPaths;
import com.aws.greengrass.util.Utils;
Expand Down Expand Up @@ -104,6 +105,7 @@ public class LogManagerService extends PluginService {
public static final String DELETE_LOG_FILES_AFTER_UPLOAD_CONFIG_TOPIC_NAME = "deleteLogFileAfterCloudUpload";
public static final String UPLOAD_TO_CW_CONFIG_TOPIC_NAME = "uploadToCloudWatch";
public static final String MULTILINE_PATTERN_CONFIG_TOPIC_NAME = "multiLineStartPattern";
public static final String BUFFER_INTERVAL_SEC = "bufferIntervalSec";
public static final double DEFAULT_PERIODIC_UPDATE_INTERVAL_SEC = 300;
public static final int MAX_CACHE_INACTIVE_TIME_SECONDS = 60 * 60 * 24; // 1 day

Expand All @@ -117,6 +119,9 @@ public class LogManagerService extends PluginService {
new ConcurrentHashMap<>();
private final DiskSpaceManagementService diskSpaceManagementService;

private PeriodicBuffer<String, ProcessingFiles> processingFilesBuffer;
private PeriodicBuffer<String, Instant> lastUploadedFileBuffer;


@Getter
Map<String, ComponentLogConfiguration> componentLogConfigurations = new ConcurrentHashMap<>();
Expand All @@ -143,7 +148,6 @@ public class LogManagerService extends PluginService {
this.logsProcessor = logProcessor;
this.diskSpaceManagementService = new DiskSpaceManagementService();
this.workDir = nucleusPaths.workPath(LOGS_UPLOADER_SERVICE_TOPICS);

topics.lookupTopics(CONFIGURATION_CONFIG_KEY).subscribe((why, newv) -> {
if (why == WhatHappened.timestampUpdated) {
return;
Expand All @@ -152,8 +156,34 @@ public class LogManagerService extends PluginService {
handlePeriodicUploadIntervalSecConfig(topics);
handleLogsUploaderConfig(topics);
});

this.uploader.registerAttemptStatus(LOGS_UPLOADER_SERVICE_TOPICS, this::handleCloudWatchAttemptStatus);
initializeConfigUpdateBuffers(topics);
}

private void initializeConfigUpdateBuffers(Topics topics) {
long bufferIntervalSeconds =
Coerce.toLong(topics.findOrDefault(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC,
CONFIGURATION_CONFIG_KEY, BUFFER_INTERVAL_SEC));

// Ensure buffer interval is at least as large as periodicUpdateIntervalSec
bufferIntervalSeconds = Math.max(bufferIntervalSeconds, (long) periodicUpdateIntervalSec);

// Buffer for processing files information
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need separate buffers?

processingFilesBuffer = new PeriodicBuffer<>(
"ProcessingFilesBuffer",
bufferIntervalSeconds,
this::flushProcessingFilesUpdates
);

// Buffer for last uploaded file timestamps
lastUploadedFileBuffer = new PeriodicBuffer<>(
"LastUploadedFileBuffer",
bufferIntervalSeconds,
this::flushLastUploadedFileUpdates
);

processingFilesBuffer.start();
lastUploadedFileBuffer.start();
}

private void handlePeriodicUploadIntervalSecConfig(Topics topics) {
Expand Down Expand Up @@ -404,7 +434,6 @@ private void loadProcessingFilesConfigDeprecated(String componentName) {
Topics currentProcessingComponentTopicsDeprecated = getRuntimeConfig()
.lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, componentName);


if (isDeprecatedVersionSupported()
&& currentProcessingComponentTopicsDeprecated != null
&& !currentProcessingComponentTopicsDeprecated.isEmpty()) {
Expand Down Expand Up @@ -519,41 +548,14 @@ private void handleCloudWatchAttemptStatus(CloudWatchAttempt cloudWatchAttempt)
completedFiles.forEach(file -> this.deleteFile(componentLogConfiguration, file));
});

// Update the runtime configuration and store the last processed file information
context.runOnPublishQueueAndWait(() -> {
processingFilesInformation.forEach((componentName, processingFiles) -> {
if (isDeprecatedVersionSupported()) {
// Update old config value to handle downgrade from v 2.3.1 to older ones
Topics componentTopicsDeprecated =
getRuntimeConfig().lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION,
componentName);

if (processingFiles.getMostRecentlyUsed() != null) {
updateFromMapWhenChanged(componentTopicsDeprecated,
processingFiles.getMostRecentlyUsed().convertToMapOfObjects(),
new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE,
System.currentTimeMillis()));
}
}

// Handle version 2.3.1 and above

Topics componentTopics =
getRuntimeConfig().lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2,
componentName);

updateFromMapWhenChanged(componentTopics, processingFiles.toMap(),
new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, System.currentTimeMillis()));
});
processingFilesInformation.forEach((componentName, processingFiles) -> {
processingFilesBuffer.put(componentName, processingFiles);
});
context.waitForPublishQueueToClear();

lastComponentUploadedLogFileInstantMap.forEach((componentName, instant) -> {
Topics componentTopics = getRuntimeConfig()
.lookupTopics(PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, componentName);
Topic lastFileProcessedTimeStamp = componentTopics.createLeafChild(PERSISTED_LAST_FILE_PROCESSED_TIMESTAMP);
lastFileProcessedTimeStamp.withValue(instant.toEpochMilli());
lastUploadedFileBuffer.put(componentName, instant);
});

isCurrentlyUploading.set(false);
}

Expand Down Expand Up @@ -700,6 +702,7 @@ private void processLogsAndUpload() throws InterruptedException {
Instant.EPOCH);

try {
// list of component files that needs to be uploaded to cloud
LogFileGroup logFileGroup = LogFileGroup.create(componentLogConfiguration,
lastUploadedLogFileTimeMs, workDir);
if (Thread.currentThread().isInterrupted()) {
Expand Down Expand Up @@ -782,6 +785,46 @@ private void processLogsAndUpload() throws InterruptedException {
}
}

// Handler for flushing processing files updates
// Flush handler for the processing-files buffer. Persists each component's buffered
// ProcessingFiles snapshot into the runtime config on the publish queue, mirroring it to
// the deprecated pre-2.3.1 topic when downgrade support is enabled.
private void flushProcessingFilesUpdates(Map<String, ProcessingFiles> updates) {
    context.runOnPublishQueueAndWait(() -> {
        for (Map.Entry<String, ProcessingFiles> entry : updates.entrySet()) {
            String component = entry.getKey();
            ProcessingFiles files = entry.getValue();

            if (isDeprecatedVersionSupported()) {
                // Keep the old topic in sync so a downgrade from v2.3.1 to an older
                // version still finds its processing-file state.
                Topics deprecatedTopics = getRuntimeConfig().lookupTopics(
                        PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, component);

                if (files.getMostRecentlyUsed() != null) {
                    updateFromMapWhenChanged(deprecatedTopics,
                            files.getMostRecentlyUsed().convertToMapOfObjects(),
                            new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE,
                                    System.currentTimeMillis()));
                }
            }

            // Layout used by version 2.3.1 and above.
            Topics v2Topics = getRuntimeConfig().lookupTopics(
                    PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2, component);

            updateFromMapWhenChanged(v2Topics, files.toMap(),
                    new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE,
                            System.currentTimeMillis()));
        }
    });
    context.waitForPublishQueueToClear();
}

// Handler for flushing last uploaded file updates
// Flush handler for the last-uploaded-file buffer: records, per component, the epoch
// millisecond timestamp of the most recently uploaded log file in the runtime config.
private void flushLastUploadedFileUpdates(Map<String, Instant> updates) {
    updates.forEach((component, uploadedAt) -> {
        Topic timestampTopic = getRuntimeConfig()
                .lookupTopics(PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, component)
                .createLeafChild(PERSISTED_LAST_FILE_PROCESSED_TIMESTAMP);
        timestampTopic.withValue(uploadedAt.toEpochMilli());
    });
}

// Sleeps for the given (possibly fractional) number of seconds with microsecond resolution.
private void sleepFractionalSeconds(double timeToSleep) throws InterruptedException {
    long micros = Math.round(timeToSleep * TimeUnit.SECONDS.toMicros(1));
    TimeUnit.MICROSECONDS.sleep(micros);
}
Expand All @@ -805,6 +848,14 @@ public void startup() throws InterruptedException {
public void shutdown() throws InterruptedException {
    super.shutdown();
    isCurrentlyUploading.set(false);

    // Shutting a buffer down flushes whatever it still holds, so no pending
    // runtime-config updates are lost when the service stops.
    PeriodicBuffer<String, ProcessingFiles> filesBuffer = processingFilesBuffer;
    if (filesBuffer != null) {
        filesBuffer.shutdown();
    }
    PeriodicBuffer<String, Instant> uploadedBuffer = lastUploadedFileBuffer;
    if (uploadedBuffer != null) {
        uploadedBuffer.shutdown();
    }
}

@Builder
Expand Down
166 changes: 166 additions & 0 deletions src/main/java/com/aws/greengrass/logmanager/util/PeriodicBuffer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package com.aws.greengrass.logmanager.util;

import com.aws.greengrass.logging.api.Logger;
import com.aws.greengrass.logging.impl.LogManager;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.Map;

/**
* Generic buffer that accumulates updates and flushes them periodically.
* This can be used for batching configuration updates or other operations
* that benefit from being grouped together.
*
* @param <K> Key type for the buffered items
* @param <V> Value type for the buffered items
*/
public class PeriodicBuffer<K, V> {
private static final Logger logger = LogManager.getLogger(PeriodicBuffer.class);

private final Map<K, V> buffer = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Consumer<Map<K, V>> flushHandler;
private final long intervalSeconds;
private final String bufferName;
private final Object bufferLock = new Object();

private ScheduledFuture<?> flushTask;
private volatile boolean isShutdown = false;

/**
* Creates a new periodic buffer.
*
* @param bufferName Name for logging purposes
* @param intervalSeconds How often to flush the buffer
* @param flushHandler Function to handle flushing buffered items
*/
public PeriodicBuffer(String bufferName, long intervalSeconds, Consumer<Map<K, V>> flushHandler) {
this.bufferName = bufferName;
this.intervalSeconds = intervalSeconds;
this.flushHandler = flushHandler;
}

/**
* Starts the periodic flushing.
*/
public void start() {
if (isShutdown) {
throw new IllegalStateException("Buffer has been shutdown");
}

flushTask = scheduler.scheduleAtFixedRate(
this::flush,
intervalSeconds,
intervalSeconds,
TimeUnit.SECONDS
);

logger.atInfo().log("Started periodic buffer '{}' with {}-second interval", bufferName, intervalSeconds);
}

/**
* Adds an item to the buffer.
*
* @param key The key
* @param value The value
*/
public void put(K key, V value) {
if (isShutdown) {
logger.atWarn().log("Attempted to add to shutdown buffer '{}'", bufferName);
return;
}

synchronized (bufferLock) {
buffer.put(key, value);
}
}

/**
* Removes an item from the buffer.
*
* @param key The key to remove
*/
public void remove(K key) {
synchronized (bufferLock) {
buffer.remove(key);
}
}

/**
* Manually triggers a flush of the buffer.
*/
public void flush() {
synchronized (bufferLock) {
if (buffer.isEmpty()) {
return;
}

logger.atInfo().log("Flushing buffer '{}' with {} items", bufferName, buffer.size());

try {
// Create a copy to pass to the handler
Map<K, V> bufferCopy = new ConcurrentHashMap<>(buffer);
flushHandler.accept(bufferCopy);

// Clear the buffer after successful flush
buffer.clear();

logger.atDebug().log("Successfully flushed buffer '{}'", bufferName);
} catch (Exception e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommendation generated by Amazon CodeGuru Reviewer. Leave feedback on this recommendation by replying to the comment or by reacting to the comment using emoji.

It appears that your code handles a broad swath of exceptions in the catch block, potentially trapping dissimilar issues or problems that should not be dealt with at this point in the program.

Learn more

logger.atError().cause(e).log("Failed to flush buffer '{}'", bufferName);
}
}
}

/**
* Returns the current size of the buffer.
*/
public int size() {
synchronized (bufferLock) {
return buffer.size();
}
}

/**
* Checks if the buffer is empty.
*/
public boolean isEmpty() {
synchronized (bufferLock) {
return buffer.isEmpty();
}
}

/**
* Shuts down the buffer, flushing any remaining items.
*/
public void shutdown() {
if (isShutdown) {
return;
}

isShutdown = true;

// Cancel the scheduled task
if (flushTask != null) {
flushTask.cancel(false);
}

// Flush any remaining items
flush();

// Shutdown the scheduler
scheduler.shutdown();

logger.atInfo().log("Shutdown buffer '{}'", bufferName);
}
}
Loading