Skip to content

Commit 5756c74

Browse files
authored
[service-utils] feat: Add usageV2 support (#6021)
1 parent a1cff7a commit 5756c74

File tree

6 files changed

+251
-19
lines changed

6 files changed

+251
-19
lines changed

.changeset/neat-ads-build.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@thirdweb-dev/service-utils": minor
3+
---
4+
5+
feat: Add usageV2 support

packages/service-utils/package.json

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,8 @@
2626
},
2727
"typesVersions": {
2828
"*": {
29-
"node": [
30-
"./dist/types/node/index.d.ts"
31-
],
32-
"cf-worker": [
33-
"./dist/types/cf-worker/index.d.ts"
34-
]
29+
"node": ["./dist/types/node/index.d.ts"],
30+
"cf-worker": ["./dist/types/cf-worker/index.d.ts"]
3531
}
3632
},
3733
"repository": "https://github.com/thirdweb-dev/js/tree/main/packages/pay",
@@ -40,12 +36,11 @@
4036
"url": "https://github.com/thirdweb-dev/js/issues"
4137
},
4238
"author": "thirdweb eng <eng@thirdweb.com>",
43-
"files": [
44-
"dist/"
45-
],
39+
"files": ["dist/"],
4640
"sideEffects": false,
4741
"dependencies": {
4842
"aws4fetch": "1.0.20",
43+
"kafkajs": "2.2.4",
4944
"zod": "3.24.1"
5045
},
5146
"devDependencies": {
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import type { UsageV2Event } from "../core/usageV2.js";
2+
3+
/**
4+
* Send events to Kafka.
5+
* This method may throw. To call this non-blocking:
6+
*
7+
* ```ts
8+
* void sendUsageV2Events(events, {
9+
* environment: "production",
10+
* serviceKey: "..."
11+
* }).catch(console.error)
12+
* ```
13+
*
14+
* @param events - The events to send.
15+
* @param options.environment - The environment the service is running in.
16+
* @param options.serviceKey - The service key required for authentication.
17+
*/
18+
export async function sendUsageV2Events(
19+
events: UsageV2Event[],
20+
options: {
21+
environment: "development" | "production";
22+
serviceKey: string;
23+
},
24+
): Promise<void> {
25+
const baseUrl =
26+
options.environment === "production"
27+
? "https://u.thirdweb.com"
28+
: "https://u.thirdweb-dev.com";
29+
30+
const resp = await fetch(`${baseUrl}/usage-v2/raw-events`, {
31+
method: "POST",
32+
headers: {
33+
"Content-Type": "application/json",
34+
"x-service-api-key": options.serviceKey,
35+
},
36+
body: JSON.stringify({ events }),
37+
});
38+
39+
if (!resp.ok) {
40+
throw new Error(
41+
`[UsageV2] unexpected response ${resp.status}: ${await resp.text()}`,
42+
);
43+
}
44+
resp.body?.cancel();
45+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
export interface UsageV2Event {
2+
/**
3+
* A unique identifier for the event. Defaults to a random UUID.
4+
* Useful if your service retries sending events.
5+
*/
6+
id?: `${string}-${string}-${string}-${string}-${string}`;
7+
/**
8+
* The event timestamp. Defaults to now().
9+
*/
10+
created_at?: Date;
11+
/**
12+
* The source of the event. Example: "storage"
13+
*/
14+
source: string;
15+
/**
16+
* The action of the event. Example: "upload"
17+
*/
18+
action: string;
19+
/**
20+
* The team ID.
21+
*/
22+
team_id: string;
23+
/**
24+
* The client ID, if available.
25+
*/
26+
client_id?: string;
27+
/**
28+
* The SDK name, if available.
29+
*/
30+
sdk_name?: string;
31+
/**
32+
* The SDK platform, if available.
33+
*/
34+
sdk_platform?: string;
35+
/**
36+
* The SDK version, if available.
37+
*/
38+
sdk_version?: string;
39+
/**
40+
* The SDK OS, if available.
41+
*/
42+
sdk_os?: string;
43+
/**
44+
* The product name, if available.
45+
*/
46+
product_name?: string;
47+
/**
48+
* The product version, if available.
49+
*/
50+
product_version?: string;
51+
/**
52+
* An object of service-specific data. Example: "file_size_bytes"
53+
* It is safe to pass any new JSON-serializable data here before updating the usageV2 schema.
54+
*/
55+
data: Record<string, unknown>;
56+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import { randomUUID } from "node:crypto";
2+
import { checkServerIdentity } from "node:tls";
3+
import { Kafka, type Producer } from "kafkajs";
4+
import type { UsageV2Event } from "../core/usageV2.js";
5+
6+
const TOPIC_USAGE_V2 = "usage_v2.raw_events";
7+
8+
/**
9+
* Creates a UsageV2Producer which opens a persistent TCP connection.
10+
* This class is thread-safe so your service should re-use one instance.
11+
*
12+
* Example:
13+
* ```ts
14+
* usageV2 = new UsageV2Producer(..)
15+
* await usageV2.init()
16+
* await usageV2.sendEvents(events)
17+
* // Non-blocking:
18+
* // void usageV2.sendEvents(events).catch(console.error)
19+
* ```
20+
*/
21+
export class UsageV2Producer {
22+
private kafka: Kafka;
23+
private producer: Producer | null = null;
24+
25+
constructor(config: {
26+
/**
27+
* A descriptive name for your service. Example: "storage-server"
28+
*/
29+
producerName: string;
30+
/**
31+
* The environment the service is running in.
32+
*/
33+
environment: "development" | "production";
34+
35+
username: string;
36+
password: string;
37+
}) {
38+
this.kafka = new Kafka({
39+
clientId: `${config.producerName}-${config.environment}`,
40+
brokers:
41+
config.environment === "production"
42+
? ["warpstream.thirdweb.xyz:9092"]
43+
: ["warpstream-dev.thirdweb.xyz:9092"],
44+
ssl: {
45+
checkServerIdentity(hostname, cert) {
46+
return checkServerIdentity(hostname.toLowerCase(), cert);
47+
},
48+
},
49+
sasl: {
50+
mechanism: "plain",
51+
username: config.username,
52+
password: config.password,
53+
},
54+
});
55+
}
56+
57+
/**
58+
* Connect the producer.
59+
* This must be called before calling `sendEvents()`.
60+
*/
61+
async init() {
62+
this.producer = this.kafka.producer({
63+
allowAutoTopicCreation: false,
64+
});
65+
await this.producer.connect();
66+
}
67+
68+
/**
69+
* Send usageV2 events.
70+
* This method may throw. To call this non-blocking:
71+
*
72+
* ```ts
73+
* usageV2 = new UsageV2Producer(...)
74+
* void usageV2.sendEvents(events).catch(console.error)
75+
*
76+
* @param events - The events to send.
77+
*/
78+
async sendEvents(events: UsageV2Event[]): Promise<void> {
79+
if (!this.producer) {
80+
throw new Error("Producer not initialized. Call `init()` first.");
81+
}
82+
83+
const parsedEvents = events.map((event) => {
84+
return {
85+
id: event.id ?? randomUUID(),
86+
created_at: event.created_at ?? new Date(),
87+
source: event.source,
88+
action: event.action,
89+
// Remove the "team_" prefix, if any.
90+
team_id: event.team_id.startsWith("team_")
91+
? event.team_id.slice(5)
92+
: event.team_id,
93+
client_id: event.client_id,
94+
sdk_name: event.sdk_name,
95+
sdk_platform: event.sdk_platform,
96+
sdk_version: event.sdk_version,
97+
sdk_os: event.sdk_os,
98+
product_name: event.product_name,
99+
product_version: event.product_version,
100+
data: JSON.stringify(event.data),
101+
};
102+
});
103+
104+
await this.producer.send({
105+
topic: TOPIC_USAGE_V2,
106+
messages: parsedEvents.map((event) => ({
107+
value: JSON.stringify(event),
108+
})),
109+
});
110+
}
111+
112+
/**
113+
* Disconnects UsageV2Producer.
114+
* Useful when shutting down the service to flush in-flight events.
115+
*/
116+
async disconnect() {
117+
if (this.producer) {
118+
await this.producer.disconnect();
119+
this.producer = null;
120+
}
121+
}
122+
}

pnpm-lock.yaml

Lines changed: 19 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)