Skip to content

Commit dfd64d2

Browse files
authored
[service-utils] Expose more kafka configs, set max in-flight requests (#6181)
1 parent 4e703fd commit dfd64d2

File tree

3 files changed

+24
-14
lines changed

3 files changed

+24
-14
lines changed

.changeset/quiet-goats-know.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@thirdweb-dev/service-utils": patch
3+
---
4+
5+
[service-utils] Expose maxInFlightRequests in Kafka producer

packages/service-utils/src/node/kafka.ts

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,20 @@ import { compress, decompress } from "lz4js";
66
import KafkaJS from "kafkajs";
77
const { CompressionCodecs } = KafkaJS;
88

9+
/**
10+
* Reference: https://kafka.js.org/docs/producing#producing-messages
11+
*/
12+
export interface KafkaProducerSendOptions {
13+
// Per-message settings.
14+
acks?: number;
15+
timeout?: number;
16+
17+
// Per-producer settings.
18+
retries?: number;
19+
allowAutoTopicCreation?: boolean;
20+
maxInFlightRequests?: number;
21+
}
22+
923
/**
1024
* Creates a KafkaProducer which opens a persistent TCP connection.
1125
* This class is thread-safe so your service should re-use one instance.
@@ -98,18 +112,13 @@ export class KafkaProducer {
98112
async send(
99113
topic: string,
100114
messages: Record<string, unknown>[],
101-
/**
102-
* Reference: https://kafka.js.org/docs/producing#producing-messages
103-
*/
104-
options?: {
105-
acks?: number;
106-
timeout?: number;
107-
allowAutoTopicCreation?: boolean;
108-
},
115+
options?: KafkaProducerSendOptions,
109116
): Promise<void> {
110117
if (!this.producer) {
111118
this.producer = this.kafka.producer({
112119
allowAutoTopicCreation: options?.allowAutoTopicCreation ?? false,
120+
maxInFlightRequests: options?.maxInFlightRequests ?? 2000,
121+
retry: { retries: options?.retries ?? 5 },
113122
});
114123
await this.producer.connect();
115124
}

packages/service-utils/src/node/usageV2.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
type UsageV2Source,
55
getTopicName,
66
} from "../core/usageV2.js";
7-
import { KafkaProducer } from "./kafka.js";
7+
import { KafkaProducer, type KafkaProducerSendOptions } from "./kafka.js";
88

99
/**
1010
* Creates a UsageV2Producer which opens a persistent TCP connection.
@@ -63,11 +63,7 @@ export class UsageV2Producer {
6363
/**
6464
* Reference: https://kafka.js.org/docs/producing#producing-messages
6565
*/
66-
options?: {
67-
acks?: number;
68-
timeout?: number;
69-
allowAutoTopicCreation?: boolean;
70-
},
66+
options?: KafkaProducerSendOptions,
7167
): Promise<void> {
7268
const parsedEvents = events.map((event) => ({
7369
...event,

0 commit comments

Comments
 (0)