@@ -18,7 +18,12 @@ class CloudWatchLoggingConsumer {
18
18
private let logGroupName : String
19
19
private var logStreamName : String ?
20
20
private var ensureLogStreamExistsComplete : Bool = false
21
- private let encoder = JSONEncoder ( )
21
+ private let encoderLock = NSLock ( )
22
+ private let encoder : JSONEncoder = {
23
+ let encoder = JSONEncoder ( )
24
+ encoder. dateEncodingStrategy = . millisecondsSince1970
25
+ return encoder
26
+ } ( )
22
27
23
28
init (
24
29
client: CloudWatchLogsClientProtocol ,
@@ -29,6 +34,12 @@ class CloudWatchLoggingConsumer {
29
34
self . formatter = CloudWatchLoggingStreamNameFormatter ( userIdentifier: userIdentifier)
30
35
self . logGroupName = logGroupName
31
36
}
37
+
38
+ private func safeEncode< T: Encodable > ( _ value: T ) throws -> Data {
39
+ return try encoderLock. withLock {
40
+ return try encoder. encode ( value)
41
+ }
42
+ }
32
43
}
33
44
34
45
extension CloudWatchLoggingConsumer : LogBatchConsumer {
@@ -40,28 +51,54 @@ extension CloudWatchLoggingConsumer: LogBatchConsumer {
40
51
}
41
52
await ensureLogStreamExists ( )
42
53
43
- let batchByteSize = try encoder. encode ( entries) . count
44
- if entries. count > AWSCloudWatchConstants . maxLogEvents {
45
- let smallerEntries = entries. chunked ( into: AWSCloudWatchConstants . maxLogEvents)
54
+ // Add safety check for nil logStreamName
55
+ guard let _ = self . logStreamName else {
56
+ Amplify . Logging. error ( " Log stream name is nil, cannot send logs " )
57
+ try batch. complete ( )
58
+ return
59
+ }
60
+
61
+ // Create a strong reference to entries to prevent deallocation during encoding
62
+ let entriesCopy = entries
63
+
64
+ var batchByteSize : Int
65
+ do {
66
+ batchByteSize = try safeEncode ( entriesCopy) . count
67
+ } catch {
68
+ Amplify . Logging. error ( " Failed to encode log entries: \( error) " )
69
+ try batch. complete ( )
70
+ return
71
+ }
72
+
73
+ if entriesCopy. count > AWSCloudWatchConstants . maxLogEvents {
74
+ let smallerEntries = entriesCopy. chunked ( into: AWSCloudWatchConstants . maxLogEvents)
46
75
for entries in smallerEntries {
47
- let entrySize = try encoder. encode ( entries) . count
48
- if entrySize > AWSCloudWatchConstants . maxBatchByteSize {
49
- let chunks = try chunk ( entries, into: AWSCloudWatchConstants . maxBatchByteSize)
50
- for chunk in chunks {
51
- try await sendLogEvents ( chunk)
76
+ do {
77
+ let entrySize = try safeEncode ( entries) . count
78
+ if entrySize > AWSCloudWatchConstants . maxBatchByteSize {
79
+ let chunks = try chunk ( entries, into: AWSCloudWatchConstants . maxBatchByteSize)
80
+ for chunk in chunks {
81
+ try await sendLogEvents ( chunk)
82
+ }
83
+ } else {
84
+ try await sendLogEvents ( entries)
52
85
}
53
- } else {
54
- try await sendLogEvents ( entries)
86
+ } catch {
87
+ Amplify . Logging. error ( " Error processing log batch: \( error) " )
88
+ continue
55
89
}
56
90
}
57
-
58
91
} else if batchByteSize > AWSCloudWatchConstants . maxBatchByteSize {
59
- let smallerEntries = try chunk ( entries, into: AWSCloudWatchConstants . maxBatchByteSize)
60
- for entries in smallerEntries {
61
- try await sendLogEvents ( entries)
92
+ do {
93
+ let smallerEntries = try chunk ( entriesCopy, into: AWSCloudWatchConstants . maxBatchByteSize)
94
+ for entries in smallerEntries {
95
+ try await sendLogEvents ( entries)
96
+ }
97
+ } catch {
98
+ Amplify . Logging. error ( " Error chunking log entries: \( error) " )
62
99
}
63
100
} else {
64
- try await sendLogEvents ( entries )
101
+ try await sendLogEvents ( entriesCopy )
65
102
}
66
103
67
104
try batch. complete ( )
@@ -72,47 +109,103 @@ extension CloudWatchLoggingConsumer: LogBatchConsumer {
72
109
return
73
110
}
74
111
75
- defer {
76
- ensureLogStreamExistsComplete = true
77
- }
78
-
112
+ // Only mark as complete after everything has finished successfully
113
+ // to avoid potential race conditions with incomplete state
114
+
79
115
if logStreamName == nil {
80
- self . logStreamName = await formatter. formattedStreamName ( )
116
+ do {
117
+ // Explicitly capture self to avoid potential memory issues
118
+ let streamName = await self . formatter. formattedStreamName ( )
119
+ // Check if self is still valid and streamName is not nil before assigning
120
+ if !streamName. isEmpty {
121
+ self . logStreamName = streamName
122
+ } else {
123
+ // Fallback to a default if the stream name couldn't be determined
124
+ self . logStreamName = " default. \( UUID ( ) . uuidString) "
125
+ }
126
+ } catch {
127
+ // Handle any potential errors from async call
128
+ Amplify . Logging. error ( " Failed to get formatted stream name: \( error) " )
129
+ // Fallback to a default
130
+ self . logStreamName = " default. \( UUID ( ) . uuidString) "
131
+ }
81
132
}
82
133
83
- let stream = try ? await self . client. describeLogStreams ( input: DescribeLogStreamsInput (
84
- logGroupName: self . logGroupName,
85
- logStreamNamePrefix: self . logStreamName
86
- ) ) . logStreams? . first ( where: { stream in
87
- return stream. logStreamName == self . logStreamName
88
- } )
89
- if stream != nil {
134
+ // Safety check - ensure we have a valid stream name before proceeding
135
+ guard let logStreamName = self . logStreamName, !logStreamName. isEmpty else {
136
+ Amplify . Logging. error ( " Invalid log stream name " )
137
+ ensureLogStreamExistsComplete = true
90
138
return
91
139
}
92
-
93
- _ = try ? await self . client. createLogStream ( input: CreateLogStreamInput (
94
- logGroupName: self . logGroupName,
95
- logStreamName: self . logStreamName
96
- ) )
140
+
141
+ do {
142
+ let stream = try ? await self . client. describeLogStreams ( input: DescribeLogStreamsInput (
143
+ logGroupName: self . logGroupName,
144
+ logStreamNamePrefix: logStreamName
145
+ ) ) . logStreams? . first ( where: { stream in
146
+ return stream. logStreamName == logStreamName
147
+ } )
148
+
149
+ if stream == nil {
150
+ _ = try ? await self . client. createLogStream ( input: CreateLogStreamInput (
151
+ logGroupName: self . logGroupName,
152
+ logStreamName: logStreamName
153
+ ) )
154
+ }
155
+
156
+ // Mark as complete only after all operations finished
157
+ ensureLogStreamExistsComplete = true
158
+ } catch {
159
+ Amplify . Logging. error ( " Error ensuring log stream exists: \( error) " )
160
+ // Still mark as complete to avoid getting stuck in a failed state
161
+ ensureLogStreamExistsComplete = true
162
+ }
97
163
}
98
164
99
165
private func sendLogEvents( _ entries: [ LogEntry ] ) async throws {
166
+ // Safety check for empty entries
167
+ if entries. isEmpty {
168
+ return
169
+ }
170
+
171
+ // Safety check for logStreamName
172
+ guard let logStreamName = self . logStreamName, !logStreamName. isEmpty else {
173
+ Amplify . Logging. error ( " Cannot send log events: Log stream name is nil or empty " )
174
+ return
175
+ }
176
+
100
177
let events = convertToCloudWatchInputLogEvents ( for: entries)
101
- let response = try await self . client. putLogEvents ( input: PutLogEventsInput (
102
- logEvents: events,
103
- logGroupName: self . logGroupName,
104
- logStreamName: self . logStreamName,
105
- sequenceToken: nil
106
- ) )
107
- let retriableEntries = retriable ( entries: entries, in: response)
108
- if !retriableEntries. isEmpty {
109
- let retriableEvents = convertToCloudWatchInputLogEvents ( for: retriableEntries)
110
- _ = try await self . client. putLogEvents ( input: PutLogEventsInput (
111
- logEvents: retriableEvents,
178
+
179
+ // Safety check for empty events
180
+ if events. isEmpty {
181
+ Amplify . Logging. warn ( " No valid events to send to CloudWatch " )
182
+ return
183
+ }
184
+
185
+ do {
186
+ let response = try await self . client. putLogEvents ( input: PutLogEventsInput (
187
+ logEvents: events,
112
188
logGroupName: self . logGroupName,
113
- logStreamName: self . logStreamName,
189
+ logStreamName: logStreamName,
114
190
sequenceToken: nil
115
191
) )
192
+
193
+ // Handle retriable entries
194
+ let retriableEntries = retriable ( entries: entries, in: response)
195
+ if !retriableEntries. isEmpty {
196
+ let retriableEvents = convertToCloudWatchInputLogEvents ( for: retriableEntries)
197
+ if !retriableEvents. isEmpty {
198
+ _ = try await self . client. putLogEvents ( input: PutLogEventsInput (
199
+ logEvents: retriableEvents,
200
+ logGroupName: self . logGroupName,
201
+ logStreamName: logStreamName,
202
+ sequenceToken: nil
203
+ ) )
204
+ }
205
+ }
206
+ } catch {
207
+ Amplify . Logging. error ( " Failed to send log events: \( error) " )
208
+ throw error
116
209
}
117
210
}
118
211
@@ -147,19 +240,48 @@ extension CloudWatchLoggingConsumer: LogBatchConsumer {
147
240
var chunks : [ [ LogEntry ] ] = [ ]
148
241
var chunk : [ LogEntry ] = [ ]
149
242
var currentChunkSize = 0
243
+
150
244
for entry in entries {
151
- let entrySize = try encoder. encode ( entry) . count
245
+ // Wrap the encoding in a do-catch to handle potential errors
246
+ var entrySize : Int
247
+ do {
248
+ entrySize = try encoder. encode ( entry) . count
249
+ } catch {
250
+ Amplify . Logging. error ( " Failed to encode log entry: \( error) " )
251
+ // Skip this entry and continue with the next one
252
+ continue
253
+ }
254
+
152
255
if currentChunkSize + entrySize < maxByteSize {
153
256
chunk. append ( entry)
154
257
currentChunkSize = currentChunkSize + entrySize
155
258
} else {
156
- chunks. append ( chunk)
259
+ // Only add non-empty chunks
260
+ if !chunk. isEmpty {
261
+ chunks. append ( chunk)
262
+ }
157
263
chunk = [ entry]
158
- currentChunkSize = currentChunkSize + entrySize
264
+ currentChunkSize = entrySize
159
265
}
160
266
}
161
-
267
+
268
+ // Add the last chunk if it's not empty
269
+ if !chunk. isEmpty {
270
+ chunks. append ( chunk)
271
+ }
272
+
273
+ // Return even if chunks is empty to avoid null pointer issues
162
274
return chunks
163
275
}
164
276
// swiftlint:enable shorthand_operator
165
277
}
278
+
279
+ private extension NSLock {
280
+ func withLock< T> ( _ block: ( ) throws -> T ) throws -> T {
281
+ lock ( )
282
+ defer { unlock ( ) }
283
+ return try block ( )
284
+ }
285
+ }
286
+
287
+
0 commit comments