Skip to content

Commit bb68bef

Browse files
Indentify transaction batches and operation batch indexes for easier troubleshooting
1 parent d38b902 commit bb68bef

File tree

3 files changed

+85
-41
lines changed

3 files changed

+85
-41
lines changed

internal/importer/consul.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package importer
22

33
import (
4+
"strconv"
5+
46
"github.com/miniclip/gonsul/internal/entities"
57
"github.com/miniclip/gonsul/internal/util"
68

@@ -15,21 +17,23 @@ const consulTxnLimit = 64
1517
const maximumPayloadSize = 500000 // max size is actually 512kb
1618

1719
// processConsulTransaction ...
18-
func (i *importer) processConsulTransaction(transactions []entities.ConsulTxn) {
20+
func (i *importer) processConsulTransaction(transactions []entities.ConsulTxn, batchNumber int) {
21+
batch := strconv.Itoa(batchNumber)
22+
1923
// Encode our transaction into a JSON payload
2024
jsonPayload, err := json.Marshal(transactions)
2125
if err != nil {
22-
util.ExitError(errors.New("Marshal: "+err.Error()), util.ErrorFailedJsonEncode, i.logger)
26+
util.ExitError(errors.New("Marshal: "+err.Error()+" in Batch "+batch), util.ErrorFailedJsonEncode, i.logger)
2327
}
2428

2529
// Create our URL
2630
consulUrl := i.config.GetConsulURL() + "/v1/txn"
2731

2832
// build our request
29-
i.logger.PrintDebug("CONSUL: creating PUT request")
33+
i.logger.PrintDebug("CONSUL: creating PUT request for Batch " + batch)
3034
req, err := http.NewRequest("PUT", consulUrl, bytes.NewBuffer(jsonPayload))
3135
if err != nil {
32-
util.ExitError(errors.New("NewRequestPUT: "+err.Error()), util.ErrorFailedConsulConnection, i.logger)
36+
util.ExitError(errors.New("NewRequestPUT"+err.Error()+" in Batch "+batch), util.ErrorFailedConsulConnection, i.logger)
3337
}
3438

3539
// Set ACL token
@@ -40,10 +44,10 @@ func (i *importer) processConsulTransaction(transactions []entities.ConsulTxn) {
4044
// Send the request via a client
4145
// Do sends an HTTP request and
4246
// returns an HTTP response
43-
i.logger.PrintDebug("CONSUL: calling PUT request")
47+
i.logger.PrintDebug("CONSUL: calling PUT request for Batch " + batch)
4448
resp, err := i.client.Do(req)
4549
if err != nil {
46-
util.ExitError(errors.New("DoPUT: "+err.Error()), util.ErrorFailedConsulConnection, i.logger)
50+
util.ExitError(errors.New("DoPUT: "+err.Error()+" for Batch "+batch), util.ErrorFailedConsulConnection, i.logger)
4751
}
4852

4953
// Clean response after function ends
@@ -54,21 +58,21 @@ func (i *importer) processConsulTransaction(transactions []entities.ConsulTxn) {
5458
}()
5559

5660
// Read the response body
57-
i.logger.PrintDebug("CONSUL: reading PUT response")
61+
i.logger.PrintDebug("CONSUL: reading PUT response from Batch " + batch)
5862
bodyBytes, err := ioutil.ReadAll(resp.Body)
5963
if err != nil {
60-
util.ExitError(errors.New("ReadPutResponse: "+err.Error()), util.ErrorFailedReadingResponse, i.logger)
64+
util.ExitError(errors.New("ReadPutResponse: "+err.Error()+" in Batch "+batch), util.ErrorFailedReadingResponse, i.logger)
6165
}
6266

6367
// Cast response to string
6468
bodyString := string(bodyBytes)
6569

6670
if resp.StatusCode != 200 {
67-
util.ExitError(errors.New("TransactionError: "+bodyString), util.ErrorFailedConsulTxn, i.logger)
71+
util.ExitError(errors.New("TransactionError: "+bodyString+" in Batch "+batch), util.ErrorFailedConsulTxn, i.logger)
6872
}
6973

7074
// All good. Output some status for each transaction operation
7175
for _, txn := range transactions {
72-
i.logger.PrintInfo("Operation: " + *txn.KV.Verb + " Path: " + *txn.KV.Key)
76+
i.logger.PrintInfo("Operation: " + *txn.KV.Verb + " Path: " + *txn.KV.Key + " Batch: " + batch)
7377
}
7478
}

internal/importer/helper.go

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package importer
22

33
import (
4+
"strings"
5+
46
"github.com/miniclip/gonsul/internal/entities"
57
"github.com/miniclip/gonsul/internal/util"
6-
"strings"
78

89
"github.com/cbroglie/mustache"
910
"github.com/olekukonko/tablewriter"
@@ -16,6 +17,7 @@ import (
1617
"net/http"
1718
"os"
1819
"path"
20+
"strconv"
1921
)
2022

2123
// createOperationMatrix ...
@@ -146,17 +148,56 @@ func (i *importer) printOperations(matrix entities.OperationMatrix, printWhat st
146148
if matrix.GetTotalOps() > 0 {
147149
// Instantiate our table and set table header
148150
table := tablewriter.NewWriter(os.Stdout)
149-
table.SetHeader([]string{"", "OPERATION NAME", "CONSUL VERB", "PATH"})
151+
table.SetHeader([]string{"", "BATCH", "OP INDEX", "OPERATION NAME", "CONSUL VERB", "PATH"})
150152
// Align our rows
151153
table.SetAlignment(tablewriter.ALIGN_LEFT)
154+
155+
// Initialize the batch counter
156+
batch := 1
157+
opIndex := 0
158+
159+
var transactions []entities.ConsulTxn
160+
var newTransactions []entities.ConsulTxn
161+
152162
// Loop each operation and add to table
153163
for _, op := range matrix.GetOperations() {
164+
154165
if printWhat == entities.OperationAll || printWhat == op.GetType() {
166+
var TxnKV entities.ConsulTxnKV
167+
var warning string
168+
169+
// generate the actual payload to calculate it's lenght
170+
verb := op.GetVerb()
171+
path := op.GetPath()
155172
if op.GetType() == entities.OperationDelete {
156-
table.Append([]string{"!!", op.GetType(), op.GetVerb(), op.GetPath()})
173+
warning = "!!"
174+
TxnKV = entities.ConsulTxnKV{Verb: &verb, Key: &path}
157175
} else {
158-
table.Append([]string{"", op.GetType(), op.GetVerb(), op.GetPath()})
176+
warning = ""
177+
val := op.GetValue()
178+
TxnKV = entities.ConsulTxnKV{Verb: &verb, Key: &path, Value: &val}
179+
}
180+
181+
// add the next transaction and check payload lenght
182+
newTransactions = transactions
183+
newTransactions = append(newTransactions, entities.ConsulTxn{KV: TxnKV})
184+
newPayloadSize := i.getTransactionsPayloadSize(&newTransactions)
185+
186+
// If the next transaction brings us over the maximum payload size,
187+
// or the maximum transaction per batch limit is reached, start a new batch
188+
if newPayloadSize > maximumPayloadSize || len(transactions) == consulTxnLimit {
189+
// reset transactions and add the next transaction
190+
transactions = []entities.ConsulTxn{}
191+
// start a new batch counter
192+
opIndex = 0
193+
batch++
159194
}
195+
196+
transactions = append(transactions, entities.ConsulTxn{KV: TxnKV})
197+
198+
table.Append([]string{warning, strconv.Itoa(batch), strconv.Itoa(opIndex), op.GetType(), op.GetVerb(), op.GetPath()})
199+
200+
opIndex++
160201
}
161202
}
162203
// Outputs ASCII table
@@ -178,3 +219,13 @@ func (i *importer) setDeletesToLogger(matrix entities.OperationMatrix) {
178219
}
179220
}
180221
}
222+
223+
// Get the payload size for a slice of transactions
224+
func (i *importer) getTransactionsPayloadSize(transactions *[]entities.ConsulTxn) int {
225+
payload, err := json.Marshal(&transactions)
226+
if err != nil {
227+
util.ExitError(errors.New("Marshal: "+err.Error()), util.ErrorFailedJsonEncode, i.logger)
228+
}
229+
230+
return len(string(payload))
231+
}

internal/importer/importer.go

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"github.com/miniclip/gonsul/internal/entities"
66
"github.com/miniclip/gonsul/internal/util"
77

8-
"encoding/json"
98
"errors"
109
"fmt"
1110
"net/http"
@@ -41,11 +40,11 @@ func (i *importer) Start(localData map[string]string) {
4140
// Create our operations Matrix
4241
ops = i.createOperationMatrix(liveData, localData)
4342

43+
// Print operation table
44+
i.printOperations(ops, entities.OperationAll)
4445
// Check if it's a dry run
4546
if i.config.GetStrategy() == config.StrategyDry {
46-
// Print matrix and exit
47-
i.printOperations(ops, entities.OperationAll)
48-
47+
// Exit after having printed the operations table
4948
return
5049
}
5150

@@ -72,7 +71,11 @@ func (i *importer) processOperations(matrix entities.OperationMatrix) {
7271
util.ExitError(errors.New(""), util.ErrorDeleteNotAllowed, i.logger)
7372
}
7473

74+
// Initialize the batch counter
75+
batch := 1
76+
7577
var transactions []entities.ConsulTxn
78+
var newTransactions []entities.ConsulTxn
7679

7780
// Fill our channel to indicate a non interruptible work (It stops here if interruption in progress)
7881
i.config.WorkingChan() <- true
@@ -84,12 +87,6 @@ func (i *importer) processOperations(matrix entities.OperationMatrix) {
8487
verb := op.GetVerb()
8588
path := op.GetPath()
8689

87-
currentPayload, err := json.Marshal(transactions)
88-
if err != nil {
89-
util.ExitError(errors.New("Marshal: "+err.Error()), util.ErrorFailedJsonEncode, i.logger)
90-
}
91-
currentPayloadSize := len(currentPayload)
92-
9390
var TxnKV entities.ConsulTxnKV
9491

9592
if op.GetType() == entities.OperationDelete {
@@ -99,32 +96,24 @@ func (i *importer) processOperations(matrix entities.OperationMatrix) {
9996
TxnKV = entities.ConsulTxnKV{Verb: &verb, Key: &path, Value: &val}
10097
}
10198

102-
// If the next transaction brings us over the maximum payload size, flush the current transactions
103-
nextOpPayload, err := json.Marshal(TxnKV)
104-
if err != nil {
105-
util.ExitError(errors.New("Marshal: "+err.Error()), util.ErrorFailedJsonEncode, i.logger)
106-
}
99+
// add the next transaction and check payload lenght
100+
newTransactions = transactions
101+
newTransactions = append(transactions, entities.ConsulTxn{KV: TxnKV})
102+
newPayloadSize := i.getTransactionsPayloadSize(&newTransactions)
107103

108-
if currentPayloadSize+len(nextOpPayload) > maximumPayloadSize {
109-
i.processConsulTransaction(transactions)
104+
if newPayloadSize > maximumPayloadSize || len(transactions) == consulTxnLimit {
105+
i.processConsulTransaction(transactions, batch)
110106
transactions = []entities.ConsulTxn{}
107+
108+
batch++
111109
}
112110

113111
transactions = append(transactions, entities.ConsulTxn{KV: TxnKV})
114-
115-
if len(transactions) == consulTxnLimit {
116-
// Flush current transactions because we hit max operation per transaction
117-
// One day Consul will release an API endpoint from where we can get this limit
118-
// so we do can stop hardcoding this constant
119-
i.processConsulTransaction(transactions)
120-
// Reset our transaction array
121-
transactions = []entities.ConsulTxn{}
122-
}
123112
}
124113

125114
// Do we have transactions to process
126115
if len(transactions) > 0 {
127-
i.processConsulTransaction(transactions)
116+
i.processConsulTransaction(transactions, batch)
128117
}
129118

130119
// Consume our channel, to re-allow application interruption

0 commit comments

Comments
 (0)