From 0ebf0f3fc3815d86345e663a00161c1546ac75a1 Mon Sep 17 00:00:00 2001 From: yitingb <118219519+yitingb@users.noreply.github.com> Date: Tue, 17 Jun 2025 10:19:03 -0700 Subject: [PATCH 1/5] feat(log-manager): add configurable buffer interval for log processing state --- .../logmanager/LogManagerService.java | 119 +++++++++---- .../logmanager/util/PeriodicBuffer.java | 166 ++++++++++++++++++ 2 files changed, 251 insertions(+), 34 deletions(-) create mode 100644 src/main/java/com/aws/greengrass/logmanager/util/PeriodicBuffer.java diff --git a/src/main/java/com/aws/greengrass/logmanager/LogManagerService.java b/src/main/java/com/aws/greengrass/logmanager/LogManagerService.java index 372da17d..23854315 100644 --- a/src/main/java/com/aws/greengrass/logmanager/LogManagerService.java +++ b/src/main/java/com/aws/greengrass/logmanager/LogManagerService.java @@ -28,6 +28,7 @@ import com.aws.greengrass.logmanager.model.LogFileInformation; import com.aws.greengrass.logmanager.model.ProcessingFiles; import com.aws.greengrass.logmanager.services.DiskSpaceManagementService; +import com.aws.greengrass.logmanager.util.PeriodicBuffer; import com.aws.greengrass.util.Coerce; import com.aws.greengrass.util.NucleusPaths; import com.aws.greengrass.util.Utils; @@ -104,6 +105,7 @@ public class LogManagerService extends PluginService { public static final String DELETE_LOG_FILES_AFTER_UPLOAD_CONFIG_TOPIC_NAME = "deleteLogFileAfterCloudUpload"; public static final String UPLOAD_TO_CW_CONFIG_TOPIC_NAME = "uploadToCloudWatch"; public static final String MULTILINE_PATTERN_CONFIG_TOPIC_NAME = "multiLineStartPattern"; + public static final String BUFFER_INTERVAL_SEC = "bufferIntervalSec"; public static final double DEFAULT_PERIODIC_UPDATE_INTERVAL_SEC = 300; public static final int MAX_CACHE_INACTIVE_TIME_SECONDS = 60 * 60 * 24; // 1 day @@ -117,6 +119,9 @@ public class LogManagerService extends PluginService { new ConcurrentHashMap<>(); private final DiskSpaceManagementService diskSpaceManagementService; + private PeriodicBuffer processingFilesBuffer; + private PeriodicBuffer lastUploadedFileBuffer; + @Getter Map componentLogConfigurations = new ConcurrentHashMap<>(); @@ -143,7 +148,6 @@ public class LogManagerService extends PluginService { this.logsProcessor = logProcessor; this.diskSpaceManagementService = new DiskSpaceManagementService(); this.workDir = nucleusPaths.workPath(LOGS_UPLOADER_SERVICE_TOPICS); - topics.lookupTopics(CONFIGURATION_CONFIG_KEY).subscribe((why, newv) -> { if (why == WhatHappened.timestampUpdated) { return; @@ -152,8 +156,34 @@ public class LogManagerService extends PluginService { handlePeriodicUploadIntervalSecConfig(topics); handleLogsUploaderConfig(topics); }); - this.uploader.registerAttemptStatus(LOGS_UPLOADER_SERVICE_TOPICS, this::handleCloudWatchAttemptStatus); + initializeConfigUpdateBuffers(topics); + } + + private void initializeConfigUpdateBuffers(Topics topics) { + long bufferIntervalSeconds = + Coerce.toLong(topics.findOrDefault(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, + CONFIGURATION_CONFIG_KEY, BUFFER_INTERVAL_SEC)); + + // Ensure buffer interval is at least as large as periodicUpdateIntervalSec + bufferIntervalSeconds = Math.max(bufferIntervalSeconds, (long) periodicUpdateIntervalSec); + + // Buffer for processing files information + processingFilesBuffer = new PeriodicBuffer<>( + "ProcessingFilesBuffer", + bufferIntervalSeconds, + this::flushProcessingFilesUpdates + ); + + // Buffer for last uploaded file timestamps + lastUploadedFileBuffer = new PeriodicBuffer<>( + "LastUploadedFileBuffer", + bufferIntervalSeconds, + this::flushLastUploadedFileUpdates + ); + + processingFilesBuffer.start(); + lastUploadedFileBuffer.start(); } private void handlePeriodicUploadIntervalSecConfig(Topics topics) { @@ -404,7 +434,6 @@ private void loadProcessingFilesConfigDeprecated(String componentName) { Topics currentProcessingComponentTopicsDeprecated = getRuntimeConfig() .lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, componentName); - if (isDeprecatedVersionSupported() && currentProcessingComponentTopicsDeprecated != null && !currentProcessingComponentTopicsDeprecated.isEmpty()) { @@ -519,41 +548,14 @@ private void handleCloudWatchAttemptStatus(CloudWatchAttempt cloudWatchAttempt) completedFiles.forEach(file -> this.deleteFile(componentLogConfiguration, file)); }); - // Update the runtime configuration and store the last processed file information - context.runOnPublishQueueAndWait(() -> { - processingFilesInformation.forEach((componentName, processingFiles) -> { - if (isDeprecatedVersionSupported()) { - // Update old config value to handle downgrade from v 2.3.1 to older ones - Topics componentTopicsDeprecated = - getRuntimeConfig().lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, - componentName); - - if (processingFiles.getMostRecentlyUsed() != null) { - updateFromMapWhenChanged(componentTopicsDeprecated, - processingFiles.getMostRecentlyUsed().convertToMapOfObjects(), - new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, - System.currentTimeMillis())); - } - } - - // Handle version 2.3.1 and above - - Topics componentTopics = - getRuntimeConfig().lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2, - componentName); - - updateFromMapWhenChanged(componentTopics, processingFiles.toMap(), - new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, System.currentTimeMillis())); - }); + processingFilesInformation.forEach((componentName, processingFiles) -> { + processingFilesBuffer.put(componentName, processingFiles); }); - context.waitForPublishQueueToClear(); lastComponentUploadedLogFileInstantMap.forEach((componentName, instant) -> { - Topics componentTopics = getRuntimeConfig() - .lookupTopics(PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, componentName); - Topic lastFileProcessedTimeStamp = componentTopics.createLeafChild(PERSISTED_LAST_FILE_PROCESSED_TIMESTAMP); - lastFileProcessedTimeStamp.withValue(instant.toEpochMilli()); + lastUploadedFileBuffer.put(componentName, instant); }); + isCurrentlyUploading.set(false); } @@ -700,6 +702,7 @@ private void processLogsAndUpload() throws InterruptedException { Instant.EPOCH); try { + // list of component files that needs to be uploaded to cloud LogFileGroup logFileGroup = LogFileGroup.create(componentLogConfiguration, lastUploadedLogFileTimeMs, workDir); if (Thread.currentThread().isInterrupted()) { @@ -782,6 +785,46 @@ private void processLogsAndUpload() throws InterruptedException { } } + // Handler for flushing processing files updates + private void flushProcessingFilesUpdates(Map updates) { + context.runOnPublishQueueAndWait(() -> { + updates.forEach((componentName, processingFiles) -> { + if (isDeprecatedVersionSupported()) { + // Update old config value to handle downgrade from v 2.3.1 to older ones + Topics componentTopicsDeprecated = + getRuntimeConfig().lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION, + componentName); + + if (processingFiles.getMostRecentlyUsed() != null) { + updateFromMapWhenChanged(componentTopicsDeprecated, + processingFiles.getMostRecentlyUsed().convertToMapOfObjects(), + new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, + System.currentTimeMillis())); + } + } + + // Handle version 2.3.1 and above + Topics componentTopics = + getRuntimeConfig().lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2, + componentName); + + updateFromMapWhenChanged(componentTopics, processingFiles.toMap(), + new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, System.currentTimeMillis())); + }); + }); + context.waitForPublishQueueToClear(); + } + + // Handler for flushing last uploaded file updates + private void flushLastUploadedFileUpdates(Map updates) { + updates.forEach((componentName, instant) -> { + Topics componentTopics = getRuntimeConfig() + .lookupTopics(PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, componentName); + Topic lastFileProcessedTimeStamp = componentTopics.createLeafChild(PERSISTED_LAST_FILE_PROCESSED_TIMESTAMP); + lastFileProcessedTimeStamp.withValue(instant.toEpochMilli()); + }); + } + private void sleepFractionalSeconds(double timeToSleep) throws InterruptedException { TimeUnit.MICROSECONDS.sleep(Math.round(timeToSleep * TimeUnit.SECONDS.toMicros(1))); } @@ -805,6 +848,14 @@ public void startup() throws InterruptedException { public void shutdown() throws InterruptedException { super.shutdown(); isCurrentlyUploading.set(false); + + // Shutdown the buffers (this will flush any pending updates) + if (processingFilesBuffer != null) { + processingFilesBuffer.shutdown(); + } + if (lastUploadedFileBuffer != null) { + lastUploadedFileBuffer.shutdown(); + } } @Builder diff --git a/src/main/java/com/aws/greengrass/logmanager/util/PeriodicBuffer.java b/src/main/java/com/aws/greengrass/logmanager/util/PeriodicBuffer.java new file mode 100644 index 00000000..f73483bd --- /dev/null +++ b/src/main/java/com/aws/greengrass/logmanager/util/PeriodicBuffer.java @@ -0,0 +1,166 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.logmanager.util; + +import com.aws.greengrass.logging.api.Logger; +import com.aws.greengrass.logging.impl.LogManager; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.Map; + +/** + * Generic buffer that accumulates updates and flushes them periodically. + * This can be used for batching configuration updates or other operations + * that benefit from being grouped together. + * + * @param Key type for the buffered items + * @param Value type for the buffered items + */ +public class PeriodicBuffer { + private static final Logger logger = LogManager.getLogger(PeriodicBuffer.class); + + private final Map buffer = new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final Consumer> flushHandler; + private final long intervalSeconds; + private final String bufferName; + private final Object bufferLock = new Object(); + + private ScheduledFuture flushTask; + private volatile boolean isShutdown = false; + + /** + * Creates a new periodic buffer. + * + * @param bufferName Name for logging purposes + * @param intervalSeconds How often to flush the buffer + * @param flushHandler Function to handle flushing buffered items + */ + public PeriodicBuffer(String bufferName, long intervalSeconds, Consumer> flushHandler) { + this.bufferName = bufferName; + this.intervalSeconds = intervalSeconds; + this.flushHandler = flushHandler; + } + + /** + * Starts the periodic flushing. + */ + public void start() { + if (isShutdown) { + throw new IllegalStateException("Buffer has been shutdown"); + } + + flushTask = scheduler.scheduleAtFixedRate( + this::flush, + intervalSeconds, + intervalSeconds, + TimeUnit.SECONDS + ); + + logger.atInfo().log("Started periodic buffer '{}' with {}-second interval", bufferName, intervalSeconds); + } + + /** + * Adds an item to the buffer. + * + * @param key The key + * @param value The value + */ + public void put(K key, V value) { + if (isShutdown) { + logger.atWarn().log("Attempted to add to shutdown buffer '{}'", bufferName); + return; + } + + synchronized (bufferLock) { + buffer.put(key, value); + } + } + + /** + * Removes an item from the buffer. + * + * @param key The key to remove + */ + public void remove(K key) { + synchronized (bufferLock) { + buffer.remove(key); + } + } + + /** + * Manually triggers a flush of the buffer. + */ + public void flush() { + synchronized (bufferLock) { + if (buffer.isEmpty()) { + return; + } + + logger.atInfo().log("Flushing buffer '{}' with {} items", bufferName, buffer.size()); + + try { + // Create a copy to pass to the handler + Map bufferCopy = new ConcurrentHashMap<>(buffer); + flushHandler.accept(bufferCopy); + + // Clear the buffer after successful flush + buffer.clear(); + + logger.atDebug().log("Successfully flushed buffer '{}'", bufferName); + } catch (Exception e) { + logger.atError().cause(e).log("Failed to flush buffer '{}'", bufferName); + } + } + } + + /** + * Returns the current size of the buffer. + */ + public int size() { + synchronized (bufferLock) { + return buffer.size(); + } + } + + /** + * Checks if the buffer is empty. + */ + public boolean isEmpty() { + synchronized (bufferLock) { + return buffer.isEmpty(); + } + } + + /** + * Shuts down the buffer, flushing any remaining items. + */ + public void shutdown() { + if (isShutdown) { + return; + } + + isShutdown = true; + + // Cancel the scheduled task + if (flushTask != null) { + flushTask.cancel(false); + } + + // Flush any remaining items + flush(); + + // Shutdown the scheduler + scheduler.shutdown(); + + logger.atInfo().log("Shutdown buffer '{}'", bufferName); + } +} \ No newline at end of file From a70cce689b264cbed33aa7fe0c5b220bc799b2de Mon Sep 17 00:00:00 2001 From: yitingb <118219519+yitingb@users.noreply.github.com> Date: Wed, 18 Jun 2025 22:45:40 -0700 Subject: [PATCH 2/5] fix: checkstyle --- .../aws/greengrass/logmanager/util/PeriodicBuffer.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/aws/greengrass/logmanager/util/PeriodicBuffer.java b/src/main/java/com/aws/greengrass/logmanager/util/PeriodicBuffer.java index f73483bd..b58efa04 100644 --- a/src/main/java/com/aws/greengrass/logmanager/util/PeriodicBuffer.java +++ b/src/main/java/com/aws/greengrass/logmanager/util/PeriodicBuffer.java @@ -8,13 +8,13 @@ import com.aws.greengrass.logging.api.Logger; import com.aws.greengrass.logging.impl.LogManager; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import java.util.Map; /** * Generic buffer that accumulates updates and flushes them periodically. @@ -51,7 +51,10 @@ public PeriodicBuffer(String bufferName, long intervalSeconds, Consumer Date: Thu, 19 Jun 2025 11:17:23 -0700 Subject: [PATCH 3/5] fix: update deadlock --- .../logmanager/LogManagerService.java | 14 +- .../logmanager/util/PeriodicBuffer.java | 171 ++++++++++++++++-- 2 files changed, 163 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/aws/greengrass/logmanager/LogManagerService.java b/src/main/java/com/aws/greengrass/logmanager/LogManagerService.java index 23854315..9499b41f 100644 --- a/src/main/java/com/aws/greengrass/logmanager/LogManagerService.java +++ b/src/main/java/com/aws/greengrass/logmanager/LogManagerService.java @@ -107,7 +107,7 @@ public class LogManagerService extends PluginService { public static final String MULTILINE_PATTERN_CONFIG_TOPIC_NAME = "multiLineStartPattern"; public static final String BUFFER_INTERVAL_SEC = "bufferIntervalSec"; public static final double DEFAULT_PERIODIC_UPDATE_INTERVAL_SEC = 300; - public static final int MAX_CACHE_INACTIVE_TIME_SECONDS = 60 * 60 * 24; // 1 day + public static final int MAX_CACHE_INACTIVE_TIME_SECONDS = (int) TimeUnit.DAYS.toSeconds(1); private final List> serviceStatusListeners = new ArrayList<>(); @@ -162,12 +162,14 @@ public class LogManagerService extends PluginService { private void initializeConfigUpdateBuffers(Topics topics) { long bufferIntervalSeconds = - Coerce.toLong(topics.findOrDefault(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, - CONFIGURATION_CONFIG_KEY, BUFFER_INTERVAL_SEC)); + Coerce.toLong(topics.findOrDefault( + LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, CONFIGURATION_CONFIG_KEY, BUFFER_INTERVAL_SEC)); // Ensure buffer interval is at least as large as periodicUpdateIntervalSec bufferIntervalSeconds = Math.max(bufferIntervalSeconds, (long) periodicUpdateIntervalSec); + logger.atInfo().log("Initializing config update buffers with interval: {} seconds", bufferIntervalSeconds); + // Buffer for processing files information processingFilesBuffer = new PeriodicBuffer<>( "ProcessingFilesBuffer", @@ -528,7 +530,7 @@ private void handleCloudWatchAttemptStatus(CloudWatchAttempt cloudWatchAttempt) // Record the last processed file timestamp completedFiles.forEach(file -> { - updatelastComponentUploadedLogFile(lastComponentUploadedLogFileInstantMap, componentName, file); + updateLastComponentUploadedLogFile(lastComponentUploadedLogFileInstantMap, componentName, file); }); if (!componentLogConfigurations.containsKey(componentName)) { @@ -659,7 +661,7 @@ private void processCloudWatchAttemptLogInformation(Map> co * @param componentName componentName. * @param logFile the logFile that is going to be recorded. */ - private void updatelastComponentUploadedLogFile(Map lastComponentUploadedLogFileInstantMap, + private void updateLastComponentUploadedLogFile(Map lastComponentUploadedLogFileInstantMap, String componentName, LogFile logFile) { if (!lastComponentUploadedLogFileInstantMap.containsKey(componentName) @@ -747,7 +749,7 @@ private void processLogsAndUpload() throws InterruptedException { if (startPosition < file.length()) { logFileInfo.getLogFileInformationList().add(logFileInformation); } else if (startPosition == file.length() && !logFileGroup.isActiveFile(file)) { - updatelastComponentUploadedLogFile(lastComponentUploadedLogFileInstantMap, + updateLastComponentUploadedLogFile(lastComponentUploadedLogFileInstantMap, componentName, file); // NOTE: This handles the scenario where we are uploading the active file constantly and diff --git a/src/main/java/com/aws/greengrass/logmanager/util/PeriodicBuffer.java b/src/main/java/com/aws/greengrass/logmanager/util/PeriodicBuffer.java index b58efa04..328a6c0b 100644 --- a/src/main/java/com/aws/greengrass/logmanager/util/PeriodicBuffer.java +++ b/src/main/java/com/aws/greengrass/logmanager/util/PeriodicBuffer.java @@ -11,9 +11,13 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; /** @@ -27,12 +31,15 @@ public class PeriodicBuffer { private static final Logger logger = LogManager.getLogger(PeriodicBuffer.class); - private final Map buffer = new ConcurrentHashMap<>(); - private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private Map buffer = new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduler; private final Consumer> flushHandler; private final long intervalSeconds; private final String bufferName; private final Object bufferLock = new Object(); + private final AtomicLong flushSuccessCount = new AtomicLong(0); + private final AtomicLong flushFailureCount = new AtomicLong(0); + private final AtomicInteger flushInProgress = new AtomicInteger(0); private ScheduledFuture flushTask; private volatile boolean isShutdown = false; @@ -48,6 +55,15 @@ public PeriodicBuffer(String bufferName, long intervalSeconds, Consumer { + Thread thread = new Thread(runnable, "PeriodicBuffer-" + bufferName + "-Flusher"); + thread.setDaemon(true); + return thread; + }; + + this.scheduler = Executors.newSingleThreadScheduledExecutor(namedThreadFactory); } /** @@ -85,6 +101,8 @@ public void put(K key, V value) { synchronized (bufferLock) { buffer.put(key, value); + logger.atTrace().kv("key", key).kv("bufferSize", buffer.size()) + .log("Added item to buffer '{}'", bufferName); } } @@ -96,6 +114,8 @@ public void put(K key, V value) { public void remove(K key) { synchronized (bufferLock) { buffer.remove(key); + logger.atTrace().kv("key", key).kv("bufferSize", buffer.size()) + .log("Removed item from buffer '{}'", bufferName); } } @@ -103,25 +123,109 @@ public void remove(K key) { * Manually triggers a flush of the buffer. */ public void flush() { + flush(false); + } + + /** + * Triggers a flush of the buffer with option to wait for any in-progress flush. + * + * @param waitForInProgress if true, waits for any in-progress flush to complete before proceeding + */ + @SuppressWarnings("PMD.AvoidCatchingGenericException") + private void flush(boolean waitForInProgress) { + // Early check for empty buffer to avoid unnecessary locking synchronized (bufferLock) { if (buffer.isEmpty()) { + logger.atDebug().log("Buffer '{}' is empty, nothing to flush", bufferName); return; } + } - logger.atInfo().log("Flushing buffer '{}' with {} items", bufferName, buffer.size()); + // Check if another flush is already in progress + if (!flushInProgress.compareAndSet(0, 1)) { + if (waitForInProgress) { + logger.atDebug().log("Flush already in progress for buffer '{}', waiting for completion", bufferName); + // Wait for the current flush to complete + while (flushInProgress.get() != 0) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.atWarn().log("Interrupted while waiting for flush to complete in buffer '{}'", + bufferName); + return; + } + } + // Try again after the previous flush completed + flush(waitForInProgress); + } else { + logger.atDebug().log("Flush already in progress for buffer '{}', skipping", bufferName); + } + return; + } - try { - // Create a copy to pass to the handler - Map bufferCopy = new ConcurrentHashMap<>(buffer); - flushHandler.accept(bufferCopy); + try { + Map bufferToFlush; + boolean shouldFlush; + synchronized (bufferLock) { + // Double-check if buffer is still non-empty + if (buffer.isEmpty()) { + logger.atDebug().log("Buffer '{}' became empty, nothing to flush", bufferName); + // Don't return here - we need to go through finally block + bufferToFlush = new ConcurrentHashMap<>(); + shouldFlush = false; + } else { + logger.atInfo().log("Flushing buffer '{}' with {} items", bufferName, buffer.size()); - // Clear the buffer after successful flush - buffer.clear(); + // Swap the buffer instead of copying for better performance + bufferToFlush = buffer; + buffer = new ConcurrentHashMap<>(); + shouldFlush = true; + } + } - logger.atDebug().log("Successfully flushed buffer '{}'", bufferName); - } catch (Exception e) { - logger.atError().cause(e).log("Failed to flush buffer '{}'", bufferName); + // Only proceed with flush if we have data + if (shouldFlush) { + long startTime = System.currentTimeMillis(); + try { + flushHandler.accept(bufferToFlush); + + long duration = System.currentTimeMillis() - startTime; + flushSuccessCount.incrementAndGet(); + + logger.atDebug() + .kv("itemCount", bufferToFlush.size()) + .kv("durationMs", duration) + .kv("successCount", flushSuccessCount.get()) + .log("Successfully flushed buffer '{}'", bufferName); + + } catch (RejectedExecutionException e) { + // Put the items back if flush was rejected + synchronized (bufferLock) { + bufferToFlush.forEach(buffer::putIfAbsent); + } + flushFailureCount.incrementAndGet(); + logger.atError().cause(e) + .kv("failureCount", flushFailureCount.get()) + .log("Flush rejected for buffer '{}', items restored", bufferName); + } catch (RuntimeException e) { + // Catching RuntimeException because the flushHandler is a Consumer> that + // can throw any unchecked exception. We need to handle all possible runtime exceptions + // to ensure the buffer continues to function properly. + // CHECKSTYLE:OFF - We need to catch all runtime exceptions from user-provided handler + // PMD:OFF:AvoidCatchingGenericException - The flushHandler is a user-provided Consumer + // that can throw any unchecked exception, and we must ensure the buffer continues to + // function even if the handler fails. This is a valid use case for catching RuntimeException. + flushFailureCount.incrementAndGet(); + logger.atError().cause(e) + .kv("failureCount", flushFailureCount.get()) + .log("Failed to flush buffer '{}'. Items lost.", bufferName); + // CHECKSTYLE:ON + // PMD:ON:AvoidCatchingGenericException + } } + } finally { + flushInProgress.set(0); } } @@ -146,24 +250,59 @@ public boolean isEmpty() { /** * Shuts down the buffer, flushing any remaining items. */ + @SuppressWarnings("PMD.AvoidCatchingGenericException") public void shutdown() { if (isShutdown) { return; } isShutdown = true; + logger.atInfo().log("Shutting down buffer '{}'", bufferName); // Cancel the scheduled task if (flushTask != null) { flushTask.cancel(false); } - // Flush any remaining items - flush(); + // Flush any remaining items, waiting for any in-progress flush to complete + try { + flush(true); + } catch (RuntimeException e) { + // Catching RuntimeException because the flush method can throw any unchecked exception + // from the flushHandler. We need to ensure shutdown completes even if the final flush fails. + // CHECKSTYLE:OFF - We need to catch all runtime exceptions from user-provided handler + // PMD:OFF:AvoidCatchingGenericException - The flushHandler called via flush(true) can throw + // any unchecked exception. We must ensure shutdown completes successfully even if the final + // flush operation fails. This is a valid use case for catching RuntimeException. + logger.atError().cause(e).log("Error during final flush"); + // CHECKSTYLE:ON + // PMD:ON:AvoidCatchingGenericException + } // Shutdown the scheduler - scheduler.shutdown(); + scheduler.shutdownNow(); - logger.atInfo().log("Shutdown buffer '{}'", bufferName); + logger.atInfo() + .kv("successfulFlushes", flushSuccessCount.get()) + .kv("failedFlushes", flushFailureCount.get()) + .log("Shutdown complete for buffer '{}'", bufferName); + } + + /** + * Gets the number of successful flushes. + * + * @return the number of successful flushes + */ + public long getFlushSuccessCount() { + return flushSuccessCount.get(); + } + + /** + * Gets the number of failed flushes. + * + * @return the number of failed flushes + */ + public long getFlushFailureCount() { + return flushFailureCount.get(); } } From 26af2ef1e14b0a5d3dec79957df724a27830f0ee Mon Sep 17 00:00:00 2001 From: yitingb <118219519+yitingb@users.noreply.github.com> Date: Thu, 19 Jun 2025 11:24:14 -0700 Subject: [PATCH 4/5] fix: add unit tests --- .../logmanager/util/PeriodicBufferTest.java | 594 ++++++++++++++++++ 1 file changed, 594 insertions(+) create mode 100644 src/test/java/com/aws/greengrass/logmanager/util/PeriodicBufferTest.java diff --git a/src/test/java/com/aws/greengrass/logmanager/util/PeriodicBufferTest.java b/src/test/java/com/aws/greengrass/logmanager/util/PeriodicBufferTest.java new file mode 100644 index 00000000..9836e5d2 --- /dev/null +++ b/src/test/java/com/aws/greengrass/logmanager/util/PeriodicBufferTest.java @@ -0,0 +1,594 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.logmanager.util; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +class PeriodicBufferTest { + + private PeriodicBuffer buffer; + private final List> flushedData = Collections.synchronizedList(new ArrayList<>()); + private final AtomicInteger flushCount = new AtomicInteger(0); + private final AtomicBoolean flushHandlerThrowsException = new AtomicBoolean(false); + private final AtomicBoolean flushHandlerThrowsRejectedExecutionException = new AtomicBoolean(false); + private Consumer> defaultFlushHandler; + + @BeforeEach + void setUp() { + flushedData.clear(); + flushCount.set(0); + flushHandlerThrowsException.set(false); + flushHandlerThrowsRejectedExecutionException.set(false); + + defaultFlushHandler = data -> { + if (flushHandlerThrowsRejectedExecutionException.get()) { + throw new RejectedExecutionException("Test rejected execution"); + } + if (flushHandlerThrowsException.get()) { + throw new RuntimeException("Test exception"); + } + flushedData.add(new ConcurrentHashMap<>(data)); + flushCount.incrementAndGet(); + }; + } + + @AfterEach + void shutDown() { + if (buffer != null) { + buffer.shutdown(); + } + } + + @Test + void testPutAndSize() { + buffer = new PeriodicBuffer<>("TestBuffer", 60, defaultFlushHandler); + buffer.start(); + + assertEquals(0, buffer.size()); + assertTrue(buffer.isEmpty()); + + buffer.put("key1", "value1"); + assertEquals(1, buffer.size()); + assertFalse(buffer.isEmpty()); + + buffer.put("key2", "value2"); + assertEquals(2, buffer.size()); + + buffer.put("key1", "updatedValue1"); + assertEquals(2, buffer.size()); // Size should remain same after update + } + + @Test + void testRemove() { + buffer = new PeriodicBuffer<>("TestBuffer", 60, defaultFlushHandler); + buffer.start(); + + buffer.put("key1", "value1"); + buffer.put("key2", "value2"); + assertEquals(2, buffer.size()); + + buffer.remove("key1"); + assertEquals(1, buffer.size()); + + buffer.remove("nonExistentKey"); + assertEquals(1, buffer.size()); + + buffer.remove("key2"); + assertEquals(0, buffer.size()); + assertTrue(buffer.isEmpty()); + } + + @Test + void testManualFlush() throws InterruptedException { + buffer = new PeriodicBuffer<>("TestBuffer", 60, defaultFlushHandler); + buffer.start(); + + buffer.put("key1", "value1"); + buffer.put("key2", "value2"); + + buffer.flush(); + + // Wait for flush to complete + Thread.sleep(100); + + assertEquals(1, flushCount.get()); + assertEquals(1, flushedData.size()); + assertEquals(2, flushedData.get(0).size()); + assertEquals("value1", flushedData.get(0).get("key1")); + assertEquals("value2", flushedData.get(0).get("key2")); + + // Buffer should be empty after flush + assertEquals(0, buffer.size()); + } + + @Test + void testFlushEmptyBuffer() throws InterruptedException { + buffer = new PeriodicBuffer<>("TestBuffer", 60, defaultFlushHandler); + buffer.start(); + + buffer.flush(); + + Thread.sleep(100); + + // Flush handler should not be called for empty buffer + assertEquals(0, flushCount.get()); + assertEquals(0, flushedData.size()); + } + + @Test + @Timeout(5) + void testPeriodicFlush() throws InterruptedException { + CountDownLatch flushLatch = new CountDownLatch(1); + Consumer> latchFlushHandler = data -> { + defaultFlushHandler.accept(data); + flushLatch.countDown(); + }; + + buffer = new PeriodicBuffer<>("TestBuffer", 1, latchFlushHandler); + buffer.start(); + + buffer.put("key1", "value1"); + + // Wait for periodic flush (1 second) + assertTrue(flushLatch.await(2, TimeUnit.SECONDS)); + + assertEquals(1, flushCount.get()); + assertEquals(1, flushedData.size()); + assertEquals("value1", flushedData.get(0).get("key1")); + } + + @Test + void testFlushHandlerRuntimeException() throws InterruptedException { + buffer = new PeriodicBuffer<>("TestBuffer", 60, defaultFlushHandler); + buffer.start(); + + buffer.put("key1", "value1"); + flushHandlerThrowsException.set(true); + + buffer.flush(); + Thread.sleep(100); + + // Items should be lost after runtime exception + assertEquals(0, buffer.size()); + assertEquals(1, buffer.getFlushFailureCount()); + assertEquals(0, buffer.getFlushSuccessCount()); + } + + @Test + void testFlushHandlerRejectedExecutionException() throws InterruptedException { + buffer = new PeriodicBuffer<>("TestBuffer", 60, defaultFlushHandler); + buffer.start(); + + buffer.put("key1", "value1"); + buffer.put("key2", "value2"); + flushHandlerThrowsRejectedExecutionException.set(true); + + buffer.flush(); + Thread.sleep(100); + + // Items should be restored after RejectedExecutionException + assertEquals(2, buffer.size()); + assertEquals(1, buffer.getFlushFailureCount()); + assertEquals(0, buffer.getFlushSuccessCount()); + + // Verify items can be flushed successfully later + flushHandlerThrowsRejectedExecutionException.set(false); + buffer.flush(); + Thread.sleep(100); + + assertEquals(0, buffer.size()); + assertEquals(1, buffer.getFlushSuccessCount()); + } + + @Test + void testShutdownFlushesRemainingItems() throws InterruptedException { + buffer = new PeriodicBuffer<>("TestBuffer", 60, defaultFlushHandler); + buffer.start(); + + buffer.put("key1", "value1"); + buffer.put("key2", "value2"); + + buffer.shutdown(); + + // Shutdown should flush remaining items + assertEquals(1, flushCount.get()); + assertEquals(1, flushedData.size()); + assertEquals(2, flushedData.get(0).size()); + } + + @Test + void testShutdownPreventsNewItems() { + buffer = new PeriodicBuffer<>("TestBuffer", 60, defaultFlushHandler); + buffer.start(); + + buffer.shutdown(); + + // Should not throw but should not add item either + buffer.put("key1", "value1"); + + // Manual flush after shutdown should not do anything + buffer.flush(); + + assertEquals(0, flushCount.get()); + } + + @Test + void testMultipleShutdownCalls() { + buffer = new PeriodicBuffer<>("TestBuffer", 60, defaultFlushHandler); + buffer.start(); + + buffer.put("key1", "value1"); + + buffer.shutdown(); + int firstFlushCount = flushCount.get(); + + // Second shutdown should be idempotent + buffer.shutdown(); + assertEquals(firstFlushCount, flushCount.get()); + } + + @Test + void testStartAfterShutdown() { + buffer = new PeriodicBuffer<>("TestBuffer", 60, defaultFlushHandler); + buffer.start(); + buffer.shutdown(); + + assertThrows(IllegalStateException.class, () -> buffer.start()); + } + + @Test + void testConcurrentPutOperations() throws InterruptedException { + buffer = new PeriodicBuffer<>("TestBuffer", 60, defaultFlushHandler); + buffer.start(); + + int threadCount = 10; + int itemsPerThread = 100; + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch completeLatch = new CountDownLatch(threadCount); + + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + + for (int i = 0; i < threadCount; i++) { + final int threadId = i; + executor.submit(() -> { + try { + startLatch.await(); + for (int j = 0; j < itemsPerThread; j++) { + buffer.put("thread" + threadId + "-key" + j, "value" + j); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + completeLatch.countDown(); + } + }); + } + + startLatch.countDown(); + assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); + + buffer.flush(); + Thread.sleep(100); + + // All items should be flushed + assertEquals(1, flushCount.get()); + assertEquals(threadCount * itemsPerThread, flushedData.get(0).size()); + + executor.shutdown(); + } + + @Test + void testConcurrentPutAndRemove() throws InterruptedException { + buffer = new PeriodicBuffer<>("TestBuffer", 60, defaultFlushHandler); + buffer.start(); + + int operationCount = 1000; + CountDownLatch completeLatch = new CountDownLatch(2); + + Thread putThread = new Thread(() -> { + for (int i = 0; i < operationCount; i++) { + buffer.put("key" + (i % 10), "value" + i); + } + completeLatch.countDown(); + }); + + Thread removeThread = new Thread(() -> { + for (int i = 0; i < operationCount; i++) { + buffer.remove("key" + (i % 10)); + } + completeLatch.countDown(); + }); + + putThread.start(); + removeThread.start(); + + assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); + + // Size should be small due to constant put/remove on same keys + assertTrue(buffer.size() <= 10); + } + + @Test + void testFlushDuringConcurrentModification() throws InterruptedException { + AtomicReference flushException = new AtomicReference<>(); + Consumer> safeFlushHandler = data -> { + try { + // Simulate processing time + Thread.sleep(50); + defaultFlushHandler.accept(data); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + flushException.set(e); + } + }; + + buffer = new PeriodicBuffer<>("TestBuffer", 60, safeFlushHandler); + buffer.start(); + + // Pre-populate buffer + for (int i = 0; i < 100; i++) { + buffer.put("key" + i, "value" + i); + } + + CountDownLatch flushStarted = new CountDownLatch(1); + CountDownLatch modificationComplete = new CountDownLatch(1); + + Thread flushThread = new Thread(() -> { + flushStarted.countDown(); + buffer.flush(); + }); + + Thread modificationThread = new Thread(() -> { + try { + flushStarted.await(); + // Try to modify buffer during flush + for (int i = 100; i < 200; i++) { + buffer.put("key" + i, "value" + i); + if (i % 10 == 0) { + buffer.remove("key" + (i - 50)); + } + } + modificationComplete.countDown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + flushThread.start(); + modificationThread.start(); + + flushThread.join(5000); + assertTrue(modificationComplete.await(5, TimeUnit.SECONDS)); + + // Verify no exceptions occurred + if (flushException.get() != null) { + fail("Exception during flush: " + flushException.get()); + } + + // First flush should have the original 100 items + assertEquals(1, flushedData.size()); + assertEquals(100, flushedData.get(0).size()); + + // Buffer should contain new items added during flush + assertTrue(buffer.size() > 0); + } + + @Test + void testFlushStatistics() throws InterruptedException { + buffer = new PeriodicBuffer<>("TestBuffer", 60, defaultFlushHandler); + buffer.start(); + + assertEquals(0, buffer.getFlushSuccessCount()); + assertEquals(0, buffer.getFlushFailureCount()); + + // Successful flush + buffer.put("key1", "value1"); + buffer.flush(); + Thread.sleep(100); + assertEquals(1, buffer.getFlushSuccessCount()); + assertEquals(0, buffer.getFlushFailureCount()); + + // Failed flush + buffer.put("key2", "value2"); + flushHandlerThrowsException.set(true); + buffer.flush(); + Thread.sleep(100); + assertEquals(1, buffer.getFlushSuccessCount()); + assertEquals(1, buffer.getFlushFailureCount()); + + // Another successful flush + flushHandlerThrowsException.set(false); + buffer.put("key3", "value3"); + buffer.flush(); + Thread.sleep(100); + assertEquals(2, buffer.getFlushSuccessCount()); + assertEquals(1, buffer.getFlushFailureCount()); + } + + @Test + void testConcurrentFlushSkipping() throws InterruptedException { + CountDownLatch flushInProgress = new CountDownLatch(1); + CountDownLatch secondFlushAttempted = new CountDownLatch(1); + AtomicInteger actualFlushCount = new AtomicInteger(0); + + Consumer> slowFlushHandler = data -> { + try { + actualFlushCount.incrementAndGet(); + flushInProgress.countDown(); + Thread.sleep(500); // Simulate slow flush + defaultFlushHandler.accept(data); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }; + + buffer = new PeriodicBuffer<>("TestBuffer", 60, slowFlushHandler); + buffer.start(); + + buffer.put("key1", "value1"); + + // Start first flush in separate thread + Thread firstFlush = new Thread(() -> buffer.flush()); + firstFlush.start(); + + // Wait for first flush to start + assertTrue(flushInProgress.await(1, TimeUnit.SECONDS)); + + // Try second flush while first is in progress + Thread secondFlush = new Thread(() -> { + buffer.flush(); + secondFlushAttempted.countDown(); + }); + secondFlush.start(); + + assertTrue(secondFlushAttempted.await(1, TimeUnit.SECONDS)); + firstFlush.join(2000); + secondFlush.join(2000); + + // Only one actual flush should have occurred + assertEquals(1, actualFlushCount.get()); + assertEquals(1, flushCount.get()); + } + + @Test + void testBufferSwapping() throws InterruptedException { + CountDownLatch flushStarted = new CountDownLatch(1); + CountDownLatch continueFlush = new CountDownLatch(1); + AtomicReference> capturedBuffer = new AtomicReference<>(); + + Consumer> capturingFlushHandler = data -> { + capturedBuffer.set(new ConcurrentHashMap<>(data)); // Make a copy to avoid reference issues + flushStarted.countDown(); // Signal that flush has started and buffer is captured + try { + // Wait for test thread to add new items + assertTrue(continueFlush.await(2, TimeUnit.SECONDS)); + Thread.sleep(50); // Brief processing time + defaultFlushHandler.accept(data); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during flush", e); + } + }; + + buffer = new PeriodicBuffer<>("TestBuffer", 60, capturingFlushHandler); + buffer.start(); + + buffer.put("key1", "value1"); + + Thread flushThread = new Thread(() -> buffer.flush()); + flushThread.start(); + + // Wait for flush to start and capture the buffer + assertTrue(flushStarted.await(2, TimeUnit.SECONDS)); + + // Add new items while flush is processing old buffer + buffer.put("key2", "value2"); + assertEquals(1, buffer.size()); // New buffer should have the new item + + // Allow flush to continue + continueFlush.countDown(); + + flushThread.join(3000); + + // Verify the flushed data only contains the old item + assertNotNull(capturedBuffer.get()); + assertEquals(1, capturedBuffer.get().size()); + assertTrue(capturedBuffer.get().containsKey("key1")); + assertFalse(capturedBuffer.get().containsKey("key2")); + + // Verify buffer still contains the new item + assertEquals(1, buffer.size()); + } + + @Test + @Timeout(5) + void testShutdownWithEmptyBufferDoesNotDeadlock() throws InterruptedException { + buffer = new PeriodicBuffer<>("TestBuffer", 60, defaultFlushHandler); + buffer.start(); + + // First, trigger a flush on an empty buffer to simulate the bug + buffer.flush(); + Thread.sleep(100); + + // Now try to shutdown - this should not hang + buffer.shutdown(); + + // If we reach here, shutdown completed successfully + // The test passing without timeout is the assertion + } + + @Test + @Timeout(5) + void testConcurrentEmptyFlushAndShutdown() throws InterruptedException { + buffer = new PeriodicBuffer<>("TestBuffer", 60, defaultFlushHandler); + buffer.start(); + + // Create multiple threads that will flush empty buffer + int threadCount = 5; + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch flushLatch = new CountDownLatch(threadCount); + ExecutorService executor = Executors.newFixedThreadPool(threadCount + 1); + + // Start threads that will flush empty buffer + for (int i = 0; i < threadCount; i++) { + executor.submit(() -> { + try { + startLatch.await(); + buffer.flush(); + flushLatch.countDown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + } + + // Start shutdown thread + executor.submit(() -> { + try { + startLatch.await(); + Thread.sleep(200); // Let some flushes happen first + buffer.shutdown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + // Release all threads + startLatch.countDown(); + + // Wait for all flushes to complete + assertTrue(flushLatch.await(3, TimeUnit.SECONDS)); + + // Shutdown should also complete without hanging + executor.shutdown(); + assertTrue(executor.awaitTermination(3, TimeUnit.SECONDS)); + } +} From 9a6d4984079c804dff9ec91cea007a299ca76d86 Mon Sep 17 00:00:00 2001 From: yitingb <118219519+yitingb@users.noreply.github.com> Date: Mon, 23 Jun 2025 15:47:15 -0700 Subject: [PATCH 5/5] fix: change wait sleep to complete future --- ...gManagerPeriodicBufferIntegrationTest.java | 355 ++++++++++++++++++ ...llPeriodicIntervalUserComponentConfig.yaml | 1 + .../logmanager/util/PeriodicBuffer.java | 200 +++++----- .../logmanager/util/PeriodicBufferTest.java | 102 ++--- 4 files changed, 522 insertions(+), 136 deletions(-) create mode 100644 src/integrationtests/java/com/aws/greengrass/integrationtests/logmanager/LogManagerPeriodicBufferIntegrationTest.java diff --git a/src/integrationtests/java/com/aws/greengrass/integrationtests/logmanager/LogManagerPeriodicBufferIntegrationTest.java b/src/integrationtests/java/com/aws/greengrass/integrationtests/logmanager/LogManagerPeriodicBufferIntegrationTest.java new file mode 100644 index 00000000..1396a2c6 --- /dev/null +++ b/src/integrationtests/java/com/aws/greengrass/integrationtests/logmanager/LogManagerPeriodicBufferIntegrationTest.java @@ -0,0 +1,355 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.integrationtests.logmanager; + +import com.aws.greengrass.config.Topics; +import com.aws.greengrass.dependency.State; +import com.aws.greengrass.deployment.DeviceConfiguration; +import com.aws.greengrass.deployment.exceptions.DeviceConfigurationException; +import com.aws.greengrass.integrationtests.BaseITCase; +import com.aws.greengrass.integrationtests.util.ConfigPlatformResolver; +import com.aws.greengrass.lifecyclemanager.Kernel; +import com.aws.greengrass.logmanager.LogManagerService; +import com.aws.greengrass.logmanager.model.ProcessingFiles; +import com.aws.greengrass.testcommons.testutilities.GGExtension; +import com.aws.greengrass.util.Coerce; +import com.aws.greengrass.util.exceptions.TLSAuthException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import java.time.format.DateTimeParseException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import software.amazon.awssdk.crt.CrtRuntimeException; +import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; +import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest; +import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static com.aws.greengrass.integrationtests.logmanager.util.LogFileHelper.createTempFileAndWriteData; +import static com.aws.greengrass.logmanager.LogManagerService.BUFFER_INTERVAL_SEC; +import static com.aws.greengrass.logmanager.LogManagerService.LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC; +import static com.aws.greengrass.logmanager.LogManagerService.LOGS_UPLOADER_SERVICE_TOPICS; +import static com.aws.greengrass.logmanager.LogManagerService.PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2; +import static com.aws.greengrass.logmanager.LogManagerService.PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP; +import static com.aws.greengrass.testcommons.testutilities.ExceptionLogProtector.ignoreExceptionOfType; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +@ExtendWith({GGExtension.class, MockitoExtension.class}) +class LogManagerPeriodicBufferIntegrationTest extends BaseITCase { + private static Kernel kernel; + private static DeviceConfiguration deviceConfiguration; + private LogManagerService logManagerService; + private Path tempDirectoryPath; + private final static ObjectMapper YAML_OBJECT_MAPPER = new ObjectMapper(new YAMLFactory()); + + @Mock + private CloudWatchLogsClient cloudWatchLogsClient; + @Captor + private ArgumentCaptor captor; + + void setupKernel(Path storeDirectory, String configFileName) + throws InterruptedException, URISyntaxException, IOException, DeviceConfigurationException { + + System.setProperty("root", tempRootDir.toAbsolutePath().toString()); + CountDownLatch logManagerRunning = new CountDownLatch(1); + + Path testRecipePath = Paths.get(LogManagerTest.class.getResource(configFileName).toURI()); + String content = new String(Files.readAllBytes(testRecipePath), StandardCharsets.UTF_8); + content = content.replaceAll("\\{\\{logFileDirectoryPath}}", + storeDirectory.toAbsolutePath().toString()); + + Map objectMap = YAML_OBJECT_MAPPER.readValue(content, Map.class); + + kernel.parseArgs(); + kernel.getConfig().mergeMap(System.currentTimeMillis(), ConfigPlatformResolver.resolvePlatformMap(objectMap)); + + kernel.getContext().addGlobalStateChangeListener((service, oldState, newState) -> { + if (service.getName().equals(LOGS_UPLOADER_SERVICE_TOPICS) + && newState.equals(State.RUNNING)) { + logManagerService = (LogManagerService) service; + logManagerService.getUploader().getCloudWatchWrapper().setClient(cloudWatchLogsClient); + logManagerRunning.countDown(); + } + }); + deviceConfiguration = new DeviceConfiguration(kernel, "ThingName", "xxxxxx-ats.iot.us-east-1.amazonaws.com", + "xxxxxx.credentials.iot.us-east-1.amazonaws.com", "privKeyFilePath", + "certFilePath", "caFilePath", "us-east-1", "roleAliasName"); + + kernel.getContext().put(DeviceConfiguration.class, deviceConfiguration); + kernel.launch(); + assertTrue(logManagerRunning.await(30, TimeUnit.SECONDS)); + } + + @BeforeEach + void beforeEach(ExtensionContext context) { + kernel = new Kernel(); + ignoreExceptionOfType(context, TLSAuthException.class); + ignoreExceptionOfType(context, InterruptedException.class); + ignoreExceptionOfType(context, DateTimeParseException.class); + ignoreExceptionOfType(context, CrtRuntimeException.class); + } + + @AfterEach + void afterEach() { + kernel.shutdown(); + } + + @Test + void GIVEN_user_component_config_with_periodic_interval_and_buffer_inverval_time_THEN_config_update_by_buffer() throws Exception { + // Given randomly generated logs + tempDirectoryPath = Files.createTempDirectory(tempRootDir, "IntegrationTestsTemporaryLogFiles"); + for (int i = 0; i < 5; i++) { + createTempFileAndWriteData(tempDirectoryPath, "integTestRandomLogFiles.log_", Coerce.toString(i)); + } + setupKernel(tempDirectoryPath, "smallPeriodicIntervalUserComponentConfig.yaml"); + + // When + String componentName = "UserComponentA"; + + // Track how many times CloudWatch upload is called + AtomicInteger uploadCount = new AtomicInteger(0); + CountDownLatch uploadLatch = new CountDownLatch(2); + CountDownLatch firstUploadLatch = new CountDownLatch(1); + + // Buffer flush countdown mechanism for each component map + Map bufferFlushCountdown = new HashMap<>(); + bufferFlushCountdown.put(componentName, new AtomicInteger(2)); // Expect 2 buffer flushes + CountDownLatch bufferFlushLatch = new CountDownLatch(2); + + + when(cloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))).thenAnswer(invocation -> { + uploadCount.incrementAndGet(); + firstUploadLatch.countDown(); + uploadLatch.countDown(); + return PutLogEventsResponse.builder().nextSequenceToken("nextToken").build(); + }); + + // Subscribe to buffer flush events for countdown tracking and by this time, buffer shouldn't flush + logManagerService.getRuntimeConfig() + .lookupTopics(PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, componentName) + .subscribe((why, node) -> { + if (node != null && bufferFlushCountdown.containsKey(componentName)) { + int remaining = bufferFlushCountdown.get(componentName).decrementAndGet(); + bufferFlushLatch.countDown(); + System.out.println("Buffer flush countdown for " + componentName + ": " + remaining + " remaining"); + } + }); + + // Check initial state of runtime config. should be 0 since no flush has been performed by buffer + Topics initialComponentTopics = logManagerService.getRuntimeConfig() + .lookupTopics(PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, componentName); + assertEquals(0, initialComponentTopics.size()); + + // Wait for first upload to ensure log manager is processing + assertTrue(firstUploadLatch.await(15, TimeUnit.SECONDS), + "Expected first CloudWatch upload within 15 seconds"); + + // Then write more logs and wait for the log manager to upload log + for (int i = 5; i < 10; i++) { + createTempFileAndWriteData(tempDirectoryPath, "integTestRandomLogFiles.log_", Coerce.toString(i)); + } + assertTrue(uploadLatch.await(15, TimeUnit.SECONDS), + "Expected 2 CloudWatch uploads within 15 seconds"); + + // Wait for buffer flushes + assertTrue(bufferFlushLatch.await(10, TimeUnit.SECONDS), + "Expected buffer flush countdown to complete within 10 seconds"); + + // Verify countdown reached zero, this includes the flush action for both maps + assertEquals(0, bufferFlushCountdown.get(componentName).get(), + "Buffer flush countdown should reach zero"); + + // Verify processing files information is correctly persisted + ProcessingFiles processingFiles = logManagerService.processingFilesInformation.get(componentName); + assertNotNull(processingFiles); + assertEquals(processingFiles.size(), 1); + + // Verify buffer has flushed exactly once by checking runtime config + Topics afterFlushComponentTopics = logManagerService.getRuntimeConfig() + .lookupTopics(PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, componentName); + assertEquals(afterFlushComponentTopics.size(), 1); + + // Verify last uploaded timestamp is also persisted + Topics lastFileProcessedTopics = logManagerService.getRuntimeConfig() + .lookupTopics(PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, componentName); + assertEquals(lastFileProcessedTopics.size(), 1); + } + + @Test + void testLogManagerShutdownFlushesBuffer() throws Exception { + tempDirectoryPath = Files.createTempDirectory(tempRootDir, "IntegrationTestsTemporaryLogFiles"); + + // Create initial test files + for (int i = 0; i < 5; i++) { + createTempFileAndWriteData(tempDirectoryPath, "integTestRandomLogFiles.log_", Coerce.toString(i)); + } + + setupKernel(tempDirectoryPath, "smallPeriodicIntervalUserComponentConfig.yaml"); + + String componentName = "UserComponentA"; + + // STEP 1: Track buffer flushes - we will verify a scheduled flush happens first + AtomicInteger bufferFlushCount = new AtomicInteger(0); + AtomicReference lastScheduledFlushTime = new AtomicReference<>(0L); + CountDownLatch scheduledFlushLatch = new CountDownLatch(1); + + // Track CloudWatch uploads + AtomicInteger uploadCount = new AtomicInteger(0); + CountDownLatch firstUploadLatch = new CountDownLatch(1); + CountDownLatch secondUploadLatch = new CountDownLatch(2); + + when(cloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))).thenAnswer(invocation -> { + uploadCount.incrementAndGet(); + firstUploadLatch.countDown(); + secondUploadLatch.countDown(); + return PutLogEventsResponse.builder().nextSequenceToken("nextToken").build(); + }); + + // Subscribe to runtime config changes to detect buffer flushes + logManagerService.getRuntimeConfig() + .lookupTopics(PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP, componentName) + .subscribe((why, node) -> { + if (node != null && node.childOf(PERSISTED_COMPONENT_LAST_FILE_PROCESSED_TIMESTAMP)) { + int flushNum = bufferFlushCount.incrementAndGet(); + long flushTime = System.currentTimeMillis(); + + // Record the timestamp of the first flush (scheduled flush) + if (flushNum == 1) { + lastScheduledFlushTime.set(flushTime); + scheduledFlushLatch.countDown(); + System.out.println("First scheduled buffer flush detected at: " + flushTime); + } else { + System.out.println("Additional buffer flush detected at: " + flushTime); + } + } + }); + + // Wait for initial data to be processed and first scheduled flush to happen + assertTrue(firstUploadLatch.await(10, TimeUnit.SECONDS), + "Expected first CloudWatch upload within 10 seconds"); + assertTrue(scheduledFlushLatch.await(20, TimeUnit.SECONDS), + "Expected the first scheduled buffer flush within 20 seconds"); + + // STEP 2: Verify the first scheduled flush happened + assertEquals(1, bufferFlushCount.get(), + "Should have exactly one buffer flush at this point (the scheduled flush)"); + + // Now add more files to ensure buffer has new data that hasn't been flushed yet + for (int i = 5; i < 10; i++) { + createTempFileAndWriteData(tempDirectoryPath, "integTestRandomLogFiles.log_", Coerce.toString(i)); + } + + // Wait briefly for log manager to process the new files + // but not long enough for another scheduled flush to happen + assertTrue(secondUploadLatch.await(10, TimeUnit.SECONDS), + "Expected second CloudWatch upload within 10 seconds"); + // Verify we have pending data in buffer + ProcessingFiles processingFiles = logManagerService.processingFilesInformation.get(componentName); + assertNotNull(processingFiles, "Processing files should exist for the component"); + // Buffer should contain data waiting to be flushed + assertThat(processingFiles.size(), greaterThan(0)); + + // Record the number of flushes before shutdown + int flushesBeforeShutdown = bufferFlushCount.get(); + + // STEP 3: Shutdown LogManagerService before next scheduled flush + // This should trigger a manual flush during shutdown + kernel.shutdown(); + + // Wait briefly for shutdown to complete + Thread.sleep(2000); + + // STEP 4: Verify that an additional flush happened during shutdown + // An additional flush should have occurred during shutdown + assertThat(bufferFlushCount.get(), greaterThan(flushesBeforeShutdown)); + } + + @Test + void testBufferAccumulatesUpdatesCorrectly() throws Exception { + // Given + tempDirectoryPath = Files.createTempDirectory(tempRootDir, "IntegrationTestsTemporaryLogFiles"); + + // Create initial test files + for (int i = 0; i < 3; i++) { + createTempFileAndWriteData(tempDirectoryPath, "integTestRandomLogFiles.log_", ""); + } + + // Configure with short intervals for faster testing + Map additionalConfig = new HashMap<>(); + additionalConfig.put(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, 1); + additionalConfig.put(BUFFER_INTERVAL_SEC, 3); + + setupKernel(tempDirectoryPath, "smallPeriodicIntervalUserComponentConfig.yaml"); + + // When + String componentName = "UserComponentA"; + + // Track CloudWatch uploads and buffer flushes + AtomicInteger uploadCount = new AtomicInteger(0); + AtomicInteger bufferFlushCount = new AtomicInteger(0); + CountDownLatch firstBufferFlush = new CountDownLatch(1); + + when(cloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))).thenAnswer(invocation -> { + uploadCount.incrementAndGet(); + return PutLogEventsResponse.builder().nextSequenceToken("nextToken").build(); + }); + + // Subscribe to detect buffer flushes + logManagerService.getRuntimeConfig() + .lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2, componentName) + .subscribe((why, node) -> { + if (node != null && node.childOf(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2)) { + Topics currentTopics = logManagerService.getRuntimeConfig() + .lookupTopics(PERSISTED_COMPONENT_CURRENT_PROCESSING_FILE_INFORMATION_V2, componentName); + if (currentTopics.size() > 0) { + bufferFlushCount.incrementAndGet(); + firstBufferFlush.countDown(); + } + } + }); + + // Wait for first buffer flush + assertTrue(firstBufferFlush.await(10, TimeUnit.SECONDS), + "Expected first buffer flush within 10 seconds"); + + // Then + // Verify that multiple uploads happened before buffer flush + assertThat(uploadCount.get(), greaterThanOrEqualTo(2)); + + // Verify buffer flush happened + assertThat(bufferFlushCount.get(), greaterThanOrEqualTo(1)); + + // Verify CloudWatch uploads happened more frequently than buffer flushes + assertTrue(uploadCount.get() > bufferFlushCount.get(), + "Upload count should be greater than buffer flush count"); + } +} diff --git a/src/integrationtests/resources/com/aws/greengrass/integrationtests/logmanager/smallPeriodicIntervalUserComponentConfig.yaml b/src/integrationtests/resources/com/aws/greengrass/integrationtests/logmanager/smallPeriodicIntervalUserComponentConfig.yaml index 2ae927b2..e568fd5e 100644 --- a/src/integrationtests/resources/com/aws/greengrass/integrationtests/logmanager/smallPeriodicIntervalUserComponentConfig.yaml +++ b/src/integrationtests/resources/com/aws/greengrass/integrationtests/logmanager/smallPeriodicIntervalUserComponentConfig.yaml @@ -3,6 +3,7 @@ services: aws.greengrass.LogManager: configuration: periodicUploadIntervalSec: 10 + bufferIntervalSec: 20 logsUploaderConfiguration: componentLogsConfiguration: - componentName: 'UserComponentA' diff --git a/src/main/java/com/aws/greengrass/logmanager/util/PeriodicBuffer.java b/src/main/java/com/aws/greengrass/logmanager/util/PeriodicBuffer.java index 328a6c0b..c4e3ab1d 100644 --- a/src/main/java/com/aws/greengrass/logmanager/util/PeriodicBuffer.java +++ b/src/main/java/com/aws/greengrass/logmanager/util/PeriodicBuffer.java @@ -9,6 +9,7 @@ import com.aws.greengrass.logging.impl.LogManager; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; @@ -40,6 +41,7 @@ public class PeriodicBuffer { private final AtomicLong flushSuccessCount = new AtomicLong(0); private final AtomicLong flushFailureCount = new AtomicLong(0); private final AtomicInteger flushInProgress = new AtomicInteger(0); + private volatile CompletableFuture currentFlushFuture = CompletableFuture.completedFuture(null); private ScheduledFuture flushTask; private volatile boolean isShutdown = false; @@ -120,7 +122,7 @@ public void remove(K key) { } /** - * Manually triggers a flush of the buffer. + * Triggers a flush of the buffer. */ public void flush() { flush(false); @@ -133,99 +135,109 @@ public void flush() { */ @SuppressWarnings("PMD.AvoidCatchingGenericException") private void flush(boolean waitForInProgress) { - // Early check for empty buffer to avoid unnecessary locking - synchronized (bufferLock) { - if (buffer.isEmpty()) { - logger.atDebug().log("Buffer '{}' is empty, nothing to flush", bufferName); + // Don't attempt to flush after shutdown (unless waitForInProgress is true for shutdown flush) + if (isShutdown && !waitForInProgress) { + return; + } + + Map bufferToFlush = swapBuffer(); + + // if buffer is empty, skip the flush + if (bufferToFlush.isEmpty()) { + try { + logger.atDebug().log("Flush skipped for empty buffer '{}'", bufferName); + } finally { + flushInProgress.set(0); + currentFlushFuture = CompletableFuture.completedFuture(null); + } + return; + } + + if (waitForInProgress) { + // During shutdown, ensure any previous flush is finished + currentFlushFuture.join(); + } else { + // If a flush is already in progress, skip this one + if (!flushInProgress.compareAndSet(0, 1)) { return; } } - // Check if another flush is already in progress - if (!flushInProgress.compareAndSet(0, 1)) { - if (waitForInProgress) { - logger.atDebug().log("Flush already in progress for buffer '{}', waiting for completion", bufferName); - // Wait for the current flush to complete - while (flushInProgress.get() != 0) { + + if (waitForInProgress) { + // run performFlush directly on the same thread for manual triggered flush + try { + performFlush(bufferToFlush); + } finally { + flushInProgress.set(0); + } + currentFlushFuture = CompletableFuture.completedFuture(null); + } else { + // otherwise, for lambda triggered flush, we run the performFlush on a background thread + // assign the returned future to currentFlushFuture so that we can optionally wait for it in the next flush + try { + currentFlushFuture = CompletableFuture.runAsync(() -> { try { - Thread.sleep(10); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.atWarn().log("Interrupted while waiting for flush to complete in buffer '{}'", - bufferName); - return; + performFlush(bufferToFlush); + } finally { + flushInProgress.set(0); } + }, scheduler); + } catch (RejectedExecutionException e) { + // catching the scheduler exception, backup plan in case the async flush failed to schedule, force flush here + try { + performFlush(bufferToFlush); + } finally { + flushInProgress.set(0); } - // Try again after the previous flush completed - flush(waitForInProgress); - } else { - logger.atDebug().log("Flush already in progress for buffer '{}', skipping", bufferName); + currentFlushFuture = CompletableFuture.completedFuture(null); } - return; } + } - try { - Map bufferToFlush; - boolean shouldFlush; - synchronized (bufferLock) { - // Double-check if buffer is still non-empty - if (buffer.isEmpty()) { - logger.atDebug().log("Buffer '{}' became empty, nothing to flush", bufferName); - // Don't return here - we need to go through finally block - bufferToFlush = new ConcurrentHashMap<>(); - shouldFlush = false; - } else { - logger.atInfo().log("Flushing buffer '{}' with {} items", bufferName, buffer.size()); - - // Swap the buffer instead of copying for better performance - bufferToFlush = buffer; - buffer = new ConcurrentHashMap<>(); - shouldFlush = true; - } + private Map swapBuffer() { + synchronized (bufferLock) { + if (buffer.isEmpty()) { + return new ConcurrentHashMap<>(); } + Map bufferToFlush = buffer; + buffer = new ConcurrentHashMap<>(); + return bufferToFlush; + } + } - // Only proceed with flush if we have data - if (shouldFlush) { - long startTime = System.currentTimeMillis(); - try { - flushHandler.accept(bufferToFlush); - - long duration = System.currentTimeMillis() - startTime; - flushSuccessCount.incrementAndGet(); + @SuppressWarnings("PMD.AvoidCatchingGenericException") + private void performFlush(Map bufferToFlush) { + logger.atInfo().log("Flushing buffer '{}' with {} items", bufferName, bufferToFlush.size()); + long startTime = System.currentTimeMillis(); + + try { + flushHandler.accept(bufferToFlush); + + long duration = System.currentTimeMillis() - startTime; + flushSuccessCount.incrementAndGet(); + + logger.atDebug() + .kv("itemCount", bufferToFlush.size()) + .kv("durationMs", duration) + .kv("successCount", flushSuccessCount.get()) + .log("Successfully flushed buffer '{}'", bufferName); - logger.atDebug() - .kv("itemCount", bufferToFlush.size()) - .kv("durationMs", duration) - .kv("successCount", flushSuccessCount.get()) - .log("Successfully flushed buffer '{}'", bufferName); - - } catch (RejectedExecutionException e) { - // Put the items back if flush was rejected - synchronized (bufferLock) { - bufferToFlush.forEach(buffer::putIfAbsent); - } - flushFailureCount.incrementAndGet(); - logger.atError().cause(e) - .kv("failureCount", flushFailureCount.get()) - .log("Flush rejected for buffer '{}', items restored", bufferName); - } catch (RuntimeException e) { - // Catching RuntimeException because the flushHandler is a Consumer> that - // can throw any unchecked exception. We need to handle all possible runtime exceptions - // to ensure the buffer continues to function properly. - // CHECKSTYLE:OFF - We need to catch all runtime exceptions from user-provided handler - // PMD:OFF:AvoidCatchingGenericException - The flushHandler is a user-provided Consumer - // that can throw any unchecked exception, and we must ensure the buffer continues to - // function even if the handler fails. This is a valid use case for catching RuntimeException. - flushFailureCount.incrementAndGet(); - logger.atError().cause(e) - .kv("failureCount", flushFailureCount.get()) - .log("Failed to flush buffer '{}'. Items lost.", bufferName); - // CHECKSTYLE:ON - // PMD:ON:AvoidCatchingGenericException - } + } catch (RejectedExecutionException e) { + synchronized (bufferLock) { + // Restore items to buffer if flush failed + logger.atError().cause(e).log("Flush rejected for buffer '{}', items restored", bufferName); + bufferToFlush.forEach(buffer::putIfAbsent); } - } finally { - flushInProgress.set(0); + flushFailureCount.incrementAndGet(); + logger.atError().cause(e) + .kv("failureCount", flushFailureCount.get()) + .log("Flush rejected for buffer '{}', items restored", bufferName); + } catch (RuntimeException e) { + flushFailureCount.incrementAndGet(); + logger.atError().cause(e) + .kv("failureCount", flushFailureCount.get()) + .log("Flush failed for buffer '{}'", bufferName); } } @@ -263,24 +275,28 @@ public void shutdown() { if (flushTask != null) { flushTask.cancel(false); } - - // Flush any remaining items, waiting for any in-progress flush to complete + + // Then flush any remaining items try { flush(true); + currentFlushFuture.join(); // Wait for final flush to complete } catch (RuntimeException e) { - // Catching RuntimeException because the flush method can throw any unchecked exception - // from the flushHandler. We need to ensure shutdown completes even if the final flush fails. - // CHECKSTYLE:OFF - We need to catch all runtime exceptions from user-provided handler - // PMD:OFF:AvoidCatchingGenericException - The flushHandler called via flush(true) can throw - // any unchecked exception. We must ensure shutdown completes successfully even if the final - // flush operation fails. This is a valid use case for catching RuntimeException. - logger.atError().cause(e).log("Error during final flush"); - // CHECKSTYLE:ON - // PMD:ON:AvoidCatchingGenericException + logger.atWarn().cause(e).log("Exception during shutdown flush for buffer '{}'", bufferName); } // Shutdown the scheduler - scheduler.shutdownNow(); + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + scheduler.shutdownNow(); + + // the final flush operation fails. This is a valid use case for catching RuntimeException. + logger.atError().cause(e).log("Error during final flush"); + } logger.atInfo() .kv("successfulFlushes", flushSuccessCount.get()) diff --git a/src/test/java/com/aws/greengrass/logmanager/util/PeriodicBufferTest.java b/src/test/java/com/aws/greengrass/logmanager/util/PeriodicBufferTest.java index 9836e5d2..4c8c7a6e 100644 --- a/src/test/java/com/aws/greengrass/logmanager/util/PeriodicBufferTest.java +++ b/src/test/java/com/aws/greengrass/logmanager/util/PeriodicBufferTest.java @@ -339,13 +339,20 @@ void testConcurrentPutAndRemove() throws InterruptedException { @Test void testFlushDuringConcurrentModification() throws InterruptedException { AtomicReference flushException = new AtomicReference<>(); + CountDownLatch flushProcessingStarted = new CountDownLatch(1); + CountDownLatch continueFlushProcessing = new CountDownLatch(1); + AtomicReference> capturedFlushData = new AtomicReference<>(); + Consumer> safeFlushHandler = data -> { try { + capturedFlushData.set(new ConcurrentHashMap<>(data)); + flushProcessingStarted.countDown(); + // Wait for modification thread to start + assertTrue(continueFlushProcessing.await(2, TimeUnit.SECONDS)); // Simulate processing time Thread.sleep(50); defaultFlushHandler.accept(data); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + } catch (Exception e) { flushException.set(e); } }; @@ -358,18 +365,18 @@ void testFlushDuringConcurrentModification() throws InterruptedException { buffer.put("key" + i, "value" + i); } - CountDownLatch flushStarted = new CountDownLatch(1); CountDownLatch modificationComplete = new CountDownLatch(1); - Thread flushThread = new Thread(() -> { - flushStarted.countDown(); - buffer.flush(); - }); + // Start flush which will swap the buffer immediately + Thread flushThread = new Thread(() -> buffer.flush()); + flushThread.start(); + + // Wait for flush processing to start (buffer has been swapped) + assertTrue(flushProcessingStarted.await(2, TimeUnit.SECONDS)); + // Now modify the buffer - these changes should go into the new buffer Thread modificationThread = new Thread(() -> { try { - flushStarted.await(); - // Try to modify buffer during flush for (int i = 100; i < 200; i++) { buffer.put("key" + i, "value" + i); if (i % 10 == 0) { @@ -377,28 +384,42 @@ void testFlushDuringConcurrentModification() throws InterruptedException { } } modificationComplete.countDown(); - } catch (InterruptedException e) { + } catch (Exception e) { Thread.currentThread().interrupt(); } }); - - flushThread.start(); modificationThread.start(); + // Allow flush processing to continue + continueFlushProcessing.countDown(); + + // Wait for both threads to complete flushThread.join(5000); assertTrue(modificationComplete.await(5, TimeUnit.SECONDS)); + // Give time for async flush to complete + Thread.sleep(200); + // Verify no exceptions occurred if (flushException.get() != null) { fail("Exception during flush: " + flushException.get()); } - // First flush should have the original 100 items + // First flush should have exactly the original 100 items (buffer was swapped) assertEquals(1, flushedData.size()); assertEquals(100, flushedData.get(0).size()); + + // Verify the flushed data contains the original items + Map flushedMap = capturedFlushData.get(); + assertNotNull(flushedMap); + for (int i = 0; i < 100; i++) { + assertEquals("value" + i, flushedMap.get("key" + i)); + } // Buffer should contain new items added during flush + // The new buffer should have items 100-199 minus some removed items assertTrue(buffer.size() > 0); + assertTrue(buffer.size() < 100); // Should be less than 100 due to removals } @Test @@ -434,48 +455,41 @@ void testFlushStatistics() throws InterruptedException { } @Test - void testConcurrentFlushSkipping() throws InterruptedException { - CountDownLatch flushInProgress = new CountDownLatch(1); - CountDownLatch secondFlushAttempted = new CountDownLatch(1); - AtomicInteger actualFlushCount = new AtomicInteger(0); - - Consumer> slowFlushHandler = data -> { + void testConcurrentFlushSkipping() throws Exception { + CountDownLatch firstFlushStarted = new CountDownLatch(1); + CountDownLatch allowFirstFlushToProceed = new CountDownLatch(1); + AtomicInteger flushCallCount = new AtomicInteger(0); + + PeriodicBuffer buffer = new PeriodicBuffer<>("testBuffer", 10, items -> { + flushCallCount.incrementAndGet(); + firstFlushStarted.countDown(); // Let the test know flush has started try { - actualFlushCount.incrementAndGet(); - flushInProgress.countDown(); - Thread.sleep(500); // Simulate slow flush - defaultFlushHandler.accept(data); + allowFirstFlushToProceed.await(); // Wait here to simulate long-running flush } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - }; + }); - buffer = new PeriodicBuffer<>("TestBuffer", 60, slowFlushHandler); - buffer.start(); + buffer.put("k", "v"); - buffer.put("key1", "value1"); + // Thread 1 will start the flush and block + Thread flushThread1 = new Thread(buffer::flush); + flushThread1.start(); - // Start first flush in separate thread - Thread firstFlush = new Thread(() -> buffer.flush()); - firstFlush.start(); + // Wait until flushThread1 gets into the flush method + assertTrue(firstFlushStarted.await(3, TimeUnit.SECONDS), "First flush didn't start in time"); - // Wait for first flush to start - assertTrue(flushInProgress.await(1, TimeUnit.SECONDS)); + // Thread 2 will attempt to flush but should skip since flushInProgress = 1 + Thread flushThread2 = new Thread(buffer::flush); + flushThread2.start(); - // Try second flush while first is in progress - Thread secondFlush = new Thread(() -> { - buffer.flush(); - secondFlushAttempted.countDown(); - }); - secondFlush.start(); + // Allow flush to complete + allowFirstFlushToProceed.countDown(); - assertTrue(secondFlushAttempted.await(1, TimeUnit.SECONDS)); - firstFlush.join(2000); - secondFlush.join(2000); + flushThread1.join(); + flushThread2.join(); - // Only one actual flush should have occurred - assertEquals(1, actualFlushCount.get()); - assertEquals(1, flushCount.get()); + assertEquals(1, flushCallCount.get(), "Only one flush should have been performed"); } @Test