diff --git a/sdk/data/azcosmos/workloads/dev.md b/sdk/data/azcosmos/workloads/dev.md new file mode 100644 index 000000000000..353f90484755 --- /dev/null +++ b/sdk/data/azcosmos/workloads/dev.md @@ -0,0 +1,32 @@
## SDK Scale Testing
This directory contains the scale testing workloads for the SDK. The workloads are designed to test the performance
and scalability of the SDK under various conditions.

### Setup Scale Testing
1. Create a VM in Azure with the following configuration:
   - 8 vCPUs
   - 32 GB RAM
   - Ubuntu
   - Accelerated networking
1. Give the VM the necessary [permissions](https://learn.microsoft.com/azure/cosmos-db/nosql/how-to-grant-data-plane-access?tabs=built-in-definition%2Ccsharp&pivots=azure-interface-cli) to access the Cosmos DB account if using AAD (optional).
1. Fork and clone this repository.
1. Go to the azcosmos folder:
   - `cd azure-sdk-for-go/sdk/data/azcosmos`
1. Check out the branch with the changes to test.
1. Go to the workloads folder:
   - `cd workloads`
1. Fill out the relevant configuration in `workload_config.go` (key, host, etc.) using environment variables.
1. Run the scale workload; it first performs setup (creates the database and container and inserts the test data), then starts the read/write/query load:
   - `go run ./main/main.go`

### Monitor Run
- `ps -eaf | grep "go"` to see the running processes
- `tail -f <log-file>` to see the logs in real time

### Close Workloads
- If you want to keep the logs and stop the scripts,
  `./shutdown_workloads.sh --do-not-remove-logs`
- If you want to remove the logs and stop the scripts,
  `./shutdown_workloads.sh`
diff --git a/sdk/data/azcosmos/workloads/initial_setup.go b/sdk/data/azcosmos/workloads/initial_setup.go new file mode 100644 index 000000000000..291bf378fc04 --- /dev/null +++ b/sdk/data/azcosmos/workloads/initial_setup.go @@ -0,0 +1,102 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
+ +package workloads + +import ( + "context" + "errors" + "fmt" + "log" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos" +) + +// createDatabaseIfNotExists attempts to create the database, tolerating conflicts. +func createDatabaseIfNotExists(ctx context.Context, client *azcosmos.Client, dbID string) (*azcosmos.DatabaseClient, error) { + dbClient, err := client.NewDatabase(dbID) + if err != nil { + return nil, err + } + props := azcosmos.DatabaseProperties{ID: dbID} + _, err = client.CreateDatabase(ctx, props, nil) + if err != nil { + var azErr *azcore.ResponseError + if errors.As(err, &azErr) { + if azErr.StatusCode == 409 { + return dbClient, nil // already exists + } + } + return nil, err + } + return dbClient, nil +} + +func createContainerIfNotExists(ctx context.Context, db *azcosmos.DatabaseClient, containerID, pkField string, desiredThroughput int32) (*azcosmos.ContainerClient, error) { + containerClient, err := db.NewContainer(containerID) + if err != nil { + return nil, err + } + + // Build container properties + props := azcosmos.ContainerProperties{ + ID: containerID, + PartitionKeyDefinition: azcosmos.PartitionKeyDefinition{ + Paths: []string{"/" + pkField}, + Kind: azcosmos.PartitionKeyKindHash, + }, + } + + throughput := azcosmos.NewManualThroughputProperties(desiredThroughput) + // Try create + _, err = db.CreateContainer(ctx, props, &azcosmos.CreateContainerOptions{ + ThroughputProperties: &throughput, + }) + if err != nil { + var azErr *azcore.ResponseError + if errors.As(err, &azErr) { + if azErr.StatusCode == 409 { + return containerClient, nil // already exists + } + } + return nil, err + } + + return containerClient, nil +} + +// RunSetup creates the database/container and performs the concurrent upserts. 
+func RunSetup(ctx context.Context) error { + cfg, err := loadConfig() + if err != nil { + return err + } + + client, err := createClient(cfg) + if err != nil { + return fmt.Errorf("creating client: %w", err) + } + + println("Creating database...") + dbClient, err := createDatabaseIfNotExists(ctx, client, cfg.DatabaseID) + if err != nil { + return fmt.Errorf("ensure database: %w", err) + } + + container, err := createContainerIfNotExists(ctx, dbClient, cfg.ContainerID, cfg.PartitionKeyFieldName, int32(cfg.Throughput)) + if err != nil { + return fmt.Errorf("ensure container: %w", err) + } + + var count = cfg.LogicalPartitions + + log.Printf("Starting %d concurrent upserts...", count) + + if err := upsertItemsConcurrently(ctx, container, count, cfg.PartitionKeyFieldName); err != nil { + return fmt.Errorf("upserts failed: %w", err) + } + + log.Printf("Completed %d upserts.", count) + return nil +} diff --git a/sdk/data/azcosmos/workloads/main/main.go b/sdk/data/azcosmos/workloads/main/main.go new file mode 100644 index 000000000000..a5f3a49254e8 --- /dev/null +++ b/sdk/data/azcosmos/workloads/main/main.go @@ -0,0 +1,25 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+ +package main + +import ( + "context" + "log" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos/workloads" +) + +func main() { + // max run the workload for 12 hours + ctx, cancel := context.WithTimeout(context.Background(), 12*60*time.Minute) + defer cancel() + if err := workloads.RunSetup(ctx); err != nil { + log.Fatalf("setup failed: %v", err) + } + log.Println("setup completed") + if err := workloads.RunWorkload(ctx); err != nil { + log.Fatalf("workload failed: %v", err) + } +} diff --git a/sdk/data/azcosmos/workloads/r_w_q_workload.go b/sdk/data/azcosmos/workloads/r_w_q_workload.go new file mode 100644 index 000000000000..12a72279d157 --- /dev/null +++ b/sdk/data/azcosmos/workloads/r_w_q_workload.go @@ -0,0 +1,43 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package workloads + +import ( + "context" + "fmt" + "log" +) + +func RunWorkload(ctx context.Context) error { + cfg, err := loadConfig() + if err != nil { + return err + } + + client, err := createClient(cfg) + if err != nil { + return fmt.Errorf("creating client: %w", err) + } + + dbClient, err := client.NewDatabase(cfg.DatabaseID) + if err != nil { + return fmt.Errorf("ensure database: %w", err) + } + + container, err := dbClient.NewContainer(cfg.ContainerID) + if err != nil { + return fmt.Errorf("ensure container: %w", err) + } + + var count = cfg.LogicalPartitions + + log.Printf("Starting %d concurrent read/write/queries ...", count) + + for { + if err := randomReadWriteQueries(ctx, container, count, cfg.PartitionKeyFieldName); err != nil { + return fmt.Errorf("read/write queries failed: %w", err) + } + } + +} diff --git a/sdk/data/azcosmos/workloads/workload_config.go b/sdk/data/azcosmos/workloads/workload_config.go new file mode 100644 index 000000000000..b4ef0502504d --- /dev/null +++ b/sdk/data/azcosmos/workloads/workload_config.go @@ -0,0 +1,102 @@ +package workloads + +import ( + "fmt" + "os" + "strconv" +) + +// 
Configuration loaded from environment (mirrors the Python version) +type workloadConfig struct { + Endpoint string + Key string + PreferredLocations []string + DatabaseID string + ContainerID string + PartitionKeyFieldName string + LogicalPartitions int + Throughput int // optional (unused if not supported) +} + +const defaultLogicalPartitions = 10000 +const defaultThroughput = 100000 + +func loadConfig() (workloadConfig, error) { + get := func(name string) (string, error) { + v := os.Getenv(name) + if v == "" { + return "", fmt.Errorf("missing env var %s", name) + } + return v, nil + } + + var cfg workloadConfig + var err error + + if cfg.Endpoint, err = get("COSMOS_URI"); err != nil { + return cfg, err + } + if cfg.Key, err = get("COSMOS_KEY"); err != nil { + return cfg, err + } + if cfg.DatabaseID, err = get("COSMOS_DATABASE"); err != nil { + return cfg, err + } + if cfg.ContainerID, err = get("COSMOS_CONTAINER"); err != nil { + return cfg, err + } + if pk := os.Getenv("PARTITION_KEY"); pk != "" { + cfg.PartitionKeyFieldName = pk + } else { + cfg.PartitionKeyFieldName = "pk" + } + + if lp := os.Getenv("NUMBER_OF_LOGICAL_PARTITIONS"); lp != "" { + n, convErr := strconv.Atoi(lp) + if convErr != nil { + return cfg, fmt.Errorf("invalid NUMBER_OF_LOGICAL_PARTITIONS: %w", convErr) + } + cfg.LogicalPartitions = n + } else { + cfg.LogicalPartitions = defaultLogicalPartitions + } + + if tp := os.Getenv("THROUGHPUT"); tp != "" { + n, convErr := strconv.Atoi(tp) + if convErr != nil { + return cfg, fmt.Errorf("invalid THROUGHPUT: %w", convErr) + } + cfg.Throughput = n + } else { + cfg.Throughput = defaultThroughput + } + + // Comma-separated preferred locations (optional) + if pl := os.Getenv("PREFERRED_LOCATIONS"); pl != "" { + // Simple split on comma; whitespace trimming omitted for brevity + cfg.PreferredLocations = splitAndTrim(pl, ',') + } + return cfg, nil +} + +func splitAndTrim(s string, sep rune) []string { + if s == "" { + return nil + } + out := []string{} + cur 
:= "" + for _, r := range s { + if r == sep { + if cur != "" { + out = append(out, cur) + cur = "" + } + continue + } + cur += string(r) + } + if cur != "" { + out = append(out, cur) + } + return out +} diff --git a/sdk/data/azcosmos/workloads/workload_utils.go b/sdk/data/azcosmos/workloads/workload_utils.go new file mode 100644 index 000000000000..104f5d1bc144 --- /dev/null +++ b/sdk/data/azcosmos/workloads/workload_utils.go @@ -0,0 +1,172 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package workloads + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "math/rand" + "sync" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos" +) + +const defaultConcurrency = 32 + +// rwOperation enumerates random read/write/query operations. +type rwOperation int + +const ( + opUpsert rwOperation = iota + opRead + opQuery + opOpCount // sentinel for number of operations +) + +func createRandomItem(i int) map[string]interface{} { + return map[string]interface{}{ + "type": "testItem", + "createdAt": time.Now().UTC().Format(time.RFC3339Nano), + "seq": i, + "value": rand.Int63(), // pseudo-random payload + } +} + +// runConcurrent executes count indexed jobs across at most workers goroutines. +// jf should be idempotent per index; it receives a per-worker RNG (not safe to share across workers). 
+func runConcurrent(ctx context.Context, count, workers int, jf func(ctx context.Context, index int, rng *rand.Rand) error) error { + if count <= 0 { + return errors.New("count must be > 0") + } + if workers <= 0 { + workers = 1 + } + if count < workers { + workers = count + } + + type job struct{ i int } + jobs := make(chan job, workers) + errs := make(chan error, count) + wg := &sync.WaitGroup{} + + for w := 0; w < workers; w++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + // Seed rng per worker (unique-ish seed) + rng := rand.New(rand.NewSource(time.Now().UnixNano() + int64(workerID)<<32)) + for j := range jobs { + if ctx.Err() != nil { + return + } + if err := jf(ctx, j.i, rng); err != nil { + select { + case errs <- err: + default: // channel full; drop to avoid blocking + } + } + } + }(w) + } + +sendLoop: + for i := 0; i < count; i++ { + select { + case <-ctx.Done(): + break sendLoop + default: + jobs <- job{i: i} + } + } + close(jobs) + wg.Wait() + close(errs) + + var firstErr error + for e := range errs { + if firstErr == nil { + firstErr = e + } + } + return firstErr +} + +func upsertItemsConcurrently(ctx context.Context, container *azcosmos.ContainerClient, count int, pkField string) error { + return runConcurrent(ctx, count, defaultConcurrency, func(ctx context.Context, i int, rng *rand.Rand) error { + item := createRandomItem(i) + id := fmt.Sprintf("test-%d", i) + pkVal := fmt.Sprintf("pk-%d", i) + item["id"] = id + item[pkField] = pkVal + body, err := json.Marshal(item) + if err != nil { + return err + } + pk := azcosmos.NewPartitionKeyString(pkVal) + _, err = container.UpsertItem(ctx, pk, body, nil) + return err + }) +} + +func randomReadWriteQueries(ctx context.Context, container *azcosmos.ContainerClient, count int, pkField string) error { + return runConcurrent(ctx, count, defaultConcurrency, func(ctx context.Context, i int, rng *rand.Rand) error { + // pick a random existing (or future) document index to operate on + num := 
rng.Intn(count) + 1 + id := fmt.Sprintf("test-%d", num) + pkVal := fmt.Sprintf("pk-%d", num) + pk := azcosmos.NewPartitionKeyString(pkVal) + + op := rwOperation(rng.Intn(int(opOpCount))) + switch op { + case opUpsert: + item := createRandomItem(i) + item["id"] = id + item[pkField] = pkVal + body, err := json.Marshal(item) + if err != nil { + log.Printf("randomRW marshal error id=%s pk=%s: %v", id, pkVal, err) + return err + } + if _, err := container.UpsertItem(ctx, pk, body, nil); err != nil { + log.Printf("upsert error id=%s pk=%s: %v", id, pkVal, err) + return err + } + case opRead: + if _, err := container.ReadItem(ctx, pk, id, nil); err != nil { + log.Printf("read error id=%s pk=%s: %v", id, pkVal, err) + return err + } + case opQuery: + pager := container.NewQueryItemsPager( + "SELECT * FROM c WHERE c.id = @id", + azcosmos.NewPartitionKeyString(pkVal), + &azcosmos.QueryOptions{QueryParameters: []azcosmos.QueryParameter{{Name: "@id", Value: id}}}, + ) + for pager.More() { + if _, err := pager.NextPage(ctx); err != nil { + log.Printf("query error id=%s pk=%s: %v", id, pkVal, err) + return err + } + } + } + return nil + }) +} + +func createClient(cfg workloadConfig) (*azcosmos.Client, error) { + cred, err := azcosmos.NewKeyCredential(cfg.Key) + if err != nil { + return nil, err + } + opts := &azcosmos.ClientOptions{ + PreferredRegions: cfg.PreferredLocations, + // Add EnableContentResponseOnWrite: true if you want full responses + } + return azcosmos.NewClientWithKey(cfg.Endpoint, cred, opts) +}