From 3172ade626e80eb2dfd4a6c4a773df90c2a030f4 Mon Sep 17 00:00:00 2001 From: Harshdeep Singh <6162866+harsh62@users.noreply.github.com> Date: Mon, 24 Mar 2025 16:13:45 -0400 Subject: [PATCH 1/3] fix(logging): crash in release builds --- .../Consumer/CloudWatchLoggingConsumer.swift | 63 ++++++++++++++----- 1 file changed, 46 insertions(+), 17 deletions(-) diff --git a/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/Consumer/CloudWatchLoggingConsumer.swift b/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/Consumer/CloudWatchLoggingConsumer.swift index 3ff0d65068..914c656a26 100644 --- a/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/Consumer/CloudWatchLoggingConsumer.swift +++ b/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/Consumer/CloudWatchLoggingConsumer.swift @@ -72,28 +72,57 @@ extension CloudWatchLoggingConsumer: LogBatchConsumer { return } - defer { - ensureLogStreamExistsComplete = true - } - + // Only mark as complete after everything has finished successfully + // to avoid potential race conditions with incomplete state + if logStreamName == nil { - self.logStreamName = await formatter.formattedStreamName() + do { + // Explicitly capture self to avoid potential memory issues + let streamName = await self.formatter.formattedStreamName() + // Check if self is still valid and streamName is not nil before assigning + if !streamName.isEmpty { + self.logStreamName = streamName + } else { + // Fallback to a default if the stream name couldn't be determined + self.logStreamName = "default.\(UUID().uuidString)" + } + } catch { + // Handle any potential errors from async call + Amplify.Logging.error("Failed to get formatted stream name: \(error)") + // Fallback to a default + self.logStreamName = "default.\(UUID().uuidString)" + } } - let stream = try? await self.client.describeLogStreams(input: DescribeLogStreamsInput( - logGroupName: self.logGroupName, - logStreamNamePrefix: self.logStreamName - )).logStreams?.first(where: { stream in - return stream.logStreamName == self.logStreamName - }) - if stream != nil { + // Safety check - ensure we have a valid stream name before proceeding + guard let logStreamName = self.logStreamName, !logStreamName.isEmpty else { + Amplify.Logging.error("Invalid log stream name") + ensureLogStreamExistsComplete = true return } - - _ = try? await self.client.createLogStream(input: CreateLogStreamInput( - logGroupName: self.logGroupName, - logStreamName: self.logStreamName - )) + + do { + let stream = try? await self.client.describeLogStreams(input: DescribeLogStreamsInput( + logGroupName: self.logGroupName, + logStreamNamePrefix: logStreamName + )).logStreams?.first(where: { stream in + return stream.logStreamName == logStreamName + }) + + if stream == nil { + _ = try? await self.client.createLogStream(input: CreateLogStreamInput( + logGroupName: self.logGroupName, + logStreamName: logStreamName + )) + } + + // Mark as complete only after all operations finished + ensureLogStreamExistsComplete = true + } catch { + Amplify.Logging.error("Error ensuring log stream exists: \(error)") + // Still mark as complete to avoid getting stuck in a failed state + ensureLogStreamExistsComplete = true + } } private func sendLogEvents(_ entries: [LogEntry]) async throws { From 8c5d5df4eb47042169ef7ff3d106bf3d812c4932 Mon Sep 17 00:00:00 2001 From: Harshdeep Singh <6162866+harsh62@users.noreply.github.com> Date: Fri, 25 Apr 2025 00:03:11 -0400 Subject: [PATCH 2/3] push more fixes --- ...WSCloudWatchLoggingSessionController.swift | 18 ++- .../Consumer/CloudWatchLoggingConsumer.swift | 129 ++++++++++++++---- 2 files changed, 116 insertions(+), 31 deletions(-) diff --git a/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/AWSCloudWatchLoggingSessionController.swift b/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/AWSCloudWatchLoggingSessionController.swift index 85f2d86ed6..a35054fe65 100644 --- a/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/AWSCloudWatchLoggingSessionController.swift +++ b/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/AWSCloudWatchLoggingSessionController.swift @@ -132,14 +132,19 @@ final class AWSCloudWatchLoggingSessionController { } self.batchSubscription = producer.logBatchPublisher.sink { [weak self] batch in guard self?.networkMonitor.isOnline == true else { return } + + // Capture strong references to consumer and batch before the async task + let strongConsumer = consumer + let strongBatch = batch + Task { do { - try await consumer.consume(batch: batch) + try await strongConsumer.consume(batch: strongBatch) } catch { Amplify.Logging.default.error("Error flushing logs with error \(error.localizedDescription)") let payload = HubPayload(eventName: HubPayload.EventName.Logging.flushLogFailure, context: error.localizedDescription) Amplify.Hub.dispatch(to: HubChannel.logging, payload: payload) - try batch.complete() + try strongBatch.complete() } } } @@ -178,8 +183,15 @@ final class AWSCloudWatchLoggingSessionController { } private func consumeLogBatch(_ batch: LogBatch) async throws { + // Check if consumer exists before trying to use it + guard let consumer = self.consumer else { + // If consumer is nil, still mark the batch as completed to prevent memory leaks + try batch.complete() + return + } + do { - try await consumer?.consume(batch: batch) + try await consumer.consume(batch: batch) } catch { Amplify.Logging.default.error("Error flushing logs with error \(error.localizedDescription)") let payload = HubPayload(eventName: HubPayload.EventName.Logging.flushLogFailure, context: error.localizedDescription) diff --git a/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/Consumer/CloudWatchLoggingConsumer.swift b/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/Consumer/CloudWatchLoggingConsumer.swift index 914c656a26..13c73d7e17 100644 --- a/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/Consumer/CloudWatchLoggingConsumer.swift +++ b/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/Consumer/CloudWatchLoggingConsumer.swift @@ -40,25 +40,50 @@ extension CloudWatchLoggingConsumer: LogBatchConsumer { } await ensureLogStreamExists() - let batchByteSize = try encoder.encode(entries).count + // Add safety check for nil logStreamName + guard let _ = self.logStreamName else { + Amplify.Logging.error("Log stream name is nil, cannot send logs") + try batch.complete() + return + } + + // Wrap encoding in do-catch to prevent crashes + var batchByteSize: Int + do { + batchByteSize = try encoder.encode(entries).count + } catch { + Amplify.Logging.error("Failed to encode log entries: \(error)") + try batch.complete() + return + } + if entries.count > AWSCloudWatchConstants.maxLogEvents { let smallerEntries = entries.chunked(into: AWSCloudWatchConstants.maxLogEvents) for entries in smallerEntries { - let entrySize = try encoder.encode(entries).count - if entrySize > AWSCloudWatchConstants.maxBatchByteSize { - let chunks = try chunk(entries, into: AWSCloudWatchConstants.maxBatchByteSize) - for chunk in chunks { - try await sendLogEvents(chunk) + // Wrap in do-catch to prevent crashes + do { + let entrySize = try encoder.encode(entries).count + if entrySize > AWSCloudWatchConstants.maxBatchByteSize { + let chunks = try chunk(entries, into: AWSCloudWatchConstants.maxBatchByteSize) + for chunk in chunks { + try await sendLogEvents(chunk) + } + } else { + try await sendLogEvents(entries) } - } else { - try await sendLogEvents(entries) + } catch { + Amplify.Logging.error("Error processing log batch: \(error)") + continue } } - } else if batchByteSize > AWSCloudWatchConstants.maxBatchByteSize { - let smallerEntries = try chunk(entries, into: AWSCloudWatchConstants.maxBatchByteSize) - for entries in smallerEntries { - try await sendLogEvents(entries) + do { + let smallerEntries = try chunk(entries, into: AWSCloudWatchConstants.maxBatchByteSize) + for entries in smallerEntries { + try await sendLogEvents(entries) + } + } catch { + Amplify.Logging.error("Error chunking log entries: \(error)") } } else { try await sendLogEvents(entries) @@ -126,22 +151,49 @@ extension CloudWatchLoggingConsumer: LogBatchConsumer { } private func sendLogEvents(_ entries: [LogEntry]) async throws { + // Safety check for empty entries + if entries.isEmpty { + return + } + + // Safety check for logStreamName + guard let logStreamName = self.logStreamName, !logStreamName.isEmpty else { + Amplify.Logging.error("Cannot send log events: Log stream name is nil or empty") + return + } + let events = convertToCloudWatchInputLogEvents(for: entries) - let response = try await self.client.putLogEvents(input: PutLogEventsInput( - logEvents: events, - logGroupName: self.logGroupName, - logStreamName: self.logStreamName, - sequenceToken: nil - )) - let retriableEntries = retriable(entries: entries, in: response) - if !retriableEntries.isEmpty { - let retriableEvents = convertToCloudWatchInputLogEvents(for: retriableEntries) - _ = try await self.client.putLogEvents(input: PutLogEventsInput( - logEvents: retriableEvents, + + // Safety check for empty events + if events.isEmpty { + Amplify.Logging.warn("No valid events to send to CloudWatch") + return + } + + do { + let response = try await self.client.putLogEvents(input: PutLogEventsInput( + logEvents: events, logGroupName: self.logGroupName, - logStreamName: self.logStreamName, + logStreamName: logStreamName, sequenceToken: nil )) + + // Handle retriable entries + let retriableEntries = retriable(entries: entries, in: response) + if !retriableEntries.isEmpty { + let retriableEvents = convertToCloudWatchInputLogEvents(for: retriableEntries) + if !retriableEvents.isEmpty { + _ = try await self.client.putLogEvents(input: PutLogEventsInput( + logEvents: retriableEvents, + logGroupName: self.logGroupName, + logStreamName: logStreamName, + sequenceToken: nil + )) + } + } + } catch { + Amplify.Logging.error("Failed to send log events: \(error)") + throw error } } @@ -176,19 +228,40 @@ extension CloudWatchLoggingConsumer: LogBatchConsumer { var chunks: [[LogEntry]] = [] var chunk: [LogEntry] = [] var currentChunkSize = 0 + for entry in entries { - let entrySize = try encoder.encode(entry).count + // Wrap the encoding in a do-catch to handle potential errors + var entrySize: Int + do { + entrySize = try encoder.encode(entry).count + } catch { + Amplify.Logging.error("Failed to encode log entry: \(error)") + // Skip this entry and continue with the next one + continue + } + if currentChunkSize + entrySize < maxByteSize { chunk.append(entry) currentChunkSize = currentChunkSize + entrySize } else { - chunks.append(chunk) + // Only add non-empty chunks + if !chunk.isEmpty { + chunks.append(chunk) + } chunk = [entry] - currentChunkSize = currentChunkSize + entrySize + currentChunkSize = entrySize } } - + + // Add the last chunk if it's not empty + if !chunk.isEmpty { + chunks.append(chunk) + } + + // Return even if chunks is empty to avoid null pointer issues return chunks } // swiftlint:enable shorthand_operator } + + From 147dba55b952b9987d03c88bc053c4a0eae73013 Mon Sep 17 00:00:00 2001 From: Harshdeep Singh <6162866+harsh62@users.noreply.github.com> Date: Wed, 30 Apr 2025 23:17:54 -0400 Subject: [PATCH 3/3] adding a locking mechanism to avoid race condition crashes --- .../Consumer/CloudWatchLoggingConsumer.swift | 38 ++++++++++++++----- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/Consumer/CloudWatchLoggingConsumer.swift b/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/Consumer/CloudWatchLoggingConsumer.swift index 13c73d7e17..bdcc17c458 100644 --- a/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/Consumer/CloudWatchLoggingConsumer.swift +++ b/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/Consumer/CloudWatchLoggingConsumer.swift @@ -18,7 +18,12 @@ class CloudWatchLoggingConsumer { private let logGroupName: String private var logStreamName: String? private var ensureLogStreamExistsComplete: Bool = false - private let encoder = JSONEncoder() + private let encoderLock = NSLock() + private let encoder: JSONEncoder = { + let encoder = JSONEncoder() + encoder.dateEncodingStrategy = .millisecondsSince1970 + return encoder + }() init( client: CloudWatchLogsClientProtocol, @@ -29,6 +34,12 @@ class CloudWatchLoggingConsumer { self.formatter = CloudWatchLoggingStreamNameFormatter(userIdentifier: userIdentifier) self.logGroupName = logGroupName } + + private func safeEncode(_ value: T) throws -> Data { + return try encoderLock.withLock { + return try encoder.encode(value) + } + } } extension CloudWatchLoggingConsumer: LogBatchConsumer { @@ -47,22 +58,23 @@ extension CloudWatchLoggingConsumer: LogBatchConsumer { return } - // Wrap encoding in do-catch to prevent crashes + // Create a strong reference to entries to prevent deallocation during encoding + let entriesCopy = entries + var batchByteSize: Int do { - batchByteSize = try encoder.encode(entries).count + batchByteSize = try safeEncode(entriesCopy).count } catch { Amplify.Logging.error("Failed to encode log entries: \(error)") try batch.complete() return } - if entries.count > AWSCloudWatchConstants.maxLogEvents { - let smallerEntries = entries.chunked(into: AWSCloudWatchConstants.maxLogEvents) + if entriesCopy.count > AWSCloudWatchConstants.maxLogEvents { + let smallerEntries = entriesCopy.chunked(into: AWSCloudWatchConstants.maxLogEvents) for entries in smallerEntries { - // Wrap in do-catch to prevent crashes do { - let entrySize = try encoder.encode(entries).count + let entrySize = try safeEncode(entries).count if entrySize > AWSCloudWatchConstants.maxBatchByteSize { let chunks = try chunk(entries, into: AWSCloudWatchConstants.maxBatchByteSize) for chunk in chunks { @@ -78,7 +90,7 @@ extension CloudWatchLoggingConsumer: LogBatchConsumer { } } else if batchByteSize > AWSCloudWatchConstants.maxBatchByteSize { do { - let smallerEntries = try chunk(entries, into: AWSCloudWatchConstants.maxBatchByteSize) + let smallerEntries = try chunk(entriesCopy, into: AWSCloudWatchConstants.maxBatchByteSize) for entries in smallerEntries { try await sendLogEvents(entries) } @@ -86,7 +98,7 @@ extension CloudWatchLoggingConsumer: LogBatchConsumer { Amplify.Logging.error("Error chunking log entries: \(error)") } } else { - try await sendLogEvents(entries) + try await sendLogEvents(entriesCopy) } try batch.complete() @@ -264,4 +276,12 @@ extension CloudWatchLoggingConsumer: LogBatchConsumer { // swiftlint:enable shorthand_operator } +private extension NSLock { + func withLock(_ block: () throws -> T) throws -> T { + lock() + defer { unlock() } + return try block() + } +} +