From e081983b271fc78e0ebaaee5ac6b2452f09f6393 Mon Sep 17 00:00:00 2001
From: tvaron3
Date: Sun, 12 Oct 2025 22:04:33 -0700
Subject: [PATCH 1/6] initial changes for workloads

---
 sdk/data/azcosmos/workloads/dev.md           |  32 +++
 sdk/data/azcosmos/workloads/initial_setup.go | 200 ++++++++++++++++++
 sdk/data/azcosmos/workloads/main/main.go     |  16 ++
 .../azcosmos/workloads/workload_config.go    |  99 +++++++++
 4 files changed, 347 insertions(+)
 create mode 100644 sdk/data/azcosmos/workloads/dev.md
 create mode 100644 sdk/data/azcosmos/workloads/initial_setup.go
 create mode 100644 sdk/data/azcosmos/workloads/main/main.go
 create mode 100644 sdk/data/azcosmos/workloads/workload_config.go

diff --git a/sdk/data/azcosmos/workloads/dev.md b/sdk/data/azcosmos/workloads/dev.md
new file mode 100644
index 000000000000..353f90484755
--- /dev/null
+++ b/sdk/data/azcosmos/workloads/dev.md
@@ -0,0 +1,32 @@
+## SDK Scale Testing
+This directory contains the scale testing workloads for the SDK. The workloads are designed to test the performance
+and scalability of the SDK under various conditions.
+
+### Setup Scale Testing
+1. Create a VM in Azure with the following configuration:
+    - 8 vCPUs
+    - 32 GB RAM
+    - Ubuntu
+    - Accelerated networking
+1. Optionally, give the VM the necessary [permissions](https://learn.microsoft.com/azure/cosmos-db/nosql/how-to-grant-data-plane-access?tabs=built-in-definition%2Ccsharp&pivots=azure-interface-cli) to access the Cosmos DB account if using AAD.
+1. Fork and clone this repository
+1. Go to the azcosmos folder
+    - `cd azure-sdk-for-go/sdk/data/azcosmos`
+1. Check out the branch with the changes to test.
+1. Go to the workloads folder
+    - `cd workloads`
+1. Set the relevant configuration (key, host, etc.) through the environment variables read by `workload_config.go`
+1. Run the workload; it first creates the database and container and inserts test data, then starts the scale run
+    - `go run ./main/main.go`
+
+### Monitor Run
+- `ps -eaf | grep "go"` to see the running processes
+- `tail -f <log file>` to see the logs in real time
+
+### Close Workloads
+- If you want to keep the logs and stop the scripts,
+  `./shutdown_workloads.sh --do-not-remove-logs`
+- If you want to remove the logs and stop the scripts,
+  `./shutdown_workloads.sh`
diff --git a/sdk/data/azcosmos/workloads/initial_setup.go b/sdk/data/azcosmos/workloads/initial_setup.go
new file mode 100644
index 000000000000..0b4aa07263f7
--- /dev/null
+++ b/sdk/data/azcosmos/workloads/initial_setup.go
@@ -0,0 +1,200 @@
+package workloads
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"log"
+	"math/rand"
+	"sync"
+	"time"
+
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
+	"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
+)
+
+func createRandomItem(i int) map[string]interface{} {
+	return map[string]interface{}{
+		"type":      "testItem",
+		"createdAt": time.Now().UTC().Format(time.RFC3339Nano),
+		"seq":       i,
+		"value":     rand.Int63(), // pseudo-random payload
+	}
+}
+
+func createClient(cfg workloadConfig) (*azcosmos.Client, error) {
+	cred, err := azcosmos.NewKeyCredential(cfg.Key)
+	if err != nil {
+		return nil, err
+	}
+	opts := &azcosmos.ClientOptions{
+		PreferredRegions: cfg.PreferredLocations,
+		// Add EnableContentResponseOnWrite: true if you want full responses
+	}
+	return azcosmos.NewClientWithKey(cfg.Endpoint, cred, opts)
+}
+
+// createDatabaseIfNotExists attempts to create the database, tolerating conflicts.
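+// A 409 Conflict from CreateDatabase means the database already exists; the existing database client is returned.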
+func createDatabaseIfNotExists(ctx context.Context, client *azcosmos.Client, dbID string) (*azcosmos.DatabaseClient, error) { + dbClient, err := client.NewDatabase(dbID) + if err != nil { + return nil, err + } + props := azcosmos.DatabaseProperties{ID: dbID} + _, err = client.CreateDatabase(ctx, props, nil) + if err != nil { + var azErr *azcore.ResponseError + if errors.As(err, &azErr) { + if azErr.StatusCode == 409 { + return dbClient, nil // already exists + } + } + return nil, err + } + return dbClient, nil +} + +func createContainerIfNotExists(ctx context.Context, db *azcosmos.DatabaseClient, containerID, pkField string) (*azcosmos.ContainerClient, error) { + containerClient, err := db.NewContainer(containerID) + if err != nil { + return nil, err + } + + // Build container properties + props := azcosmos.ContainerProperties{ + ID: containerID, + PartitionKeyDefinition: azcosmos.PartitionKeyDefinition{ + Paths: []string{"/" + pkField}, + Kind: azcosmos.PartitionKeyKindHash, + }, + } + + // Try create + _, err = db.CreateContainer(ctx, props, nil) + if err != nil { + var azErr *azcore.ResponseError + if errors.As(err, &azErr) { + if azErr.StatusCode == 409 { + return containerClient, nil // already exists + } + } + return nil, err + } + + return containerClient, nil +} + +// upsertItemsConcurrently performs count upserts concurrently. +func upsertItemsConcurrently(ctx context.Context, container *azcosmos.ContainerClient, count int, pkField string) error { + // Use a bounded worker pool to avoid oversaturating resources + workers := 32 + if count < workers { + workers = count + } + type job struct { + i int + } + jobs := make(chan job, workers) + errs := make(chan error, count) + wg := &sync.WaitGroup{} + + for w := 0; w < workers; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := range jobs { + item := createRandomItem(j.i) + item["id"] = fmt.Sprintf("test-%d", j.i) + item[pkField] = fmt.Sprintf("pk-%d", j.i) + + // Marshal item to bytes; UpsertItem often takes []byte + partition key value + body, err := json.Marshal(item) + if err != nil { + errs <- err + continue + } + + pk := azcosmos.NewPartitionKeyString(item[pkField].(string)) + _, err = container.UpsertItem(ctx, pk, body, nil) + if err != nil { + errs <- err + continue + } + println("writing an item") + } + }() + } + +sendLoop: + for i := 0; i < count; i++ { + select { + case <-ctx.Done(): + break sendLoop + default: + jobs <- job{i: i} + } + } + close(jobs) + wg.Wait() + close(errs) + + // Aggregate errors if any + var firstErr error + for e := range errs { + if firstErr == nil { + firstErr = e + } + } + return firstErr +} + +// RunWorkload creates the database/container and performs the concurrent upserts. 
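+// Run drives it end to end with a context carrying a 10-minute timeout.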
+func RunWorkload(ctx context.Context) error { + cfg, err := loadConfig() + if err != nil { + return err + } + + println("Creating client...") + client, err := createClient(cfg) + if err != nil { + return fmt.Errorf("creating client: %w", err) + } + + println("Creating database...") + dbClient, err := createDatabaseIfNotExists(ctx, client, cfg.DatabaseID) + if err != nil { + return fmt.Errorf("ensure database: %w", err) + } + println("Creating container...") + + container, err := createContainerIfNotExists(ctx, dbClient, cfg.ContainerID, cfg.PartitionKeyFieldName) + if err != nil { + return fmt.Errorf("ensure container: %w", err) + } + + // NUMBER_OF_LOGICAL_PARTITIONS + 1 + var count = cfg.LogicalPartitions + 1 + + println("Starting workload...") + + log.Printf("Starting %d concurrent upserts...", count) + + if err := upsertItemsConcurrently(ctx, container, count, cfg.PartitionKeyFieldName); err != nil { + return fmt.Errorf("upserts failed: %w", err) + } + + log.Printf("Completed %d upserts.", count) + return nil +} + +// main-style entry (optional if you want a standalone runnable). +// If you prefer a standalone executable place this in package main instead. +func Run() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + if err := RunWorkload(ctx); err != nil { + log.Fatalf("workload failed: %v", err) + } +} diff --git a/sdk/data/azcosmos/workloads/main/main.go b/sdk/data/azcosmos/workloads/main/main.go new file mode 100644 index 000000000000..6b17c47957c4 --- /dev/null +++ b/sdk/data/azcosmos/workloads/main/main.go @@ -0,0 +1,16 @@ +package main + +import ( + "context" + "log" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos/workloads" +) + +func main() { + _, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + workloads.Run() // or workloads.RunWorkload(ctx) if you make RunWorkload public + log.Println("done") +} diff --git a/sdk/data/azcosmos/workloads/workload_config.go b/sdk/data/azcosmos/workloads/workload_config.go new file mode 100644 index 000000000000..da5141221dc6 --- /dev/null +++ b/sdk/data/azcosmos/workloads/workload_config.go @@ -0,0 +1,99 @@ +package workloads + +import ( + "fmt" + "os" + "strconv" +) + +// Configuration loaded from environment (mirrors the Python version) +type workloadConfig struct { + Endpoint string + Key string + PreferredLocations []string + DatabaseID string + ContainerID string + PartitionKeyFieldName string + LogicalPartitions int + Throughput int // optional (unused if not supported) +} + +func loadConfig() (workloadConfig, error) { + get := func(name string) (string, error) { + v := os.Getenv(name) + if v == "" { + return "", fmt.Errorf("missing env var %s", name) + } + return v, nil + } + + var cfg workloadConfig + var err error + + if cfg.Endpoint, err = get("COSMOS_URI"); err != nil { + return cfg, err + } + if cfg.Key, err = get("COSMOS_KEY"); err != nil { + return cfg, err + } + if cfg.DatabaseID, err = get("COSMOS_DATABASE"); err != nil { + return cfg, err + } + if cfg.ContainerID, err = get("COSMOS_CONTAINER"); err != nil { + return cfg, err + } + if pk := os.Getenv("PARTITION_KEY"); pk != "" { + cfg.PartitionKeyFieldName = pk + } else { + cfg.PartitionKeyFieldName = "pk" + } + + if lp := os.Getenv("NUMBER_OF_LOGICAL_PARTITIONS"); lp != "" { + n, convErr := strconv.Atoi(lp) + if convErr != nil { + return cfg, fmt.Errorf("invalid NUMBER_OF_LOGICAL_PARTITIONS: %w", convErr) + } + cfg.LogicalPartitions = n + } else { + cfg.LogicalPartitions = 
10000 + } + + if tp := os.Getenv("THROUGHPUT"); tp != "" { + n, convErr := strconv.Atoi(tp) + if convErr != nil { + return cfg, fmt.Errorf("invalid THROUGHPUT: %w", convErr) + } + cfg.Throughput = n + } else { + cfg.Throughput = 10000 + } + + // Comma-separated preferred locations (optional) + if pl := os.Getenv("PREFERRED_LOCATIONS"); pl != "" { + // Simple split on comma; whitespace trimming omitted for brevity + cfg.PreferredLocations = splitAndTrim(pl, ',') + } + return cfg, nil +} + +func splitAndTrim(s string, sep rune) []string { + if s == "" { + return nil + } + out := []string{} + cur := "" + for _, r := range s { + if r == sep { + if cur != "" { + out = append(out, cur) + cur = "" + } + continue + } + cur += string(r) + } + if cur != "" { + out = append(out, cur) + } + return out +} From fe85531b9a0abf5fb5c49ef789f7694672202c8e Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Sun, 12 Oct 2025 22:51:38 -0700 Subject: [PATCH 2/6] create initial read write query workload --- sdk/data/azcosmos/workloads/initial_setup.go | 57 ++-------- sdk/data/azcosmos/workloads/main/main.go | 14 ++- sdk/data/azcosmos/workloads/r_w_q_workload.go | 44 ++++++++ sdk/data/azcosmos/workloads/workload_utils.go | 103 ++++++++++++++++++ 4 files changed, 170 insertions(+), 48 deletions(-) create mode 100644 sdk/data/azcosmos/workloads/r_w_q_workload.go create mode 100644 sdk/data/azcosmos/workloads/workload_utils.go diff --git a/sdk/data/azcosmos/workloads/initial_setup.go b/sdk/data/azcosmos/workloads/initial_setup.go index 0b4aa07263f7..bac494e09be6 100644 --- a/sdk/data/azcosmos/workloads/initial_setup.go +++ b/sdk/data/azcosmos/workloads/initial_setup.go @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + package workloads import ( @@ -6,35 +9,12 @@ import ( "errors" "fmt" "log" - "math/rand" "sync" - "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos" ) -func createRandomItem(i int) map[string]interface{} { - return map[string]interface{}{ - "type": "testItem", - "createdAt": time.Now().UTC().Format(time.RFC3339Nano), - "seq": i, - "value": rand.Int63(), // pseudo-random payload - } -} - -func createClient(cfg workloadConfig) (*azcosmos.Client, error) { - cred, err := azcosmos.NewKeyCredential(cfg.Key) - if err != nil { - return nil, err - } - opts := &azcosmos.ClientOptions{ - PreferredRegions: cfg.PreferredLocations, - // Add EnableContentResponseOnWrite: true if you want full responses - } - return azcosmos.NewClientWithKey(cfg.Endpoint, cred, opts) -} - // createDatabaseIfNotExists attempts to create the database, tolerating conflicts. 
func createDatabaseIfNotExists(ctx context.Context, client *azcosmos.Client, dbID string) (*azcosmos.DatabaseClient, error) { dbClient, err := client.NewDatabase(dbID) @@ -55,7 +35,7 @@ func createDatabaseIfNotExists(ctx context.Context, client *azcosmos.Client, dbI return dbClient, nil } -func createContainerIfNotExists(ctx context.Context, db *azcosmos.DatabaseClient, containerID, pkField string) (*azcosmos.ContainerClient, error) { +func createContainerIfNotExists(ctx context.Context, db *azcosmos.DatabaseClient, containerID, pkField string, desiredThroughput int32) (*azcosmos.ContainerClient, error) { containerClient, err := db.NewContainer(containerID) if err != nil { return nil, err @@ -70,8 +50,11 @@ func createContainerIfNotExists(ctx context.Context, db *azcosmos.DatabaseClient }, } + throughput := azcosmos.NewManualThroughputProperties(desiredThroughput) // Try create - _, err = db.CreateContainer(ctx, props, nil) + _, err = db.CreateContainer(ctx, props, &azcosmos.CreateContainerOptions{ + ThroughputProperties: &throughput, + }) if err != nil { var azErr *azcore.ResponseError if errors.As(err, &azErr) { @@ -85,7 +68,6 @@ func createContainerIfNotExists(ctx context.Context, db *azcosmos.DatabaseClient return containerClient, nil } -// upsertItemsConcurrently performs count upserts concurrently. func upsertItemsConcurrently(ctx context.Context, container *azcosmos.ContainerClient, count int, pkField string) error { // Use a bounded worker pool to avoid oversaturating resources workers := 32 @@ -149,14 +131,13 @@ sendLoop: return firstErr } -// RunWorkload creates the database/container and performs the concurrent upserts. -func RunWorkload(ctx context.Context) error { +// RunSetup creates the database/container and performs the concurrent upserts. +func RunSetup(ctx context.Context) error { cfg, err := loadConfig() if err != nil { return err } - println("Creating client...") client, err := createClient(cfg) if err != nil { return fmt.Errorf("creating client: %w", err) @@ -167,17 +148,13 @@ func RunWorkload(ctx context.Context) error { if err != nil { return fmt.Errorf("ensure database: %w", err) } - println("Creating container...") - container, err := createContainerIfNotExists(ctx, dbClient, cfg.ContainerID, cfg.PartitionKeyFieldName) + container, err := createContainerIfNotExists(ctx, dbClient, cfg.ContainerID, cfg.PartitionKeyFieldName, int32(cfg.Throughput)) if err != nil { return fmt.Errorf("ensure container: %w", err) } - // NUMBER_OF_LOGICAL_PARTITIONS + 1 - var count = cfg.LogicalPartitions + 1 - - println("Starting workload...") + var count = cfg.LogicalPartitions log.Printf("Starting %d concurrent upserts...", count) @@ -188,13 +165,3 @@ func RunWorkload(ctx context.Context) error { log.Printf("Completed %d upserts.", count) return nil } - -// main-style entry (optional if you want a standalone runnable). -// If you prefer a standalone executable place this in package main instead. -func Run() { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) - defer cancel() - if err := RunWorkload(ctx); err != nil { - log.Fatalf("workload failed: %v", err) - } -} diff --git a/sdk/data/azcosmos/workloads/main/main.go b/sdk/data/azcosmos/workloads/main/main.go index 6b17c47957c4..faebbd3d9944 100644 --- a/sdk/data/azcosmos/workloads/main/main.go +++ b/sdk/data/azcosmos/workloads/main/main.go @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+ package main import ( @@ -9,8 +12,13 @@ import ( ) func main() { - _, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() - workloads.Run() // or workloads.RunWorkload(ctx) if you make RunWorkload public - log.Println("done") + if err := workloads.RunSetup(ctx); err != nil { + log.Fatalf("setup failed: %v", err) + } + log.Println("setup completed") + if err := workloads.RunWorkload(ctx); err != nil { + log.Fatalf("workload failed: %v", err) + } } diff --git a/sdk/data/azcosmos/workloads/r_w_q_workload.go b/sdk/data/azcosmos/workloads/r_w_q_workload.go new file mode 100644 index 000000000000..b65e0185ff0e --- /dev/null +++ b/sdk/data/azcosmos/workloads/r_w_q_workload.go @@ -0,0 +1,44 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package workloads + +import ( + "context" + "fmt" + "log" +) + +func RunWorkload(ctx context.Context) error { + cfg, err := loadConfig() + if err != nil { + return err + } + + client, err := createClient(cfg) + if err != nil { + return fmt.Errorf("creating client: %w", err) + } + + dbClient, err := client.NewDatabase(cfg.DatabaseID) + if err != nil { + return fmt.Errorf("ensure database: %w", err) + } + + container, err := dbClient.NewContainer(cfg.ContainerID) + if err != nil { + return fmt.Errorf("ensure container: %w", err) + } + + var count = cfg.LogicalPartitions + + log.Printf("Starting %d concurrent upserts...", count) + + if err := randomUpserts(ctx, container, count, cfg.PartitionKeyFieldName); err != nil { + return fmt.Errorf("upserts failed: %w", err) + } + + log.Printf("Completed %d upserts.", count) + return nil + +} diff --git a/sdk/data/azcosmos/workloads/workload_utils.go b/sdk/data/azcosmos/workloads/workload_utils.go new file mode 100644 index 000000000000..96a699c79481 --- /dev/null +++ b/sdk/data/azcosmos/workloads/workload_utils.go @@ -0,0 +1,103 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+ +package workloads + +import ( + "context" + "encoding/json" + "fmt" + "math/rand" + "sync" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos" +) + +func createRandomItem(i int) map[string]interface{} { + return map[string]interface{}{ + "type": "testItem", + "createdAt": time.Now().UTC().Format(time.RFC3339Nano), + "seq": i, + "value": rand.Int63(), // pseudo-random payload + } +} + +func randomUpserts(ctx context.Context, container *azcosmos.ContainerClient, count int, pkField string) error { + // Use a bounded worker pool to avoid oversaturating resources + workers := 32 + if count < workers { + workers = count + } + type job struct { + i int + } + jobs := make(chan job, workers) + errs := make(chan error, count) + wg := &sync.WaitGroup{} + + for w := 0; w < workers; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := range jobs { + + var rng = rand.New(rand.NewSource(time.Now().UnixNano())) + // re-upsert a document already written + var num = rng.Intn(count) + 1 + item := createRandomItem(j.i) + item["id"] = fmt.Sprintf("test-%d", num) + item[pkField] = fmt.Sprintf("pk-%d", num) + + // Marshal item to bytes; UpsertItem often takes []byte + partition key value + body, err := json.Marshal(item) + if err != nil { + errs <- err + continue + } + + pk := azcosmos.NewPartitionKeyString(item[pkField].(string)) + _, err = container.UpsertItem(ctx, pk, body, nil) + if err != nil { + errs <- err + continue + } + println("writing an item") + } + }() + } + +sendLoop: + for i := 0; i < count; i++ { + select { + case <-ctx.Done(): + break sendLoop + default: + jobs <- job{i: i} + } + } + close(jobs) + wg.Wait() + close(errs) + + // Aggregate errors if any + var firstErr error + for e := range errs { + if firstErr == nil { + firstErr = e + } + } + return firstErr +} + +func createClient(cfg workloadConfig) (*azcosmos.Client, error) { + cred, err := azcosmos.NewKeyCredential(cfg.Key) + if err != nil { + return nil, err + } + opts := &azcosmos.ClientOptions{ + PreferredRegions: cfg.PreferredLocations, + // Add EnableContentResponseOnWrite: true if you want full responses + } + return azcosmos.NewClientWithKey(cfg.Endpoint, cred, opts) +} From 0b463cb5f1851688a3a166b9dddafceddb6f49dc Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Sun, 12 Oct 2025 22:52:29 -0700 Subject: [PATCH 3/6] remove unnecessary print statement --- sdk/data/azcosmos/workloads/initial_setup.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/data/azcosmos/workloads/initial_setup.go b/sdk/data/azcosmos/workloads/initial_setup.go index bac494e09be6..8ff650b89b62 100644 --- a/sdk/data/azcosmos/workloads/initial_setup.go +++ b/sdk/data/azcosmos/workloads/initial_setup.go @@ -103,7 +103,6 @@ func upsertItemsConcurrently(ctx context.Context, container *azcosmos.ContainerC errs <- err continue } - println("writing an item") } }() } From 54b06bee285e33b04d7b418e8b05c05d5d501bfb Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Mon, 13 Oct 2025 12:56:42 -0700 Subject: [PATCH 4/6] add changes to perform reads,writes,queries --- sdk/data/azcosmos/workloads/r_w_q_workload.go | 11 ++-- sdk/data/azcosmos/workloads/workload_utils.go | 55 ++++++++++++++----- 2 files changed, 46 insertions(+), 20 deletions(-) diff --git a/sdk/data/azcosmos/workloads/r_w_q_workload.go b/sdk/data/azcosmos/workloads/r_w_q_workload.go index b65e0185ff0e..12a72279d157 100644 --- a/sdk/data/azcosmos/workloads/r_w_q_workload.go +++ b/sdk/data/azcosmos/workloads/r_w_q_workload.go @@ -32,13 +32,12 @@ func RunWorkload(ctx context.Context) error { var 
count = cfg.LogicalPartitions - log.Printf("Starting %d concurrent upserts...", count) + log.Printf("Starting %d concurrent read/write/queries ...", count) - if err := randomUpserts(ctx, container, count, cfg.PartitionKeyFieldName); err != nil { - return fmt.Errorf("upserts failed: %w", err) + for { + if err := randomReadWriteQueries(ctx, container, count, cfg.PartitionKeyFieldName); err != nil { + return fmt.Errorf("read/write queries failed: %w", err) + } } - log.Printf("Completed %d upserts.", count) - return nil - } diff --git a/sdk/data/azcosmos/workloads/workload_utils.go b/sdk/data/azcosmos/workloads/workload_utils.go index 96a699c79481..95851296c9ee 100644 --- a/sdk/data/azcosmos/workloads/workload_utils.go +++ b/sdk/data/azcosmos/workloads/workload_utils.go @@ -7,6 +7,7 @@ import ( "context" "encoding/json" "fmt" + "log" "math/rand" "sync" "time" @@ -23,7 +24,7 @@ func createRandomItem(i int) map[string]interface{} { } } -func randomUpserts(ctx context.Context, container *azcosmos.ContainerClient, count int, pkField string) error { +func randomReadWriteQueries(ctx context.Context, container *azcosmos.ContainerClient, count int, pkField string) error { // Use a bounded worker pool to avoid oversaturating resources workers := 32 if count < workers { @@ -41,28 +42,54 @@ func randomUpserts(ctx context.Context, container *azcosmos.ContainerClient, cou go func() { defer wg.Done() for j := range jobs { - - var rng = rand.New(rand.NewSource(time.Now().UnixNano())) - // re-upsert a document already written - var num = rng.Intn(count) + 1 + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + // re-upsert/read/query a document (some may not exist yet which can surface 404s) + num := rng.Intn(count) + 1 item := createRandomItem(j.i) - item["id"] = fmt.Sprintf("test-%d", num) - item[pkField] = fmt.Sprintf("pk-%d", num) + id := fmt.Sprintf("test-%d", num) + pkVal := fmt.Sprintf("pk-%d", num) + item["id"] = id + item[pkField] = pkVal - // Marshal item to bytes; UpsertItem often takes []byte + partition key value body, err := json.Marshal(item) if err != nil { + log.Printf("randomRW marshal error id=%s pk=%s: %v", id, pkVal, err) errs <- err continue } - pk := azcosmos.NewPartitionKeyString(item[pkField].(string)) - _, err = container.UpsertItem(ctx, pk, body, nil) - if err != nil { - errs <- err - continue + pk := azcosmos.NewPartitionKeyString(pkVal) + // Include query op (0=upsert,1=read,2=query) + operationNum := rng.Intn(3) + switch operationNum { + case 0: // Upsert + if _, err = container.UpsertItem(ctx, pk, body, nil); err != nil { + log.Printf("upsert error id=%s pk=%s: %v", id, pkVal, err) + errs <- err + continue + } + case 1: // Read + if _, err = container.ReadItem(ctx, pk, id, nil); err != nil { + log.Printf("read error id=%s pk=%s: %v", id, pkVal, err) + errs <- err + continue + } + case 2: // Query by id + pager := container.NewQueryItemsPager( + "SELECT * FROM c WHERE c.id = @id", + azcosmos.NewPartitionKeyString(pkVal), + &azcosmos.QueryOptions{ + QueryParameters: []azcosmos.QueryParameter{{Name: "@id", Value: id}}, + }, + ) + for pager.More() { + if _, err = pager.NextPage(ctx); err != nil { + log.Printf("query error id=%s pk=%s: %v", id, pkVal, err) + errs <- err + break + } + } } - println("writing an item") } }() } From 6ceccc425dfe86cedb15aa9fff6d1fc543ba0fb7 Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Mon, 13 Oct 2025 13:57:57 -0700 Subject: [PATCH 5/6] increase time workload runs --- sdk/data/azcosmos/workloads/main/main.go | 3 ++- 1 file changed, 2 insertions(+), 1 
deletion(-) diff --git a/sdk/data/azcosmos/workloads/main/main.go b/sdk/data/azcosmos/workloads/main/main.go index faebbd3d9944..a5f3a49254e8 100644 --- a/sdk/data/azcosmos/workloads/main/main.go +++ b/sdk/data/azcosmos/workloads/main/main.go @@ -12,7 +12,8 @@ import ( ) func main() { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + // max run the workload for 12 hours + ctx, cancel := context.WithTimeout(context.Background(), 12*60*time.Minute) defer cancel() if err := workloads.RunSetup(ctx); err != nil { log.Fatalf("setup failed: %v", err) From 80da2245ed4a88d0d9e65cb18384b3b59a4fb13e Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Mon, 13 Oct 2025 14:26:59 -0700 Subject: [PATCH 6/6] refactor and react to comments --- sdk/data/azcosmos/workloads/initial_setup.go | 64 -------- .../azcosmos/workloads/workload_config.go | 7 +- sdk/data/azcosmos/workloads/workload_utils.go | 150 +++++++++++------- 3 files changed, 101 insertions(+), 120 deletions(-) diff --git a/sdk/data/azcosmos/workloads/initial_setup.go b/sdk/data/azcosmos/workloads/initial_setup.go index 8ff650b89b62..291bf378fc04 100644 --- a/sdk/data/azcosmos/workloads/initial_setup.go +++ b/sdk/data/azcosmos/workloads/initial_setup.go @@ -5,11 +5,9 @@ package workloads import ( "context" - "encoding/json" "errors" "fmt" "log" - "sync" "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos" @@ -68,68 +66,6 @@ func createContainerIfNotExists(ctx context.Context, db *azcosmos.DatabaseClient return containerClient, nil } -func upsertItemsConcurrently(ctx context.Context, container *azcosmos.ContainerClient, count int, pkField string) error { - // Use a bounded worker pool to avoid oversaturating resources - workers := 32 - if count < workers { - workers = count - } - type job struct { - i int - } - jobs := make(chan job, workers) - errs := make(chan error, count) - wg := &sync.WaitGroup{} - - for w := 0; w < workers; w++ { - wg.Add(1) - go func() { - defer wg.Done() - for j := range jobs { - item := createRandomItem(j.i) - item["id"] = fmt.Sprintf("test-%d", j.i) - item[pkField] = fmt.Sprintf("pk-%d", j.i) - - // Marshal item to bytes; UpsertItem often takes []byte + partition key value - body, err := json.Marshal(item) - if err != nil { - errs <- err - continue - } - - pk := azcosmos.NewPartitionKeyString(item[pkField].(string)) - _, err = container.UpsertItem(ctx, pk, body, nil) - if err != nil { - errs <- err - continue - } - } - }() - } - -sendLoop: - for i := 0; i < count; i++ { - select { - case <-ctx.Done(): - break sendLoop - default: - jobs <- job{i: i} - } - } - close(jobs) - wg.Wait() - close(errs) - - // Aggregate errors if any - var firstErr error - for e := range errs { - if firstErr == nil { - firstErr = e - } - } - return firstErr -} - // RunSetup creates the database/container and performs the concurrent upserts. 
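// Setup seeds ids test-0 through test-(count-1) with partition keys pk-0 through pk-(count-1); the read/write/query phase samples test-1 through test-count, so the highest id can 404 until an upsert first creates it.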
func RunSetup(ctx context.Context) error { cfg, err := loadConfig() diff --git a/sdk/data/azcosmos/workloads/workload_config.go b/sdk/data/azcosmos/workloads/workload_config.go index da5141221dc6..b4ef0502504d 100644 --- a/sdk/data/azcosmos/workloads/workload_config.go +++ b/sdk/data/azcosmos/workloads/workload_config.go @@ -18,6 +18,9 @@ type workloadConfig struct { Throughput int // optional (unused if not supported) } +const defaultLogicalPartitions = 10000 +const defaultThroughput = 100000 + func loadConfig() (workloadConfig, error) { get := func(name string) (string, error) { v := os.Getenv(name) @@ -55,7 +58,7 @@ func loadConfig() (workloadConfig, error) { } cfg.LogicalPartitions = n } else { - cfg.LogicalPartitions = 10000 + cfg.LogicalPartitions = defaultLogicalPartitions } if tp := os.Getenv("THROUGHPUT"); tp != "" { @@ -65,7 +68,7 @@ func loadConfig() (workloadConfig, error) { } cfg.Throughput = n } else { - cfg.Throughput = 10000 + cfg.Throughput = defaultThroughput } // Comma-separated preferred locations (optional) diff --git a/sdk/data/azcosmos/workloads/workload_utils.go b/sdk/data/azcosmos/workloads/workload_utils.go index 95851296c9ee..104f5d1bc144 100644 --- a/sdk/data/azcosmos/workloads/workload_utils.go +++ b/sdk/data/azcosmos/workloads/workload_utils.go @@ -6,6 +6,7 @@ package workloads import ( "context" "encoding/json" + "errors" "fmt" "log" "math/rand" @@ -15,6 +16,18 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos" ) +const defaultConcurrency = 32 + +// rwOperation enumerates random read/write/query operations. +type rwOperation int + +const ( + opUpsert rwOperation = iota + opRead + opQuery + opOpCount // sentinel for number of operations +) + func createRandomItem(i int) map[string]interface{} { return map[string]interface{}{ "type": "testItem", @@ -24,74 +37,42 @@ func createRandomItem(i int) map[string]interface{} { } } -func randomReadWriteQueries(ctx context.Context, container *azcosmos.ContainerClient, count int, pkField string) error { - // Use a bounded worker pool to avoid oversaturating resources - workers := 32 +// runConcurrent executes count indexed jobs across at most workers goroutines. +// jf should be idempotent per index; it receives a per-worker RNG (not safe to share across workers). 
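+// Errors from jf go to a buffered channel (capacity count); the first is returned after all workers finish, and overflow errors are dropped rather than blocking.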
+func runConcurrent(ctx context.Context, count, workers int, jf func(ctx context.Context, index int, rng *rand.Rand) error) error { + if count <= 0 { + return errors.New("count must be > 0") + } + if workers <= 0 { + workers = 1 + } if count < workers { workers = count } - type job struct { - i int - } + + type job struct{ i int } jobs := make(chan job, workers) errs := make(chan error, count) wg := &sync.WaitGroup{} for w := 0; w < workers; w++ { wg.Add(1) - go func() { + go func(workerID int) { defer wg.Done() + // Seed rng per worker (unique-ish seed) + rng := rand.New(rand.NewSource(time.Now().UnixNano() + int64(workerID)<<32)) for j := range jobs { - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - // re-upsert/read/query a document (some may not exist yet which can surface 404s) - num := rng.Intn(count) + 1 - item := createRandomItem(j.i) - id := fmt.Sprintf("test-%d", num) - pkVal := fmt.Sprintf("pk-%d", num) - item["id"] = id - item[pkField] = pkVal - - body, err := json.Marshal(item) - if err != nil { - log.Printf("randomRW marshal error id=%s pk=%s: %v", id, pkVal, err) - errs <- err - continue + if ctx.Err() != nil { + return } - - pk := azcosmos.NewPartitionKeyString(pkVal) - // Include query op (0=upsert,1=read,2=query) - operationNum := rng.Intn(3) - switch operationNum { - case 0: // Upsert - if _, err = container.UpsertItem(ctx, pk, body, nil); err != nil { - log.Printf("upsert error id=%s pk=%s: %v", id, pkVal, err) - errs <- err - continue - } - case 1: // Read - if _, err = container.ReadItem(ctx, pk, id, nil); err != nil { - log.Printf("read error id=%s pk=%s: %v", id, pkVal, err) - errs <- err - continue - } - case 2: // Query by id - pager := container.NewQueryItemsPager( - "SELECT * FROM c WHERE c.id = @id", - azcosmos.NewPartitionKeyString(pkVal), - &azcosmos.QueryOptions{ - QueryParameters: []azcosmos.QueryParameter{{Name: "@id", Value: id}}, - }, - ) - for pager.More() { - if _, err = pager.NextPage(ctx); err != nil { - log.Printf("query error id=%s pk=%s: %v", id, pkVal, err) - errs <- err - break - } + if err := jf(ctx, j.i, rng); err != nil { + select { + case errs <- err: + default: // channel full; drop to avoid blocking } } } - }() + }(w) } sendLoop: @@ -107,7 +88,6 @@ sendLoop: wg.Wait() close(errs) - // Aggregate errors if any var firstErr error for e := range errs { if firstErr == nil { @@ -117,6 +97,68 @@ sendLoop: return firstErr } +func upsertItemsConcurrently(ctx context.Context, container *azcosmos.ContainerClient, count int, pkField string) error { + return runConcurrent(ctx, count, defaultConcurrency, func(ctx context.Context, i int, rng *rand.Rand) error { + item := createRandomItem(i) + id := fmt.Sprintf("test-%d", i) + pkVal := fmt.Sprintf("pk-%d", i) + item["id"] = id + item[pkField] = pkVal + body, err := json.Marshal(item) + if err != nil { + return err + } + pk := azcosmos.NewPartitionKeyString(pkVal) + _, err = container.UpsertItem(ctx, pk, body, nil) + return err + }) +} + +func randomReadWriteQueries(ctx context.Context, container *azcosmos.ContainerClient, count int, pkField string) error { + return runConcurrent(ctx, count, defaultConcurrency, func(ctx context.Context, i int, rng *rand.Rand) error { + // pick a random existing (or future) document index to operate on + num := rng.Intn(count) + 1 + id := fmt.Sprintf("test-%d", num) + pkVal := fmt.Sprintf("pk-%d", num) + pk := azcosmos.NewPartitionKeyString(pkVal) + + op := rwOperation(rng.Intn(int(opOpCount))) + switch op { + case opUpsert: + item := createRandomItem(i) + 
item["id"] = id + item[pkField] = pkVal + body, err := json.Marshal(item) + if err != nil { + log.Printf("randomRW marshal error id=%s pk=%s: %v", id, pkVal, err) + return err + } + if _, err := container.UpsertItem(ctx, pk, body, nil); err != nil { + log.Printf("upsert error id=%s pk=%s: %v", id, pkVal, err) + return err + } + case opRead: + if _, err := container.ReadItem(ctx, pk, id, nil); err != nil { + log.Printf("read error id=%s pk=%s: %v", id, pkVal, err) + return err + } + case opQuery: + pager := container.NewQueryItemsPager( + "SELECT * FROM c WHERE c.id = @id", + azcosmos.NewPartitionKeyString(pkVal), + &azcosmos.QueryOptions{QueryParameters: []azcosmos.QueryParameter{{Name: "@id", Value: id}}}, + ) + for pager.More() { + if _, err := pager.NextPage(ctx); err != nil { + log.Printf("query error id=%s pk=%s: %v", id, pkVal, err) + return err + } + } + } + return nil + }) +} + func createClient(cfg workloadConfig) (*azcosmos.Client, error) { cred, err := azcosmos.NewKeyCredential(cfg.Key) if err != nil {