Skip to content

Commit 9a9cbf7

Browse files
authored
Data streams (#89)
* Update FFI to v0.12.20 * Add support for data streams using the high-level FFI interface defined in Data streams rust-sdks#603 * Add code examples to README mirroring the examples from the LiveKit docs for other SDKs
1 parent f5d8490 commit 9a9cbf7

31 files changed

+35836
-2766
lines changed

BuildScripts~/generate_proto.sh

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ OUT_CSHARP=../Runtime/Scripts/Proto
66
protoc \
77
-I=$FFI_PROTOCOL \
88
--csharp_out=$OUT_CSHARP \
9-
$FFI_PROTOCOL/audio_frame.proto \
10-
$FFI_PROTOCOL/e2ee.proto \
119
$FFI_PROTOCOL/ffi.proto \
1210
$FFI_PROTOCOL/handle.proto \
13-
$FFI_PROTOCOL/participant.proto \
1411
$FFI_PROTOCOL/room.proto \
15-
$FFI_PROTOCOL/stats.proto \
1612
$FFI_PROTOCOL/track.proto \
13+
$FFI_PROTOCOL/track_publication.proto \
14+
$FFI_PROTOCOL/participant.proto \
1715
$FFI_PROTOCOL/video_frame.proto \
18-
$FFI_PROTOCOL/rpc.proto
16+
$FFI_PROTOCOL/audio_frame.proto \
17+
$FFI_PROTOCOL/e2ee.proto \
18+
$FFI_PROTOCOL/stats.proto \
19+
$FFI_PROTOCOL/rpc.proto \
20+
$FFI_PROTOCOL/data_stream.proto

README.md

Lines changed: 166 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ void TrackSubscribed(IRemoteTrack track, RemoteTrackPublication publication, Rem
198198

199199
### RPC
200200

201-
Perform your own predefined method calls from one participant to another.
201+
Perform your own predefined method calls from one participant to another.
202202

203203
This feature is especially powerful when used with [Agents](https://docs.livekit.io/agents), for instance to forward LLM function calls to your client application.
204204

@@ -235,7 +235,7 @@ IEnumerator PerformRpcCoroutine()
235235
Method = "greet",
236236
Payload = "Hello from RPC!"
237237
});
238-
238+
239239
yield return rpcCall;
240240

241241
if (rpcCall.IsError)
@@ -256,11 +256,174 @@ You may find it useful to adjust the `ResponseTimeout` parameter, which indicate
256256

257257
#### Errors
258258

259-
LiveKit is a dynamic realtime environment and RPC calls can fail for various reasons.
259+
LiveKit is a dynamic realtime environment and RPC calls can fail for various reasons.
260260

261261
You may throw errors of the type `RpcError` with a string `message` in an RPC method handler and they will be received on the caller's side with the message intact. Other errors will not be transmitted and will instead arrive to the caller as `1500` ("Application Error"). Other built-in errors are detailed in the [docs](https://docs.livekit.io/home/client/data/rpc/#errors).
262262

263+
### Sending text
264+
265+
Use text streams to send any amount of text between participants.
266+
267+
#### Sending text all at once
268+
269+
```cs
270+
IEnumerator PerformSendText()
271+
{
272+
var text = "Lorem ipsum dolor sit amet...";
273+
var sendTextCall = room.LocalParticipant.SendText(text, "some-topic");
274+
yield return sendTextCall;
275+
276+
Debug.Log($"Sent text with stream ID {sendTextCall.Info.Id}");
277+
}
278+
```
279+
280+
#### Streaming text incrementally
281+
282+
```cs
283+
IEnumerator PerformStreamText()
284+
{
285+
var streamTextCall = room.LocalParticipant.StreamText("my-topic");
286+
yield return streamTextCall;
287+
288+
var writer = streamTextCall.Writer;
289+
Debug.Log($"Opened text stream with ID: {writer.Info.Id}");
290+
291+
// In a real app, you would generate this text asynchronously / incrementally as well
292+
var textChunks = new[] { "Lorem ", "ipsum ", "dolor ", "sit ", "amet..." };
293+
foreach (var chunk in textChunks)
294+
{
295+
yield return writer.Write(chunk);
296+
}
297+
298+
// The stream must be explicitly closed when done
299+
yield return writer.Close();
300+
301+
Debug.Log($"Closed text stream with ID: {writer.Info.Id}");
302+
}
303+
```
304+
305+
#### Handling incoming streams
306+
307+
```cs
308+
IEnumerator HandleTextStream(TextStreamReader reader, string participantIdentity)
309+
{
310+
var info = reader.Info;
311+
Debug.Log($@"
312+
Text stream received from {participantIdentity}
313+
Topic: {info.Topic}
314+
Timestamp: {info.Timestamp}
315+
ID: {info.Id}
316+
Size: {info.TotalLength} (only available if the stream was sent with `SendText`)
317+
");
318+
319+
// Option 1: Process the stream incrementally
320+
var readIncremental = reader.ReadIncremental();
321+
while (!readIncremental.IsEos)
322+
{
323+
readIncremental.Reset();
324+
yield return readIncremental;
325+
Debug.Log($"Next chunk: {readIncremental.Text}");
326+
}
327+
328+
// Option 2: Get the entire text after the stream completes
329+
var readAllCall = reader.ReadAll();
330+
yield return readAllCall;
331+
Debug.Log($"Received text: {readAllCall.Text}")
332+
}
333+
334+
// Register the topic before connecting to the room
335+
room.RegisterTextStreamHandler("my-topic", (reader, identity) =>
336+
StartCoroutine(HandleTextStream(reader, identity))
337+
);
338+
```
339+
340+
### Sending files & bytes
341+
342+
Use byte streams to send files, images, or any other kind of data between participants.
343+
344+
#### Sending files
345+
346+
```cs
347+
IEnumerator PerformSendFile()
348+
{
349+
var filePath = "path/to/file.jpg";
350+
var sendFileCall = room.LocalParticipant.SendFile(filePath, "some-topic");
351+
yield return sendFileCall;
352+
353+
Debug.Log($"Sent file with stream ID: {sendFileCall.Info.Id}");
354+
}
355+
```
356+
357+
#### Streaming bytes
358+
359+
```cs
360+
IEnumerator PerformStreamBytes()
361+
{
362+
var streamBytesCall = room.LocalParticipant.StreamBytes("my-topic");
363+
yield return streamBytesCall;
364+
365+
var writer = streamBytesCall.Writer;
366+
Debug.Log($"Opened byte stream with ID: {writer.Info.Id}");
367+
368+
// Example sending arbitrary binary data
369+
// For sending files, use `SendFile` instead
370+
var dataChunks = new[] {
371+
new byte[] { 0x00, 0x01 },
372+
new byte[] { 0x03, 0x04 }
373+
};
374+
foreach (var chunk in dataChunks)
375+
{
376+
yield return writer.Write(chunk);
377+
}
378+
379+
// The stream must be explicitly closed when done
380+
yield return writer.Close();
381+
382+
Debug.Log($"Closed byte stream with ID: {writer.Info.Id}");
383+
}
384+
```
385+
386+
#### Handling incoming streams
263387

388+
```cs
389+
IEnumerator HandleByteStream(ByteStreamReader reader, string participantIdentity)
390+
{
391+
var info = reader.Info;
392+
393+
// Option 1: Process the stream incrementally
394+
var readIncremental = reader.ReadIncremental();
395+
while (!readIncremental.IsEos)
396+
{
397+
readIncremental.Reset();
398+
yield return readIncremental;
399+
Debug.Log($"Next chunk: {readIncremental.Bytes}");
400+
}
401+
402+
// Option 2: Get the entire file after the stream completes
403+
var readAllCall = reader.ReadAll();
404+
yield return readAllCall;
405+
var data = readAllCall.Bytes;
406+
407+
// Option 3: Write the stream to a local file on disk as it arrives
408+
var writeToFileCall = reader.WriteToFile();
409+
yield return writeToFileCall;
410+
var path = writeToFileCall.FilePath;
411+
Debug.Log($"Wrote to file: {path}");
412+
413+
Debug.Log($@"
414+
Byte stream received from {participantIdentity}
415+
Topic: {info.Topic}
416+
Timestamp: {info.Timestamp}
417+
ID: {info.Id}
418+
Size: {info.TotalLength} (only available if the stream was sent with `SendFile`)
419+
");
420+
}
421+
422+
// Register the topic after connection to the room
423+
room.RegisterByteStreamHandler("my-topic", (reader, identity) =>
424+
StartCoroutine(HandleByteStream(reader, identity))
425+
);
426+
```
264427

265428
<!--BEGIN_REPO_NAV-->
266429
<br/><table>
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
version https://git-lfs.github.com/spec/v1
2-
oid sha256:739851187aa82b80cccb6f54b8051d21f41ed2950bb04c69707809ef54a638e6
3-
size 389770720
2+
oid sha256:0faf50c97678a0e7cc14ba44fa9388600c989d34f5eba10c43b20c098c8ea02c
3+
size 390456280
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
version https://git-lfs.github.com/spec/v1
2-
oid sha256:5522f18ce78eb1f50cb4ca7e3769196defc8539f7c441fba260159152029525c
3-
size 390225536
2+
oid sha256:2060a7550703e2d6f3902294aab6a7d35e6365388c7c54897033581710384b6b
3+
size 390952040
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
version https://git-lfs.github.com/spec/v1
2-
oid sha256:9fb3fbf5f1ee14627ac4cbade4c01d0bbbc618871a33fa86d3cc0a1ceeb9ee3c
3-
size 22431680
2+
oid sha256:d8f2197a320a325c45991a8d4fac73c0afed21af231b5629025bc6295d2d8072
3+
size 22892808
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
version https://git-lfs.github.com/spec/v1
2-
oid sha256:fd951fec55fd911b5c5579ed11e23b62232e3ef37d2b6cd4ab3df60039004bcc
3-
size 28248048
2+
oid sha256:900fee3e36c07a58aa04a0fc4e8c638e526be1feaf7f6166e0f77ba4311c5cb1
3+
size 28893512
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
version https://git-lfs.github.com/spec/v1
2-
oid sha256:6bab83c74518c607ad63663a0bbeba5b7ba3273c99be23aa480d96050e71dcd0
3-
size 19029584
2+
oid sha256:809c8df28fbcce2a8f97a479e0f43594c347eebb03f56b8a391281e3b5c3c4e6
3+
size 19481568
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
version https://git-lfs.github.com/spec/v1
2-
oid sha256:91f61ac6942f40137dd6132f3769c584b3e916a51b9930a635c809b39abeb366
3-
size 23943212
2+
oid sha256:ec17d30409e1f02454504a9436f158177ee65231d802c792b4f28a21308fc7f8
3+
size 24491668
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
version https://git-lfs.github.com/spec/v1
2-
oid sha256:9cfe667483e9d15a4d85cd09b45bc748e793d4cc58bd9fa3328db942c6815d5a
3-
size 19950592
2+
oid sha256:0ae9e59502a4c77502f15d6ee144e62985a0aeee5542721b966a701f84259db1
3+
size 20545536
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
version https://git-lfs.github.com/spec/v1
2-
oid sha256:848f5c47f7f67ab524bf5c89b9167f820670146e42dc76929c5461d96b1328a4
3-
size 26507264
2+
oid sha256:edc94f69b5647a73890ce44818d3bf1a9003681f8572b4b6bbebcc07ffc279b2
3+
size 27193344

0 commit comments

Comments
 (0)