Skip to content

CBG-4435: Count scheduled compaction runs as idle KV/Query ops #7310

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions base/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ const (
StatAddedVersion3dot1dot4 = "3.1.4"
StatAddedVersion3dot2dot0 = "3.2.0"
StatAddedVersion3dot2dot1 = "3.2.1"
StatAddedVersion3dot2dot2 = "3.2.2"
StatAddedVersion3dot3dot0 = "3.3.0"

StatDeprecatedVersionNotDeprecated = ""
Expand Down Expand Up @@ -312,6 +313,10 @@ func (g *GlobalStat) initResourceUtilizationStats() error {
if err != nil {
return err
}
resUtil.NumIdleQueryOps, err = NewIntStat(SubsystemDatabaseKey, "num_idle_query_ops", StatUnitNoUnits, NumIdleQueryOpsDesc, StatAddedVersion3dot2dot2, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, nil, nil, prometheus.CounterValue, 0)
if err != nil {
return err
}

resUtil.Uptime, err = NewDurStat(ResourceUtilizationSubsystem, "uptime", StatUnitNanoseconds, UptimeDesc, StatAddedVersion3dot0dot0, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, nil, nil, prometheus.CounterValue, time.Now())
if err != nil {
Expand Down Expand Up @@ -366,8 +371,9 @@ type ResourceUtilization struct {
// The node CPU usage calculation based values from /proc of user + system since the last time this function was called.
NodeCpuPercentUtil *SgwFloatStat `json:"node_cpu_percent_utilization"`

// The number of background kv operations.
NumIdleKvOps *SgwIntStat `json:"idle_kv_ops"`
// The number of background kv/query operations.
NumIdleKvOps *SgwIntStat `json:"idle_kv_ops"`
NumIdleQueryOps *SgwIntStat `json:"idle_query_ops"`

// The memory utilization (Resident Set Size) for the process, in bytes.
ProcessMemoryResident *SgwIntStat `json:"process_memory_resident"`
Expand Down
3 changes: 2 additions & 1 deletion base/stats_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,8 @@ const (

SyncProcessComputeDesc = "The compute unit for syncing with clients measured through cpu time and memory used for sync"

NumIdleKvOpsDesc = "The total number of idle kv operations."
NumIdleKvOpsDesc = "The total number of idle kv operations."
NumIdleQueryOpsDesc = "The total number of idle query operations."
)

// Delta Sync stats descriptions
Expand Down
4 changes: 2 additions & 2 deletions db/background_mgr_tombstone_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func (t *TombstoneCompactionManager) Run(ctx context.Context, options map[string
database := options["database"].(*Database)

defer atomic.CompareAndSwapUint32(&database.CompactState, DBCompactRunning, DBCompactNotRunning)
callback := func(docsPurged *int) {
updateStatusCallback := func(docsPurged *int) {
atomic.StoreInt64(&t.PurgedDocCount, int64(*docsPurged))
}

_, err := database.Compact(ctx, true, callback, terminator)
_, err := database.Compact(ctx, true, updateStatusCallback, terminator, false)
if err != nil {
return err
}
Expand Down
38 changes: 29 additions & 9 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1445,15 +1445,19 @@ func (db *DatabaseContext) GetRoleIDs(ctx context.Context, useViews, includeDele
return roles, nil
}

// Trigger tombstone compaction from view and/or GSI indexes. Several Sync Gateway indexes server tombstones (deleted documents with an xattr).
type compactProgressCallbackFunc func(purgedDocCount *int)

// Compact runs tombstone compaction from view and/or GSI indexes - ensuring there's nothing left in the indexes for tombstoned documents that have been purged by the server.
//
// Several Sync Gateway indexes server tombstones (deleted documents with an xattr).
// There currently isn't a mechanism for server to remove these docs from the index when the tombstone is purged by the server during
// metadata purge, because metadata purge doesn't trigger a DCP event.
// When compact is run, Sync Gateway initiates a normal delete operation for the document and xattr (a Sync Gateway purge). This triggers
// removal of the document from the index. In the event that the document has already been purged by server, we need to recreate and delete
// the document to accomplish the same result.
type compactCallbackFunc func(purgedDocCount *int)

func (db *Database) Compact(ctx context.Context, skipRunningStateCheck bool, callback compactCallbackFunc, terminator *base.SafeTerminator) (int, error) {
//
// The `isScheduledBackgroundTask` parameter is used to indicate if the compaction is being run as part of a scheduled background task, or an ad-hoc user-initiated `/{db}/_compact` request.
func (db *Database) Compact(ctx context.Context, skipRunningStateCheck bool, optionalProgressCallback compactProgressCallbackFunc, terminator *base.SafeTerminator, isScheduledBackgroundTask bool) (purgedDocCount int, err error) {
if !skipRunningStateCheck {
if !atomic.CompareAndSwapUint32(&db.CompactState, DBCompactNotRunning, DBCompactRunning) {
return 0, base.HTTPErrorf(http.StatusServiceUnavailable, "Compaction already running")
Expand All @@ -1474,12 +1478,13 @@ func (db *Database) Compact(ctx context.Context, skipRunningStateCheck bool, cal
startTime := time.Now()
purgeOlderThan := startTime.Add(-purgeInterval)

purgedDocCount := 0
purgeErrorCount := 0
addErrorCount := 0
deleteErrorCount := 0

defer callback(&purgedDocCount)
if optionalProgressCallback != nil {
defer optionalProgressCallback(&purgedDocCount)
}

base.InfofCtx(ctx, base.KeyAll, "Starting compaction of purged tombstones for %s ...", base.MD(db.Name))

Expand All @@ -1498,6 +1503,9 @@ func (db *Database) Compact(ctx context.Context, skipRunningStateCheck bool, cal
for {
purgedDocs := make([]string, 0)
results, err := collection.QueryTombstones(ctx, purgeOlderThan, QueryTombstoneBatch)
if isScheduledBackgroundTask {
base.SyncGatewayStats.GlobalStats.ResourceUtilizationStats().NumIdleQueryOps.Add(1)
}
if err != nil {
return 0, err
}
Expand All @@ -1518,11 +1526,17 @@ func (db *Database) Compact(ctx context.Context, skipRunningStateCheck bool, cal
base.DebugfCtx(ctx, base.KeyCRUD, "\tDeleting %q", tombstonesRow.Id)
// First, attempt to purge.
purgeErr := collection.Purge(ctx, tombstonesRow.Id, false)
if isScheduledBackgroundTask {
base.SyncGatewayStats.GlobalStats.ResourceUtilizationStats().NumIdleKvOps.Add(1)
}
if purgeErr == nil {
purgedDocs = append(purgedDocs, tombstonesRow.Id)
} else if base.IsDocNotFoundError(purgeErr) {
// If key no longer exists, need to add and remove to trigger removal from view
_, addErr := collection.dataStore.Add(tombstonesRow.Id, 0, purgeBody)
if isScheduledBackgroundTask {
base.SyncGatewayStats.GlobalStats.ResourceUtilizationStats().NumIdleKvOps.Add(1)
}
if addErr != nil {
addErrorCount++
base.InfofCtx(ctx, base.KeyAll, "Couldn't compact key %s (add): %v", base.UD(tombstonesRow.Id), addErr)
Expand All @@ -1533,7 +1547,11 @@ func (db *Database) Compact(ctx context.Context, skipRunningStateCheck bool, cal
// so mark it to be removed from cache, even if the subsequent delete fails
purgedDocs = append(purgedDocs, tombstonesRow.Id)

if delErr := collection.dataStore.Delete(tombstonesRow.Id); delErr != nil {
delErr := collection.dataStore.Delete(tombstonesRow.Id)
if isScheduledBackgroundTask {
base.SyncGatewayStats.GlobalStats.ResourceUtilizationStats().NumIdleKvOps.Add(1)
}
if delErr != nil {
deleteErrorCount++
base.InfofCtx(ctx, base.KeyAll, "Couldn't compact key %s (delete): %v", base.UD(tombstonesRow.Id), delErr)
}
Expand All @@ -1557,7 +1575,9 @@ func (db *Database) Compact(ctx context.Context, skipRunningStateCheck bool, cal
}
base.InfofCtx(ctx, base.KeyAll, "Compacted %v tombstones", count)

callback(&purgedDocCount)
if optionalProgressCallback != nil {
optionalProgressCallback(&purgedDocCount)
}

if resultCount < QueryTombstoneBatch {
break
Expand Down Expand Up @@ -2428,7 +2448,7 @@ func (db *DatabaseContext) StartOnlineProcesses(ctx context.Context) (returnedEr
bgtTerminator.Close()
}()
bgt, err := NewBackgroundTask(ctx, "Compact", func(ctx context.Context) error {
_, err := db.Compact(ctx, false, func(purgedDocCount *int) {}, bgtTerminator)
_, err := db.Compact(ctx, false, nil, bgtTerminator, true)
if err != nil {
base.WarnfCtx(ctx, "Error trying to compact tombstoned documents for %q with error: %v", db.Name, err)
}
Expand Down
3 changes: 3 additions & 0 deletions docs/api/components/schemas.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ ExpVars:
num_idle_kv_ops:
type: integer
description: "The total number of idle kv operations."
num_idle_query_ops:
type: integer
description: "The total number of idle query operations."
process_cpu_percent_utilization:
type: number
format: float
Expand Down
2 changes: 1 addition & 1 deletion rest/adminapitest/admin_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4166,7 +4166,7 @@ func TestTombstoneCompactionPurgeInterval(t *testing.T) {

// Start compact to modify purge interval
database, _ := db.GetDatabase(dbc, nil)
_, err = database.Compact(ctx, false, func(purgedDocCount *int) {}, base.NewSafeTerminator())
_, err = database.Compact(ctx, false, nil, base.NewSafeTerminator(), false)
require.NoError(t, err)

assert.EqualValues(t, test.expectedPurgeIntervalAfterCompact, dbc.GetMetadataPurgeInterval(ctx))
Expand Down
134 changes: 84 additions & 50 deletions rest/changestest/changes_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3500,6 +3500,20 @@ func TestTombstoneCompaction(t *testing.T) {
t.Skip("If running with no xattrs compact acts as a no-op")
}

tests := []struct {
numDocs int
runAsScheduledBackgroundTask bool
}{
// Multiples of Batch Size
{numDocs: db.QueryTombstoneBatch},
{numDocs: db.QueryTombstoneBatch * 4},
// Smaller Than Batch Size
{numDocs: 2},
{numDocs: db.QueryTombstoneBatch / 4},
// Larger than Batch Size
{numDocs: db.QueryTombstoneBatch + 20},
}

var rt *rest.RestTester
numCollections := 1

Expand All @@ -3510,64 +3524,84 @@ func TestTombstoneCompaction(t *testing.T) {
rt = rest.NewRestTester(t, nil)
}
defer rt.Close()
zero := time.Duration(0)
rt.GetDatabase().Options.PurgeInterval = &zero

compactionTotal := 0
expectedBatches := 0
rt.GetDatabase().Options.PurgeInterval = base.Ptr(time.Duration(0))

TestCompact := func(numDocs int) {

count := 0
for _, test := range tests {
for _, runAsScheduledBackgroundTask := range []bool{false, true} {
t.Run(fmt.Sprintf("numDocs:%d asBackgroundTask:%v", test.numDocs, runAsScheduledBackgroundTask), func(t *testing.T) {

// seed with tombstones
for count := 0; count < test.numDocs; count++ {
for _, keyspace := range rt.GetKeyspaces() {
response := rt.SendAdminRequest("POST", fmt.Sprintf("/%s/", keyspace), `{"foo":"bar"}`)
assert.Equal(t, http.StatusOK, response.Code)
var body db.Body
err := base.JSONUnmarshal(response.Body.Bytes(), &body)
assert.NoError(t, err)
revID := body["rev"].(string)
docID := body["id"].(string)

response = rt.SendAdminRequest("DELETE", fmt.Sprintf("/%s/%s?rev=%s", keyspace, docID, revID), "")
assert.Equal(t, http.StatusOK, response.Code)
}
}

for count < numDocs {
count++
for _, keyspace := range rt.GetKeyspaces() {
response := rt.SendAdminRequest("POST", fmt.Sprintf("/%s/", keyspace), `{"foo":"bar"}`)
assert.Equal(t, 200, response.Code)
var body db.Body
err := base.JSONUnmarshal(response.Body.Bytes(), &body)
assert.NoError(t, err)
revId := body["rev"].(string)
docId := body["id"].(string)
expectedCompactions := test.numDocs * numCollections
expectedBatches := (test.numDocs/db.QueryTombstoneBatch + 1) * numCollections

response = rt.SendAdminRequest("DELETE", fmt.Sprintf("/%s/%s?rev=%s", keyspace, docId, revId), "")
assert.Equal(t, 200, response.Code)
}
}
resp := rt.SendAdminRequest("POST", "/{{.db}}/_compact", "")
rest.RequireStatus(t, resp, http.StatusOK)
numCompactionsBefore := int(rt.GetDatabase().DbStats.Database().NumTombstonesCompacted.Value())
var numBatchesBefore int
if base.TestsDisableGSI() {
numBatchesBefore = int(rt.GetDatabase().DbStats.Query(fmt.Sprintf(base.StatViewFormat, db.DesignDocSyncHousekeeping(), db.ViewTombstones)).QueryCount.Value())
} else {
numBatchesBefore = int(rt.GetDatabase().DbStats.Query(db.QueryTypeTombstones).QueryCount.Value())
}

err := rt.WaitForCondition(func() bool {
time.Sleep(1 * time.Second)
return rt.GetDatabase().TombstoneCompactionManager.GetRunState() == db.BackgroundProcessStateCompleted
})
assert.NoError(t, err)
numIdleKvOpsBefore := int(base.SyncGatewayStats.GlobalStats.ResourceUtilizationStats().NumIdleKvOps.Value())
numIdleQueryOpsBefore := int(base.SyncGatewayStats.GlobalStats.ResourceUtilizationStats().NumIdleQueryOps.Value())

if runAsScheduledBackgroundTask {
database, err := db.CreateDatabase(rt.GetDatabase())
require.NoError(t, err)
purgedCount, err := database.Compact(base.TestCtx(t), false, nil, base.NewSafeTerminator(), true)
require.NoError(t, err)
require.Equal(t, expectedCompactions, purgedCount)

numIdleKvOpsAfter := int(base.SyncGatewayStats.GlobalStats.ResourceUtilizationStats().NumIdleKvOps.Value())
numIdleQueryOpsAfter := int(base.SyncGatewayStats.GlobalStats.ResourceUtilizationStats().NumIdleQueryOps.Value())

// cannot do equal here because there are other idle kv ops unrelated to compaction
assert.GreaterOrEqual(t, numIdleKvOpsAfter-numIdleKvOpsBefore, expectedCompactions)
assert.Equal(t, numIdleQueryOpsAfter-numIdleQueryOpsBefore, expectedBatches)
} else {
resp := rt.SendAdminRequest("POST", "/{{.db}}/_compact", "")
rest.RequireStatus(t, resp, http.StatusOK)
err := rt.WaitForCondition(func() bool {
return rt.GetDatabase().TombstoneCompactionManager.GetRunState() == db.BackgroundProcessStateCompleted
})
assert.NoError(t, err)

numIdleKvOpsAfter := int(base.SyncGatewayStats.GlobalStats.ResourceUtilizationStats().NumIdleKvOps.Value())
numIdleQueryOpsAfter := int(base.SyncGatewayStats.GlobalStats.ResourceUtilizationStats().NumIdleQueryOps.Value())

// ad-hoc compactions don't invoke idle ops - but we do have other idle kv ops so can't ensure it stays zero
assert.GreaterOrEqual(t, numIdleKvOpsAfter-numIdleKvOpsBefore, 0)
assert.Equal(t, numIdleQueryOpsAfter-numIdleQueryOpsBefore, 0)
}

compactionTotal += (numDocs * numCollections)
require.Equal(t, compactionTotal, int(rt.GetDatabase().DbStats.Database().NumTombstonesCompacted.Value()))
actualCompactions := int(rt.GetDatabase().DbStats.Database().NumTombstonesCompacted.Value()) - numCompactionsBefore
require.Equal(t, expectedCompactions, actualCompactions)

var actualBatches int64
if base.TestsDisableGSI() {
actualBatches = rt.GetDatabase().DbStats.Query(fmt.Sprintf(base.StatViewFormat, db.DesignDocSyncHousekeeping(), db.ViewTombstones)).QueryCount.Value()
} else {
actualBatches = rt.GetDatabase().DbStats.Query(db.QueryTypeTombstones).QueryCount.Value()
var actualBatches int
if base.TestsDisableGSI() {
actualBatches = int(rt.GetDatabase().DbStats.Query(fmt.Sprintf(base.StatViewFormat, db.DesignDocSyncHousekeeping(), db.ViewTombstones)).QueryCount.Value()) - numBatchesBefore
} else {
actualBatches = int(rt.GetDatabase().DbStats.Query(db.QueryTypeTombstones).QueryCount.Value()) - numBatchesBefore
}
require.Equal(t, expectedBatches, actualBatches)
})
}

expectedBatches += (numDocs/db.QueryTombstoneBatch + 1) * numCollections
require.Equal(t, expectedBatches, int(actualBatches))
}

// Multiples of Batch Size
TestCompact(db.QueryTombstoneBatch)
TestCompact(db.QueryTombstoneBatch * 4)

// Smaller Than Batch Size
TestCompact(2)
TestCompact(db.QueryTombstoneBatch / 4)

// Larger than Batch Size
TestCompact(db.QueryTombstoneBatch + 20)
}

// TestOneShotGrantTiming simulates a one-shot changes feed returning before a previously issued grant has been
Expand Down
Loading