Skip to content

Commit 22176f4

Browse files
fix: objectManager implementation avoid racy goroutines (#3392)
fixes #3391
1 parent a89d7ec commit 22176f4

File tree

1 file changed

+123
-128
lines changed

1 file changed

+123
-128
lines changed

api/ws_objects.go

Lines changed: 123 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
3535
defer func() {
3636
// We close socket at the end of requests
3737
wsc.conn.close()
38-
cancelContexts.Range(func(_, value interface{}) bool {
38+
cancelContexts.Range(func(key, value interface{}) bool {
3939
cancelFunc := value.(context.CancelFunc)
4040
cancelFunc()
41+
42+
cancelContexts.Delete(key)
4143
return true
4244
})
4345
}()
@@ -55,6 +57,7 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
5557
// Read goroutine
5658
go func() {
5759
defer close(writeChannel)
60+
5861
for {
5962
select {
6063
case <-done:
@@ -72,10 +75,9 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
7275
// We get request data & review information
7376
var messageRequest ObjectsRequest
7477

75-
err := json.Unmarshal(message, &messageRequest)
76-
if err != nil {
77-
LogInfo("Error on message request unmarshal")
78-
return
78+
if err := json.Unmarshal(message, &messageRequest); err != nil {
79+
LogInfo("Error on message request unmarshal", err)
80+
continue
7981
}
8082

8183
// new message, new context
@@ -84,184 +86,177 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
8486
// We store the cancel func associated with this request
8587
cancelContexts.Store(messageRequest.RequestID, cancel)
8688

87-
const itemsPerBatch = 1000
8889
switch messageRequest.Mode {
89-
case "close":
90-
return
91-
case "cancel":
92-
// if we have that request id, cancel it
93-
if cancelFunc, ok := cancelContexts.Load(messageRequest.RequestID); ok {
94-
cancelFunc.(context.CancelFunc)()
95-
cancelContexts.Delete(messageRequest.RequestID)
96-
}
97-
case "objects":
90+
case "objects", "rewind":
9891
// cancel all previous open objects requests for listing
9992
cancelContexts.Range(func(key, value interface{}) bool {
10093
rid := key.(int64)
10194
if rid < messageRequest.RequestID {
10295
cancelFunc := value.(context.CancelFunc)
10396
cancelFunc()
97+
98+
cancelContexts.Delete(key)
10499
}
105100
return true
106101
})
102+
}
107103

104+
const itemsPerBatch = 1000
105+
switch messageRequest.Mode {
106+
case "close":
107+
return
108+
case "cancel":
109+
// if we have that request id, cancel it
110+
if cancelFunc, ok := cancelContexts.Load(messageRequest.RequestID); ok {
111+
cancelFunc.(context.CancelFunc)()
112+
cancelContexts.Delete(messageRequest.RequestID)
113+
}
114+
case "objects":
108115
// start listing and writing to web socket
109-
go func() {
110-
objectRqConfigs, err := getObjectsOptionsFromReq(messageRequest)
111-
if err != nil {
112-
LogInfo(fmt.Sprintf("Error during Objects OptionsParse %s", err.Error()))
116+
objectRqConfigs, err := getObjectsOptionsFromReq(messageRequest)
117+
if err != nil {
118+
LogInfo(fmt.Sprintf("Error during Objects OptionsParse %s", err.Error()))
119+
120+
sendWSResponse(WSResponse{
121+
RequestID: messageRequest.RequestID,
122+
Error: ErrorWithContext(ctx, err),
123+
Prefix: messageRequest.Prefix,
124+
BucketName: messageRequest.BucketName,
125+
})
126+
127+
return
128+
}
113129

130+
var buffer []ObjectResponse
131+
for lsObj := range startObjectsListing(ctx, wsc.client, objectRqConfigs) {
132+
if lsObj.Err != nil {
114133
sendWSResponse(WSResponse{
115134
RequestID: messageRequest.RequestID,
116-
Error: ErrorWithContext(ctx, err),
135+
Error: ErrorWithContext(ctx, lsObj.Err),
117136
Prefix: messageRequest.Prefix,
118137
BucketName: messageRequest.BucketName,
119138
})
120139

121-
return
140+
continue
122141
}
123-
var buffer []ObjectResponse
124-
for lsObj := range startObjectsListing(ctx, wsc.client, objectRqConfigs) {
125-
if _, ok := cancelContexts.Load(messageRequest.RequestID); !ok {
126-
return
127-
}
128-
129-
if lsObj.Err != nil {
130-
sendWSResponse(WSResponse{
131-
RequestID: messageRequest.RequestID,
132-
Error: ErrorWithContext(ctx, lsObj.Err),
133-
Prefix: messageRequest.Prefix,
134-
BucketName: messageRequest.BucketName,
135-
})
136-
137-
continue
138-
}
139-
objItem := ObjectResponse{
140-
Name: lsObj.Key,
141-
Size: lsObj.Size,
142-
LastModified: lsObj.LastModified.Format(time.RFC3339),
143-
VersionID: lsObj.VersionID,
144-
IsLatest: lsObj.IsLatest,
145-
DeleteMarker: lsObj.IsDeleteMarker,
146-
}
147-
buffer = append(buffer, objItem)
148-
149-
if len(buffer) >= itemsPerBatch {
150-
sendWSResponse(WSResponse{
151-
RequestID: messageRequest.RequestID,
152-
Data: buffer,
153-
})
154-
buffer = nil
155-
}
142+
objItem := ObjectResponse{
143+
Name: lsObj.Key,
144+
Size: lsObj.Size,
145+
LastModified: lsObj.LastModified.Format(time.RFC3339),
146+
VersionID: lsObj.VersionID,
147+
IsLatest: lsObj.IsLatest,
148+
DeleteMarker: lsObj.IsDeleteMarker,
156149
}
157-
if len(buffer) > 0 {
150+
buffer = append(buffer, objItem)
151+
152+
if len(buffer) >= itemsPerBatch {
158153
sendWSResponse(WSResponse{
159154
RequestID: messageRequest.RequestID,
160155
Data: buffer,
161156
})
157+
buffer = nil
162158
}
163-
159+
}
160+
if len(buffer) > 0 {
164161
sendWSResponse(WSResponse{
165-
RequestID: messageRequest.RequestID,
166-
RequestEnd: true,
162+
RequestID: messageRequest.RequestID,
163+
Data: buffer,
167164
})
165+
}
168166

169-
// remove the cancellation context
170-
cancelContexts.Delete(messageRequest.RequestID)
171-
}()
172-
case "rewind":
173-
// cancel all previous open objects requests for listing
174-
cancelContexts.Range(func(key, value interface{}) bool {
175-
rid := key.(int64)
176-
if rid < messageRequest.RequestID {
177-
cancelFunc := value.(context.CancelFunc)
178-
cancelFunc()
179-
}
180-
return true
167+
sendWSResponse(WSResponse{
168+
RequestID: messageRequest.RequestID,
169+
RequestEnd: true,
181170
})
182171

172+
// if we have that request id, cancel it
173+
if cancelFunc, ok := cancelContexts.Load(messageRequest.RequestID); ok {
174+
cancelFunc.(context.CancelFunc)()
175+
cancelContexts.Delete(messageRequest.RequestID)
176+
}
177+
case "rewind":
183178
// start listing and writing to web socket
184-
go func() {
185-
objectRqConfigs, err := getObjectsOptionsFromReq(messageRequest)
186-
if err != nil {
187-
LogInfo(fmt.Sprintf("Error during Objects OptionsParse %s", err.Error()))
188-
sendWSResponse(WSResponse{
189-
RequestID: messageRequest.RequestID,
190-
Error: ErrorWithContext(ctx, err),
191-
Prefix: messageRequest.Prefix,
192-
BucketName: messageRequest.BucketName,
193-
})
179+
objectRqConfigs, err := getObjectsOptionsFromReq(messageRequest)
180+
if err != nil {
181+
LogInfo(fmt.Sprintf("Error during Objects OptionsParse %s", err.Error()))
182+
sendWSResponse(WSResponse{
183+
RequestID: messageRequest.RequestID,
184+
Error: ErrorWithContext(ctx, err),
185+
Prefix: messageRequest.Prefix,
186+
BucketName: messageRequest.BucketName,
187+
})
194188

195-
return
196-
}
189+
return
190+
}
191+
192+
clientIP := wsc.conn.remoteAddress()
193+
194+
s3Client, err := newS3BucketClient(session, objectRqConfigs.BucketName, objectRqConfigs.Prefix, clientIP)
195+
if err != nil {
196+
sendWSResponse(WSResponse{
197+
RequestID: messageRequest.RequestID,
198+
Error: ErrorWithContext(ctx, err),
199+
Prefix: messageRequest.Prefix,
200+
BucketName: messageRequest.BucketName,
201+
})
202+
203+
return
204+
}
205+
206+
mcS3C := mcClient{client: s3Client}
197207

198-
clientIP := wsc.conn.remoteAddress()
208+
var buffer []ObjectResponse
199209

200-
s3Client, err := newS3BucketClient(session, objectRqConfigs.BucketName, objectRqConfigs.Prefix, clientIP)
201-
if err != nil {
210+
for lsObj := range startRewindListing(ctx, mcS3C, objectRqConfigs) {
211+
if lsObj.Err != nil {
202212
sendWSResponse(WSResponse{
203213
RequestID: messageRequest.RequestID,
204-
Error: ErrorWithContext(ctx, err),
214+
Error: ErrorWithContext(ctx, lsObj.Err.ToGoError()),
205215
Prefix: messageRequest.Prefix,
206216
BucketName: messageRequest.BucketName,
207217
})
208218

209-
cancel()
210-
return
219+
continue
211220
}
212221

213-
mcS3C := mcClient{client: s3Client}
214-
215-
var buffer []ObjectResponse
216-
217-
for lsObj := range startRewindListing(ctx, mcS3C, objectRqConfigs) {
218-
if lsObj.Err != nil {
219-
sendWSResponse(WSResponse{
220-
RequestID: messageRequest.RequestID,
221-
Error: ErrorWithContext(ctx, lsObj.Err.ToGoError()),
222-
Prefix: messageRequest.Prefix,
223-
BucketName: messageRequest.BucketName,
224-
})
225-
226-
continue
227-
}
228-
229-
name := strings.Replace(lsObj.URL.Path, fmt.Sprintf("/%s/", objectRqConfigs.BucketName), "", 1)
230-
231-
objItem := ObjectResponse{
232-
Name: name,
233-
Size: lsObj.Size,
234-
LastModified: lsObj.Time.Format(time.RFC3339),
235-
VersionID: lsObj.VersionID,
236-
IsLatest: lsObj.IsLatest,
237-
DeleteMarker: lsObj.IsDeleteMarker,
238-
}
239-
buffer = append(buffer, objItem)
240-
241-
if len(buffer) >= itemsPerBatch {
242-
sendWSResponse(WSResponse{
243-
RequestID: messageRequest.RequestID,
244-
Data: buffer,
245-
})
246-
buffer = nil
247-
}
222+
name := strings.Replace(lsObj.URL.Path, fmt.Sprintf("/%s/", objectRqConfigs.BucketName), "", 1)
248223

224+
objItem := ObjectResponse{
225+
Name: name,
226+
Size: lsObj.Size,
227+
LastModified: lsObj.Time.Format(time.RFC3339),
228+
VersionID: lsObj.VersionID,
229+
IsLatest: lsObj.IsLatest,
230+
DeleteMarker: lsObj.IsDeleteMarker,
249231
}
250-
if len(buffer) > 0 {
232+
buffer = append(buffer, objItem)
233+
234+
if len(buffer) >= itemsPerBatch {
251235
sendWSResponse(WSResponse{
252236
RequestID: messageRequest.RequestID,
253237
Data: buffer,
254238
})
239+
buffer = nil
255240
}
256241

242+
}
243+
if len(buffer) > 0 {
257244
sendWSResponse(WSResponse{
258-
RequestID: messageRequest.RequestID,
259-
RequestEnd: true,
245+
RequestID: messageRequest.RequestID,
246+
Data: buffer,
260247
})
248+
}
249+
250+
sendWSResponse(WSResponse{
251+
RequestID: messageRequest.RequestID,
252+
RequestEnd: true,
253+
})
261254

262-
// remove the cancellation context
255+
// if we have that request id, cancel it
256+
if cancelFunc, ok := cancelContexts.Load(messageRequest.RequestID); ok {
257+
cancelFunc.(context.CancelFunc)()
263258
cancelContexts.Delete(messageRequest.RequestID)
264-
}()
259+
}
265260
}
266261
}
267262
}

0 commit comments

Comments
 (0)