-
Notifications
You must be signed in to change notification settings - Fork 477
Open
Description
Hello,
Is there a way to evenly distribute indexing pipelines across all Indexers?
We are currently using 16 Indexers (K8s pods), each with 8 CPU cores and 32 GB of memory. The index configuration sets num_pipelines to 32, and the Kafka topic also has 32 partitions. However, it appears that only a few Indexers are assigned pipelines, while the rest remain underutilized. It seems that the only way to increase throughput is to add more CPU.
Is there a method to ensure that the pipelines are evenly distributed across all Indexers?
We are currently using the qw-airmail-20250522-hotfix version.
Below is the current index configuration:
Source
[
{
"version": "0.9",
"source_id": "kafka_to_access_log_v2_quickwit",
"num_pipelines": 32,
"enabled": true,
"source_type": "kafka",
"params": {
"topic": "log.common.access_log_v2_quickwit",
"client_params": {
"bootstrap.servers": "b-1.loggingkafka.euq7fc.c3.kafka.ap-northeast-2.amazonaws.com,b-2.loggingkafka.euq7fc.c3.kafka.ap-northeast-2.amazonaws.com,b-3.loggingkafka.euq7fc.c3.kafka.ap-northeast-2.amazonaws.com",
"group.id": "log.quickwit.access_log_v2"
}
},
"transform": {
"script": "# `occurredAt` is the timestamp of the log, but `time` is the timestamp of the log entry.\n# So we use `occurred_at` as `time` if it exists.\nif exists(.data.accessLogV2.occurredAt) {\n .time = .data.accessLogV2.occurredAt\n}\n# unwrapped data fields\nunwrappedDataFields = merge(., {\n \"requestId\": .data.accessLogV2.requestId,\n \"ip\": .data.accessLogV2.ip,\n \"sessionId\": .data.accessLogV2.sessionId,\n \"userAgent\": .data.accessLogV2.userAgent,\n \"xUserAgent\": .data.accessLogV2.xUserAgent,\n \"userId\": .data.accessLogV2.userId,\n \"deviceId\": .data.accessLogV2.deviceId,\n \"latency\": .data.accessLogV2.latency,\n \"latencyNs\": .data.accessLogV2.latencyNs,\n \"occurredAt\": .data.accessLogV2.occurredAt,\n \"httpStatusCode\": .data.accessLogV2.httpStatusCode,\n \"httpMethod\": .data.accessLogV2.httpMethod,\n \"httpPath\": .data.accessLogV2.httpPath,\n \"grpcStatusCode\": .data.accessLogV2.grpcStatusCode,\n \"grpcMethod\": .data.accessLogV2.grpcMethod,\n \"extra\": .data.accessLogV2.extra,\n \"kafkaConsumerGroupId\": .data.accessLogV2.kafkaConsumerGroupId,\n \"kafkaConsumerClientId\": .data.accessLogV2.kafkaConsumerClientId,\n \"kafkaConsumerHostName\": .data.accessLogV2.kafkaConsumerHostName,\n \"kafkaTopic\": .data.accessLogV2.kafkaTopic,\n \"kafkaPartition\": .data.accessLogV2.kafkaPartition,\n \"kafkaOffset\": .data.accessLogV2.kafkaOffset,\n \"kafkaMessageKey\": .data.accessLogV2.kafkaMessageKey,\n \"kafkaConsumingResult\":.data.accessLogV2.kafkaConsumingResult\n})\n\n# remove unused fields\ndel(unwrappedDataFields.data)\ndel(unwrappedDataFields.specversion)\ndel(unwrappedDataFields.type)\n\n. = unwrappedDataFields\n",
"timezone": "UTC"
},
"input_format": "json"
},
{
"version": "0.9",
"source_id": "_ingest-api-source",
"num_pipelines": 1,
"enabled": true,
"source_type": "ingest-api",
"input_format": "json"
},
{
"version": "0.9",
"source_id": "_ingest-source",
"num_pipelines": 1,
"enabled": true,
"source_type": "ingest",
"input_format": "json"
},
{
"version": "0.9",
"source_id": "_ingest-cli-source",
"num_pipelines": 1,
"enabled": true,
"source_type": "ingest-cli",
"input_format": "json"
}
]
Doc mapping
{
"doc_mapping_uid": "00000000000000000000000000",
"mode": "dynamic",
"dynamic_mapping": {
"indexed": true,
"tokenizer": "raw",
"record": "basic",
"stored": true,
"expand_dots": true,
"fast": {
"normalizer": "raw"
}
},
"field_mappings": [
{
"name": "id",
"type": "text",
"description": "unique identifier for the event",
"fast": false,
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "raw"
},
{
"name": "specversion",
"type": "text",
"description": "version information about the CloudEvents specification",
"fast": false,
"fieldnorms": false,
"indexed": false,
"stored": false
},
{
"name": "source",
"type": "text",
"description": "information about where the event occurred",
"fast": false,
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "raw"
},
{
"name": "subject",
"type": "text",
"description": "detailed information about the source where the event occurred",
"fast": false,
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "raw"
},
{
"name": "time",
"type": "datetime",
"description": "timestamp of the event",
"fast": true,
"fast_precision": "seconds",
"indexed": true,
"input_formats": [
"unix_timestamp",
"iso8601"
],
"output_format": "unix_timestamp_nanos",
"stored": true
},
{
"name": "datacontenttype",
"type": "text",
"description": "content type of the data",
"fast": false,
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "raw"
},
{
"name": "requestId",
"type": "text",
"description": "request id",
"fast": false,
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "raw"
},
{
"name": "ip",
"type": "ip",
"description": "ip address",
"fast": true,
"indexed": true,
"stored": true
},
{
"name": "userAgent",
"type": "text",
"fast": false,
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "default"
},
{
"name": "xUserAgent",
"type": "text",
"fast": false,
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "default"
},
{
"name": "userId",
"type": "text",
"fast": false,
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "raw"
},
{
"name": "deviceId",
"type": "text",
"description": "device's id eg) 6ae1f6a6-107d-3183-a1df-adf7368f9d10",
"fast": false,
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "raw"
},
{
"name": "latency",
"type": "text",
"description": "latency of the request, but already in latencyNs, so we don't need to index it",
"fast": false,
"fieldnorms": false,
"indexed": false,
"stored": false
},
{
"name": "latencyNs",
"type": "i64",
"coerce": true,
"description": "latency of the request in nanoseconds",
"fast": true,
"indexed": true,
"output_format": "number",
"stored": true
},
{
"name": "occurredAt",
"type": "datetime",
"description": "timestamp of the log entry",
"fast": false,
"fast_precision": "seconds",
"indexed": true,
"input_formats": [
"unix_timestamp",
"iso8601"
],
"output_format": "unix_timestamp_nanos",
"stored": true
},
{
"name": "httpStatusCode",
"type": "i64",
"coerce": true,
"description": "HTTP status code",
"fast": true,
"indexed": true,
"output_format": "number",
"stored": true
},
{
"name": "httpMethod",
"type": "text",
"description": "HTTP method",
"fast": {
"normalizer": "raw"
},
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "default"
},
{
"name": "httpPath",
"type": "text",
"description": "HTTP path",
"fast": {
"normalizer": "raw"
},
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "default"
},
{
"name": "grpcStatusCode",
"type": "i64",
"coerce": true,
"description": "gRPC status code",
"fast": true,
"indexed": true,
"output_format": "number",
"stored": true
},
{
"name": "grpcMethod",
"type": "text",
"description": "gRPC method",
"fast": {
"normalizer": "raw"
},
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "default"
},
{
"name": "extra",
"type": "json",
"description": "extra fields",
"expand_dots": true,
"fast": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "raw"
},
{
"name": "kafkaConsumerGroupId",
"type": "text",
"description": "Kafka Consumer Group Id",
"fast": false,
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "default"
},
{
"name": "kafkaConsumerClientId",
"type": "text",
"description": "Kafka Consumer Client Id",
"fast": false,
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "default"
},
{
"name": "kafkaConsumerHostName",
"type": "text",
"description": "Kafka Consumer Host Name",
"fast": false,
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "default"
},
{
"name": "kafkaTopic",
"type": "text",
"description": "Kafka Topic",
"fast": false,
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "default"
},
{
"name": "kafkaPartition",
"type": "i64",
"coerce": true,
"description": "Kafka Partition",
"fast": false,
"indexed": true,
"output_format": "number",
"stored": true
},
{
"name": "kafkaOffset",
"type": "i64",
"coerce": true,
"description": "Kafka Offset",
"fast": false,
"indexed": true,
"output_format": "number",
"stored": true
},
{
"name": "kafkaMessageKey",
"type": "text",
"description": "Kafka Message Key",
"fast": false,
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "raw"
},
{
"name": "kafkaConsumingResult",
"type": "text",
"description": "Kafka Consuming Result",
"fast": {
"normalizer": "raw"
},
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "default"
},
{
"name": "env",
"type": "text",
"description": "represents environmental information as an extension field in the cloudEvents specification",
"fast": false,
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "raw"
},
{
"name": "region",
"type": "text",
"description": "represents region information as an extension field in the cloudEvents specification",
"fast": {
"normalizer": "raw"
},
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "raw"
},
{
"name": "namespace",
"type": "text",
"description": "represents namespace information as an extension field in the cloudEvents specification",
"fast": {
"normalizer": "raw"
},
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "raw"
}
],
"timestamp_field": "time",
"tag_fields": [
"region"
],
"partition_key": "namespace",
"max_num_partitions": 200,
"index_field_presence": false,
"store_document_size": false,
"store_source": false,
"tokenizers": []
}
Index settings
{
"commit_timeout_secs": 60,
"docstore_compression_level": 8,
"docstore_blocksize": 1000000,
"split_num_docs_target": 10000000,
"merge_policy": {
"type": "stable_log",
"min_level_num_docs": 100000,
"merge_factor": 10,
"max_merge_factor": 12,
"maturation_period": "2days"
},
"resources": {
"heap_size": "2.0 GB"
}
}
Metadata
Metadata
Assignees
Labels
No labels