1
1
package agent
2
2
3
3
import (
4
+ "bytes"
4
5
"context"
6
+ "math"
5
7
6
8
"github.com/go-logr/logr"
7
9
pb "github.com/nginx/agent/v3/api/grpc/mpi/v1"
10
+ "github.com/nginx/agent/v3/pkg/files"
8
11
"google.golang.org/grpc"
9
12
"google.golang.org/grpc/codes"
10
13
"google.golang.org/grpc/status"
@@ -13,6 +16,8 @@ import (
13
16
grpcContext "github.com/nginx/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/context"
14
17
)
15
18
19
+ const defaultChunkSize uint32 = 2097152 // 2MB
20
+
16
21
// File is an nginx configuration file that the nginx agent gets from the control plane
17
22
// after a ConfigApplyRequest.
18
23
type File struct {
@@ -51,15 +56,83 @@ func (fs *fileService) GetFile(
51
56
ctx context.Context ,
52
57
req * pb.GetFileRequest ,
53
58
) (* pb.GetFileResponse , error ) {
54
- filename := req .GetFileMeta ().GetName ()
55
- hash := req .GetFileMeta ().GetHash ()
56
-
57
59
gi , ok := grpcContext .GrpcInfoFromContext (ctx )
58
60
if ! ok {
59
61
return nil , agentgrpc .ErrStatusInvalidConnection
60
62
}
61
63
62
- conn := fs .connTracker .GetConnection (gi .IPAddress )
64
+ if req .GetFileMeta () == nil {
65
+ return nil , status .Error (codes .InvalidArgument , "invalid request" )
66
+ }
67
+
68
+ contents , err := fs .getFileContents (req , gi .IPAddress )
69
+ if err != nil {
70
+ return nil , err
71
+ }
72
+
73
+ return & pb.GetFileResponse {
74
+ Contents : & pb.FileContents {
75
+ Contents : contents ,
76
+ },
77
+ }, nil
78
+ }
79
+
80
+ // GetFileStream is called by the agent when it needs to download a file in chunks for a ConfigApplyRequest.
81
+ // The deployment object used to get the files is already LOCKED when this function is called,
82
+ // before the ConfigApply transaction is started.
83
+ func (fs * fileService ) GetFileStream (
84
+ req * pb.GetFileRequest ,
85
+ server grpc.ServerStreamingServer [pb.FileDataChunk ],
86
+ ) error {
87
+ gi , ok := grpcContext .GrpcInfoFromContext (server .Context ())
88
+ if ! ok {
89
+ return agentgrpc .ErrStatusInvalidConnection
90
+ }
91
+
92
+ if req .GetFileMeta () == nil || req .GetMessageMeta () == nil {
93
+ return status .Error (codes .InvalidArgument , "invalid request" )
94
+ }
95
+
96
+ contents , err := fs .getFileContents (req , gi .IPAddress )
97
+ if err != nil {
98
+ return err
99
+ }
100
+
101
+ size := req .GetFileMeta ().GetSize ()
102
+ var sizeUint32 uint32
103
+ if size > math .MaxUint32 {
104
+ return status .Error (codes .Internal , "file size is too large and cannot be converted to uint32" )
105
+ }
106
+ sizeUint32 = uint32 (size ) //nolint:gosec // validation check performed on previous line
107
+ hash := req .GetFileMeta ().GetHash ()
108
+
109
+ fs .logger .V (1 ).Info ("Sending chunked file to agent" , "file" , req .GetFileMeta ().GetName ())
110
+
111
+ if err := files .SendChunkedFile (
112
+ req .GetMessageMeta (),
113
+ pb.FileDataChunk_Header {
114
+ Header : & pb.FileDataChunkHeader {
115
+ ChunkSize : defaultChunkSize ,
116
+ Chunks : calculateChunks (sizeUint32 , defaultChunkSize ),
117
+ FileMeta : & pb.FileMeta {
118
+ Name : req .GetFileMeta ().GetName (),
119
+ Hash : hash ,
120
+ Permissions : req .GetFileMeta ().GetPermissions (),
121
+ Size : size ,
122
+ },
123
+ },
124
+ },
125
+ bytes .NewReader (contents ),
126
+ server ,
127
+ ); err != nil {
128
+ return status .Error (codes .Aborted , err .Error ())
129
+ }
130
+
131
+ return nil
132
+ }
133
+
134
+ func (fs * fileService ) getFileContents (req * pb.GetFileRequest , connKey string ) ([]byte , error ) {
135
+ conn := fs .connTracker .GetConnection (connKey )
63
136
if conn .PodName == "" {
64
137
return nil , status .Errorf (codes .NotFound , "connection not found" )
65
138
}
@@ -69,43 +142,47 @@ func (fs *fileService) GetFile(
69
142
return nil , status .Errorf (codes .NotFound , "deployment not found in store" )
70
143
}
71
144
72
- contents := deployment .GetFile (filename , hash )
145
+ filename := req .GetFileMeta ().GetName ()
146
+ contents := deployment .GetFile (filename , req .GetFileMeta ().GetHash ())
73
147
if len (contents ) == 0 {
74
148
return nil , status .Errorf (codes .NotFound , "file not found" )
75
149
}
76
150
77
151
fs .logger .V (1 ).Info ("Getting file for agent" , "file" , filename )
78
152
79
- return & pb.GetFileResponse {
80
- Contents : & pb.FileContents {
81
- Contents : contents ,
82
- },
83
- }, nil
153
+ return contents , nil
154
+ }
155
+
156
+ func calculateChunks (fileSize uint32 , chunkSize uint32 ) uint32 {
157
+ remainder , divide := fileSize % chunkSize , fileSize / chunkSize
158
+ if remainder > 0 {
159
+ return divide + 1
160
+ }
161
+ // if fileSize is divisible by chunkSize without remainder
162
+ // then we don't need the extra chunk for the remainder
163
+ return divide
84
164
}
85
165
86
166
// GetOverview gets the overview of files for a particular configuration version of an instance.
87
167
// At the moment it doesn't appear to be used by the agent.
88
- func (fs * fileService ) GetOverview (
89
- _ context.Context ,
90
- _ * pb.GetOverviewRequest ,
91
- ) (* pb.GetOverviewResponse , error ) {
168
+ func (* fileService ) GetOverview (context.Context , * pb.GetOverviewRequest ) (* pb.GetOverviewResponse , error ) {
92
169
return & pb.GetOverviewResponse {}, nil
93
170
}
94
171
95
172
// UpdateOverview is called by agent on startup and whenever any files change on the instance.
96
173
// Since directly changing nginx configuration on the instance is not supported, this is a no-op for NGF.
97
- func (fs * fileService ) UpdateOverview (
98
- _ context.Context ,
99
- _ * pb.UpdateOverviewRequest ,
100
- ) (* pb.UpdateOverviewResponse , error ) {
174
+ func (* fileService ) UpdateOverview (context.Context , * pb.UpdateOverviewRequest ) (* pb.UpdateOverviewResponse , error ) {
101
175
return & pb.UpdateOverviewResponse {}, nil
102
176
}
103
177
104
178
// UpdateFile is called by agent whenever any files change on the instance.
105
179
// Since directly changing nginx configuration on the instance is not supported, this is a no-op for NGF.
106
- func (fs * fileService ) UpdateFile (
107
- _ context.Context ,
108
- _ * pb.UpdateFileRequest ,
109
- ) (* pb.UpdateFileResponse , error ) {
180
+ func (* fileService ) UpdateFile (context.Context , * pb.UpdateFileRequest ) (* pb.UpdateFileResponse , error ) {
110
181
return & pb.UpdateFileResponse {}, nil
111
182
}
183
+
184
+ // UpdateFileStream is called by agent whenever any files change on the instance.
185
+ // Since directly changing nginx configuration on the instance is not supported, this is a no-op for NGF.
186
+ func (* fileService ) UpdateFileStream (grpc.ClientStreamingServer [pb.FileDataChunk , pb.UpdateFileResponse ]) error {
187
+ return nil
188
+ }
0 commit comments