@@ -11,9 +11,32 @@ import (
11
11
logging "cloud.google.com/go/logging/apiv2"
12
12
"cloud.google.com/go/logging/apiv2/loggingpb"
13
13
"google.golang.org/genproto/googleapis/cloud/audit"
14
+ "google.golang.org/grpc/codes"
15
+ "google.golang.org/grpc/status"
14
16
)
15
17
16
18
func Tail (ctx context.Context , projectID , clusterName string , cb func (* audit.AuditLog ) error ) error {
19
+ for {
20
+ select {
21
+ case <- ctx .Done ():
22
+ return ctx .Err ()
23
+ default :
24
+ }
25
+
26
+ if err := tailLogs (ctx , projectID , clusterName , cb ); err != nil {
27
+ if grpcErr , ok := status .FromError (err ); ok && grpcErr .Code () == codes .OutOfRange {
28
+ // Expected error case:
29
+ // "rpc error: code = OutOfRange desc = Session has run for the maximum allowed duration of 1h. To
30
+ // continue, start a new session with the same request"
31
+ slog .Warn ("session expired, restarting" , slog .Any ("error" , err ))
32
+ continue
33
+ }
34
+ return fmt .Errorf ("log tailing failed: %w" , err )
35
+ }
36
+ }
37
+ }
38
+
39
+ func tailLogs (ctx context.Context , projectID , clusterName string , cb func (* audit.AuditLog ) error ) error {
17
40
client , err := logging .NewClient (ctx )
18
41
if err != nil {
19
42
return fmt .Errorf ("failed to create client: %w" , err )
@@ -46,10 +69,10 @@ func Tail(ctx context.Context, projectID, clusterName string, cb func(*audit.Aud
46
69
return fmt .Errorf ("stream send failed: %w" , err )
47
70
}
48
71
49
- return read (ctx , stream , cb )
72
+ return readStream (ctx , stream , cb )
50
73
}
51
74
52
- func read (ctx context.Context , stream loggingpb.LoggingServiceV2_TailLogEntriesClient , cb func (* audit.AuditLog ) error ) error {
75
+ func readStream (ctx context.Context , stream loggingpb.LoggingServiceV2_TailLogEntriesClient , cb func (* audit.AuditLog ) error ) error {
53
76
for {
54
77
select {
55
78
case <- ctx .Done ():
0 commit comments