Skip to content

Uneven pipeline distribution across indexers #5833

@earlbread

Description

@earlbread

Hello,

Is there a way to evenly distribute indexing pipelines across all Indexers?

We are currently running 16 Indexers (Kubernetes pods), each with 8 CPU cores and 32 GB of memory. The source configuration sets `num_pipelines` to 32, and the Kafka topic also has 32 partitions, so an even distribution would place 2 pipelines on each Indexer. However, only a few Indexers appear to be assigned pipelines, while the rest remain underutilized. As a result, the only way we have found to increase throughput is to add more CPU.

Is there a method to ensure that the pipelines are evenly distributed across all Indexers?

We are currently running Quickwit version `qw-airmail-20250522-hotfix`.

Current resource usage:
Image

Indexing Throughput:
Image

Below is the current index configuration:

Source

[
  {
    "version": "0.9",
    "source_id": "kafka_to_access_log_v2_quickwit",
    "num_pipelines": 32,
    "enabled": true,
    "source_type": "kafka",
    "params": {
      "topic": "log.common.access_log_v2_quickwit",
      "client_params": {
        "bootstrap.servers": "b-1.loggingkafka.euq7fc.c3.kafka.ap-northeast-2.amazonaws.com,b-2.loggingkafka.euq7fc.c3.kafka.ap-northeast-2.amazonaws.com,b-3.loggingkafka.euq7fc.c3.kafka.ap-northeast-2.amazonaws.com",
        "group.id": "log.quickwit.access_log_v2"
      }
    },
    "transform": {
      "script": "# `occurredAt` is the timestamp of the log, but `time` is the timestamp of the log entry.\n# So we use `occurred_at` as `time` if it exists.\nif exists(.data.accessLogV2.occurredAt) {\n  .time = .data.accessLogV2.occurredAt\n}\n# unwrapped data fields\nunwrappedDataFields = merge(., {\n  \"requestId\": .data.accessLogV2.requestId,\n  \"ip\": .data.accessLogV2.ip,\n  \"sessionId\": .data.accessLogV2.sessionId,\n  \"userAgent\": .data.accessLogV2.userAgent,\n  \"xUserAgent\": .data.accessLogV2.xUserAgent,\n  \"userId\": .data.accessLogV2.userId,\n  \"deviceId\": .data.accessLogV2.deviceId,\n  \"latency\": .data.accessLogV2.latency,\n  \"latencyNs\": .data.accessLogV2.latencyNs,\n  \"occurredAt\": .data.accessLogV2.occurredAt,\n  \"httpStatusCode\": .data.accessLogV2.httpStatusCode,\n  \"httpMethod\": .data.accessLogV2.httpMethod,\n  \"httpPath\": .data.accessLogV2.httpPath,\n  \"grpcStatusCode\": .data.accessLogV2.grpcStatusCode,\n  \"grpcMethod\": .data.accessLogV2.grpcMethod,\n  \"extra\": .data.accessLogV2.extra,\n  \"kafkaConsumerGroupId\": .data.accessLogV2.kafkaConsumerGroupId,\n  \"kafkaConsumerClientId\": .data.accessLogV2.kafkaConsumerClientId,\n  \"kafkaConsumerHostName\": .data.accessLogV2.kafkaConsumerHostName,\n  \"kafkaTopic\": .data.accessLogV2.kafkaTopic,\n  \"kafkaPartition\": .data.accessLogV2.kafkaPartition,\n  \"kafkaOffset\": .data.accessLogV2.kafkaOffset,\n  \"kafkaMessageKey\": .data.accessLogV2.kafkaMessageKey,\n  \"kafkaConsumingResult\":.data.accessLogV2.kafkaConsumingResult\n})\n\n# remove unused fields\ndel(unwrappedDataFields.data)\ndel(unwrappedDataFields.specversion)\ndel(unwrappedDataFields.type)\n\n. = unwrappedDataFields\n",
      "timezone": "UTC"
    },
    "input_format": "json"
  },
  {
    "version": "0.9",
    "source_id": "_ingest-api-source",
    "num_pipelines": 1,
    "enabled": true,
    "source_type": "ingest-api",
    "input_format": "json"
  },
  {
    "version": "0.9",
    "source_id": "_ingest-source",
    "num_pipelines": 1,
    "enabled": true,
    "source_type": "ingest",
    "input_format": "json"
  },
  {
    "version": "0.9",
    "source_id": "_ingest-cli-source",
    "num_pipelines": 1,
    "enabled": true,
    "source_type": "ingest-cli",
    "input_format": "json"
  }
]

Doc mapping

{
  "doc_mapping_uid": "00000000000000000000000000",
  "mode": "dynamic",
  "dynamic_mapping": {
    "indexed": true,
    "tokenizer": "raw",
    "record": "basic",
    "stored": true,
    "expand_dots": true,
    "fast": {
      "normalizer": "raw"
    }
  },
  "field_mappings": [
    {
      "name": "id",
      "type": "text",
      "description": "unique identifier for the event",
      "fast": false,
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "raw"
    },
    {
      "name": "specversion",
      "type": "text",
      "description": "version information about the CloudEvents specification",
      "fast": false,
      "fieldnorms": false,
      "indexed": false,
      "stored": false
    },
    {
      "name": "source",
      "type": "text",
      "description": "information about where the event occurred",
      "fast": false,
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "raw"
    },
    {
      "name": "subject",
      "type": "text",
      "description": "detailed information about the source where the event occurred",
      "fast": false,
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "raw"
    },
    {
      "name": "time",
      "type": "datetime",
      "description": "timestamp of the event",
      "fast": true,
      "fast_precision": "seconds",
      "indexed": true,
      "input_formats": [
        "unix_timestamp",
        "iso8601"
      ],
      "output_format": "unix_timestamp_nanos",
      "stored": true
    },
    {
      "name": "datacontenttype",
      "type": "text",
      "description": "content type of the data",
      "fast": false,
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "raw"
    },
    {
      "name": "requestId",
      "type": "text",
      "description": "request id",
      "fast": false,
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "raw"
    },
    {
      "name": "ip",
      "type": "ip",
      "description": "ip address",
      "fast": true,
      "indexed": true,
      "stored": true
    },
    {
      "name": "userAgent",
      "type": "text",
      "fast": false,
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "default"
    },
    {
      "name": "xUserAgent",
      "type": "text",
      "fast": false,
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "default"
    },
    {
      "name": "userId",
      "type": "text",
      "fast": false,
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "raw"
    },
    {
      "name": "deviceId",
      "type": "text",
      "description": "device's id eg) 6ae1f6a6-107d-3183-a1df-adf7368f9d10",
      "fast": false,
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "raw"
    },
    {
      "name": "latency",
      "type": "text",
      "description": "latency of the request, but already in latencyNs, so we don't need to index it",
      "fast": false,
      "fieldnorms": false,
      "indexed": false,
      "stored": false
    },
    {
      "name": "latencyNs",
      "type": "i64",
      "coerce": true,
      "description": "latency of the request in nanoseconds",
      "fast": true,
      "indexed": true,
      "output_format": "number",
      "stored": true
    },
    {
      "name": "occurredAt",
      "type": "datetime",
      "description": "timestamp of the log entry",
      "fast": false,
      "fast_precision": "seconds",
      "indexed": true,
      "input_formats": [
        "unix_timestamp",
        "iso8601"
      ],
      "output_format": "unix_timestamp_nanos",
      "stored": true
    },
    {
      "name": "httpStatusCode",
      "type": "i64",
      "coerce": true,
      "description": "HTTP status code",
      "fast": true,
      "indexed": true,
      "output_format": "number",
      "stored": true
    },
    {
      "name": "httpMethod",
      "type": "text",
      "description": "HTTP method",
      "fast": {
        "normalizer": "raw"
      },
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "default"
    },
    {
      "name": "httpPath",
      "type": "text",
      "description": "HTTP path",
      "fast": {
        "normalizer": "raw"
      },
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "default"
    },
    {
      "name": "grpcStatusCode",
      "type": "i64",
      "coerce": true,
      "description": "gRPC status code",
      "fast": true,
      "indexed": true,
      "output_format": "number",
      "stored": true
    },
    {
      "name": "grpcMethod",
      "type": "text",
      "description": "gRPC method",
      "fast": {
        "normalizer": "raw"
      },
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "default"
    },
    {
      "name": "extra",
      "type": "json",
      "description": "extra fields",
      "expand_dots": true,
      "fast": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "raw"
    },
    {
      "name": "kafkaConsumerGroupId",
      "type": "text",
      "description": "Kafka Consumer Group Id",
      "fast": false,
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "default"
    },
    {
      "name": "kafkaConsumerClientId",
      "type": "text",
      "description": "Kafka Consumer Client Id",
      "fast": false,
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "default"
    },
    {
      "name": "kafkaConsumerHostName",
      "type": "text",
      "description": "Kafka Consumer Host Name",
      "fast": false,
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "default"
    },
    {
      "name": "kafkaTopic",
      "type": "text",
      "description": "Kafka Topic",
      "fast": false,
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "default"
    },
    {
      "name": "kafkaPartition",
      "type": "i64",
      "coerce": true,
      "description": "Kafka Partition",
      "fast": false,
      "indexed": true,
      "output_format": "number",
      "stored": true
    },
    {
      "name": "kafkaOffset",
      "type": "i64",
      "coerce": true,
      "description": "Kafka Offset",
      "fast": false,
      "indexed": true,
      "output_format": "number",
      "stored": true
    },
    {
      "name": "kafkaMessageKey",
      "type": "text",
      "description": "Kafka Message Key",
      "fast": false,
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "raw"
    },
    {
      "name": "kafkaConsumingResult",
      "type": "text",
      "description": "Kafka Consuming Result",
      "fast": {
        "normalizer": "raw"
      },
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "default"
    },
    {
      "name": "env",
      "type": "text",
      "description": "represents environmental information as an extension field in the cloudEvents specification",
      "fast": false,
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "raw"
    },
    {
      "name": "region",
      "type": "text",
      "description": "represents region information as an extension field in the cloudEvents specification",
      "fast": {
        "normalizer": "raw"
      },
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "raw"
    },
    {
      "name": "namespace",
      "type": "text",
      "description": "represents namespace information as an extension field in the cloudEvents specification",
      "fast": {
        "normalizer": "raw"
      },
      "fieldnorms": false,
      "indexed": true,
      "record": "basic",
      "stored": true,
      "tokenizer": "raw"
    }
  ],
  "timestamp_field": "time",
  "tag_fields": [
    "region"
  ],
  "partition_key": "namespace",
  "max_num_partitions": 200,
  "index_field_presence": false,
  "store_document_size": false,
  "store_source": false,
  "tokenizers": []
}

Index settings

{
  "commit_timeout_secs": 60,
  "docstore_compression_level": 8,
  "docstore_blocksize": 1000000,
  "split_num_docs_target": 10000000,
  "merge_policy": {
    "type": "stable_log",
    "min_level_num_docs": 100000,
    "merge_factor": 10,
    "max_merge_factor": 12,
    "maturation_period": "2days"
  },
  "resources": {
    "heap_size": "2.0 GB"
  }
}

Metadata

Assignees

No one assigned

    Labels

    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions