@@ -115,15 +115,16 @@ func SetupWorkers(ctx context.Context) {
115
115
}
116
116
117
117
func redeemWriteMarker (md * markerData ) error {
118
- ctx := datastore .GetStore ().CreateTransaction (context .TODO ())
118
+ timeoutCtx , cancel := context .WithTimeout (context .Background (), 180 * time .Second )
119
+ defer cancel ()
120
+ ctx := datastore .GetStore ().CreateTransaction (timeoutCtx )
119
121
db := datastore .GetStore ().GetTransaction (ctx )
120
122
allocationID := md .allocationID
121
123
shouldRollback := false
122
124
start := time .Now ()
123
125
logging .Logger .Info ("redeeming_write_marker" , zap .String ("allocationID" , allocationID ))
124
126
allocMu := lock .GetMutex (allocation.Allocation {}.TableName (), allocationID )
125
127
allocMu .RLock ()
126
- defer allocMu .RUnlock ()
127
128
defer func () {
128
129
if shouldRollback {
129
130
if rollbackErr := db .Rollback ().Error ; rollbackErr != nil {
@@ -135,8 +136,9 @@ func redeemWriteMarker(md *markerData) error {
135
136
} else {
136
137
deleteMarkerData (allocationID )
137
138
}
139
+ allocMu .RUnlock ()
138
140
}()
139
-
141
+ logging . Logger . Debug ( "getAllocation" , zap . Any ( "allocation" , allocationID ))
140
142
alloc , err := allocation .Repo .GetAllocationFromDB (ctx , allocationID )
141
143
if err != nil {
142
144
logging .Logger .Error ("Error redeeming the write marker." , zap .Any ("allocation" , allocationID ), zap .Any ("wm" , allocationID ), zap .Any ("error" , err ))
@@ -153,7 +155,7 @@ func redeemWriteMarker(md *markerData) error {
153
155
shouldRollback = true
154
156
return nil
155
157
}
156
-
158
+ logging . Logger . Debug ( "getWritemarker" , zap . Any ( "allocation" , allocationID ))
157
159
wm , err := GetWriteMarkerEntity (ctx , alloc .ID , alloc .AllocationRoot )
158
160
if err != nil {
159
161
logging .Logger .Error ("Error redeeming the write marker." , zap .Any ("allocation" , allocationID ), zap .Any ("wm" , alloc .AllocationRoot ), zap .Any ("error" , err ))
@@ -163,7 +165,7 @@ func redeemWriteMarker(md *markerData) error {
163
165
shouldRollback = true
164
166
return err
165
167
}
166
-
168
+ logging . Logger . Debug ( "RedeemMarker" , zap . Any ( "allocation" , allocationID ), zap . Any ( "wm" , wm . WM . AllocationRoot ), zap . Any ( "txn" , wm . CloseTxnID ))
167
169
err = wm .RedeemMarker (ctx , alloc .LastRedeemedSeq + 1 )
168
170
if err != nil {
169
171
elapsedTime := time .Since (start )
@@ -178,7 +180,7 @@ func redeemWriteMarker(md *markerData) error {
178
180
shouldRollback = true
179
181
return err
180
182
}
181
-
183
+ logging . Logger . Debug ( "UpdateAllocationRedeem" , zap . Any ( "allocation" , allocationID ), zap . Any ( "wm" , wm . WM . AllocationRoot ), zap . Any ( "txn" , wm . CloseTxnID ))
182
184
err = allocation .Repo .UpdateAllocationRedeem (ctx , allocationID , wm .WM .AllocationRoot , alloc , wm .Sequence )
183
185
if err != nil {
184
186
logging .Logger .Error ("Error redeeming the write marker. Allocation latest wm redeemed update failed" ,
@@ -188,7 +190,7 @@ func redeemWriteMarker(md *markerData) error {
188
190
go tryAgain (md )
189
191
return err
190
192
}
191
-
193
+ logging . Logger . Debug ( "RedeemMarker" , zap . Any ( "allocation" , allocationID ), zap . Any ( "wm" , wm . WM . AllocationRoot ), zap . Any ( "txn" , wm . CloseTxnID ))
192
194
err = db .Commit ().Error
193
195
if err != nil {
194
196
logging .Logger .Error ("Error committing the writemarker redeem" ,
0 commit comments