# @ydbjs/topic

The `@ydbjs/topic` package provides high-level, type-safe clients for working with YDB topics (message queues) in JavaScript/TypeScript. It supports efficient streaming reads and writes, partition session management, offset commits, compression, and custom payload encoding/decoding.

## Features

- Streaming topic reader and writer with async iteration
- Partition session management and offset commit
- Compression and custom payload encoding/decoding
- TypeScript support with type definitions
- Integration with `@ydbjs/core` and `@ydbjs/api`

## Installation

```sh
npm install @ydbjs/core@alpha @ydbjs/topic@alpha
```

## How It Works

- **TopicReader**: Reads messages from a YDB topic as async batches, manages partition sessions, and supports offset commits.
- **TopicWriter**: Writes messages to a YDB topic with support for batching, compression, and custom encoding.
- **Integration**: Use with a `Driver` from `@ydbjs/core` for connection management and authentication.

## Usage

### Reading from a Topic

```ts
import { Driver } from '@ydbjs/core'
import { TopicReader } from '@ydbjs/topic/reader'
import { Codec } from '@ydbjs/api/topic'

const driver = new Driver(process.env['YDB_CONNECTION_STRING']!)
await driver.ready()

await using reader = new TopicReader(driver, {
  topic: 'test-topic',
  consumer: 'test-consumer',
  maxBufferBytes: 64n * 1024n,
  compression: {
    // Decompress GZIP payloads with node:zlib; reject any other codec.
    decompress(codec, payload) {
      if (codec === Codec.GZIP) {
        return import('node:zlib').then((zlib) => zlib.gunzipSync(payload))
      } else {
        throw new Error(`Unsupported codec: ${codec}`)
      }
    },
  },
})

// Process incoming batches and commit each one after handling its messages.
for await (let batch of reader.read({ limit: 50, waitMs: 1000 })) {
  console.log('received batch', batch.length)
  await reader.commit(batch)
}
```

### Writing to a Topic

```ts
import { Driver } from '@ydbjs/core'
import { TopicWriter } from '@ydbjs/topic/writer'
import { Codec } from '@ydbjs/api/topic'
import * as zlib from 'node:zlib'

const driver = new Driver(process.env['YDB_CONNECTION_STRING']!)
await driver.ready()

await using writer = new TopicWriter(driver, {
  topic: 'test-topic',
  producer: 'test-producer',
  maxBufferBytes: 64n * 1024n,
  flushIntervalMs: 5000,
  compression: {
    // Compress payloads with GZIP before sending.
    codec: Codec.GZIP,
    compress(payload) {
      return zlib.gzipSync(payload)
    },
  },
})

// Messages are buffered and flushed in batches; write() does not need to be awaited.
writer.write(new Uint8Array([1, 2, 3, 4]))
```

## Configuration & Options

### TopicReaderOptions

Options for configuring a `TopicReader` instance:

| Option | Type | Description & Best Practice |
| --- | --- | --- |
| `topic` | `string \| TopicReaderSource \| TopicReaderSource[]` | Topic path or array of topic sources. Use a string for a single topic, or an array for multi-topic reading. |
| `consumer` | `string` | Consumer name. Use a unique name per logical consumer group. |
| `maxBufferBytes` | `bigint` | Max internal buffer size in bytes. Increase for high throughput, decrease to limit memory usage. |
| `updateTokenIntervalMs` | `number` | How often to update the auth token (ms). Default: 60000. Lower for short-lived tokens. |
| `compression.decompress` | `(codec, payload) => Uint8Array \| Promise<Uint8Array>` | Custom decompression function. Use for custom codecs or to enable GZIP, LZ4, etc. |
| `decode` | `(payload: Uint8Array) => Payload` | Custom payload decoder. Use for JSON, protobuf, or other formats. |
| `onPartitionSessionStart` | `function` | Called when a partition session starts. Use to set custom read/commit offsets. |
| `onPartitionSessionStop` | `function` | Called when a partition session stops. Use to commit offsets or clean up. |
| `onCommittedOffset` | `function` | Called after offsets are committed. Use for monitoring or logging. For high throughput, prefer this hook over awaiting `commit()` (see note below). |

> **Performance Note:**
>
> The `commit` method can be called without `await` to send commit requests to the server. If you use `await reader.commit(batch)`, your code waits for the server's acknowledgment before continuing, which can significantly reduce throughput. For best performance in high-load scenarios, avoid awaiting `commit` directly in your main loop; instead, use the `onCommittedOffset` hook to be notified when the server confirms the commit. This lets your application process messages at full speed while still tracking commit confirmations asynchronously.
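
#### Example: Committing Without `await`

The sketch below illustrates the pattern described in the note: commits are sent without awaiting them, and confirmations are tracked through `onCommittedOffset`. The exact arguments the hook receives are not listed above, so the callback here deliberately ignores them, and `handleBatch` is a hypothetical stand-in for your own processing logic. It reuses the `driver` from the reading example above.

```ts
let committedBatches = 0

await using reader = new TopicReader(driver, {
  topic: 'test-topic',
  consumer: 'test-consumer',
  onCommittedOffset() {
    // Called once the server confirms a commit; arguments omitted on purpose.
    committedBatches += 1
  },
})

for await (let batch of reader.read({ limit: 50, waitMs: 1000 })) {
  await handleBatch(batch) // hypothetical application-level processing
  reader.commit(batch) // no await: the commit request is sent without blocking the loop
}
```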

#### Example: Custom Decoder and Partition Hooks

```ts
await using reader = new TopicReader(driver, {
  topic: 'test-topic',
  consumer: 'my-consumer',
  decode(payload) {
    return JSON.parse(Buffer.from(payload).toString('utf8'))
  },
  onPartitionSessionStart(session, committedOffset, partitionOffsets) {
    console.log('Partition started', session.partitionId)
  },
  onPartitionSessionStop(session, committedOffset) {
    console.log('Partition stopped', session.partitionId)
  },
})
```

### TopicWriterOptions

Options for configuring a `TopicWriter` instance:

| Option | Type | Description & Best Practice |
| --- | --- | --- |
| `topic` | `string` | Topic path to write to. Required. |
| `producer` | `string` | Producer name. Set for idempotency and tracking. |
| `getLastSeqNo` | `boolean` | Get the last sequence number before writing. Use for exactly-once delivery or deduplication. |
| `allowDuplicates` | `boolean` | Allow duplicate messages. Set to `true` if duplicate delivery is acceptable. |
| `updateTokenIntervalMs` | `number` | How often to update the auth token (ms). Default: 60000. |
| `maxBufferBytes` | `bigint` | Max buffer size in bytes. Increase for batching, decrease for low latency. |
| `maxInflightCount` | `bigint` | Max in-flight messages. Tune for throughput vs. memory. |
| `flushIntervalMs` | `number` | Auto-flush interval (ms). Lower for low latency, higher for throughput. |
| `compression.codec` | `Codec` | Compression codec (e.g., GZIP). Use to reduce network usage. |
| `compression.compress` | `(payload: Uint8Array) => Uint8Array \| Promise<Uint8Array>` | Custom compression function. Use for custom codecs or advanced compression. |
| `compression.minRawSize` | `bigint` | Minimum payload size to compress. Avoids compressing small messages. |
| `encode` | `(payload: Payload) => Uint8Array` | Custom encoder. Use for JSON, protobuf, or other formats. |
| `onAck` | `(seqNo: bigint, status?: string) => void` | Called on message acknowledgment. Use for tracking or logging. For high throughput, prefer this hook over awaiting `write()` (see note below). |

> **Performance Note:**
>
> The `write` method adds messages to an internal buffer and returns a promise that resolves when the server acknowledges the write. If you use `await writer.write(...)`, your code waits for that acknowledgment before continuing, which can significantly reduce throughput. For best performance in high-load scenarios, avoid awaiting `write` directly in your main loop; instead, use the `onAck` hook to be notified when the server confirms the write. You can tune throughput and latency with the `maxBufferBytes`, `maxInflightCount`, and `flushIntervalMs` options, which control how quickly messages are sent to the server.
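
#### Example: High-Throughput Writes with `onAck`

The sketch below illustrates the pattern described in the note: messages are written without awaiting each acknowledgment, and confirmations are counted in `onAck`. It reuses the `driver` from the writing example above; the producer name, buffer sizes, and message contents are illustrative.

```ts
let acked = 0

await using writer = new TopicWriter(driver, {
  topic: 'test-topic',
  producer: 'bulk-producer',
  maxBufferBytes: 1024n * 1024n, // larger buffer so more messages are batched per request
  flushIntervalMs: 100, // flush often enough to keep latency bounded
  onAck() {
    acked += 1 // track confirmations without blocking the write loop
  },
})

for (let i = 0; i < 10_000; i++) {
  // No await: each message is appended to the internal buffer and flushed in batches.
  writer.write(Buffer.from(`message-${i}`))
}
```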

#### Example: Custom Encoder and Compression

```ts
await using writer = new TopicWriter(driver, {
  topic: 'test-topic',
  producer: 'json-producer',
  encode(payload) {
    return Buffer.from(JSON.stringify(payload), 'utf8')
  },
  compression: {
    codec: Codec.GZIP,
    compress(payload) {
      return zlib.gzipSync(payload)
    },
  },
  onAck(seqNo, status) {
    console.log('Ack for', seqNo, 'status:', status)
  },
})

writer.write({ foo: 'bar', ts: Date.now() })
```

## API

### TopicReader

- Reads messages from a topic as async batches
- Supports custom decompression and decoding
- Partition session hooks: `onPartitionSessionStart`, `onPartitionSessionStop`, `onCommittedOffset`
- Offset commit with `commit()`

### TopicWriter

- Writes messages to a topic with batching support
- Custom encoding and compression
- Ack callback: `onAck`

### Types

- `TopicMessage<Payload>`: Message structure for topic payloads (see the typed sketch below)
- `TopicReaderOptions`, `TopicWriterOptions`: Configuration options for reader and writer
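
#### Example: Typed Payloads

A rough sketch of how a payload type can flow through `decode` and `encode`, assuming the reader and writer pick up the payload type from these functions (the exact generic signatures are not shown in this README, and the topic, consumer, producer, and event names below are illustrative):

```ts
// Hypothetical event shape used as the topic payload.
interface OrderEvent {
  orderId: string
  amount: number
}

await using orderReader = new TopicReader(driver, {
  topic: 'orders',
  consumer: 'billing',
  // Decoded messages carry OrderEvent payloads.
  decode(payload): OrderEvent {
    return JSON.parse(Buffer.from(payload).toString('utf8'))
  },
})

await using orderWriter = new TopicWriter(driver, {
  topic: 'orders',
  producer: 'checkout',
  // The writer accepts OrderEvent values and serializes them to bytes.
  encode(payload: OrderEvent) {
    return Buffer.from(JSON.stringify(payload), 'utf8')
  },
})

orderWriter.write({ orderId: 'A-1', amount: 42 })
```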

## License

Apache-2.0