Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 81 additions & 19 deletions core/src/room_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ const WIDTH_THRESHOLD_2560: u32 = 2560;
enum RoomServiceCommand {
CreateRoom {
token: String,
event_loop_proxy: EventLoopProxy<UserEvent>,
},
PublishTrack {
width: u32,
height: u32,
event_loop_proxy: EventLoopProxy<UserEvent>,
use_av1: bool,
},
}
PublishSharerLocation(f64, f64, bool),
PublishControllerCursorEnabled(bool),
DestroyRoom,
Expand All @@ -66,6 +68,13 @@ pub enum RoomServiceError {
CreateRoom(String),
}


#[derive(Debug, thiserror::Error)]
pub enum PublishTrackError {
#[error("Failed to publish track: {0}")]
PublishTrack(String),
}

/*
* This struct is used for handling room events and functions
* from a thread in the async runtime.
Expand Down Expand Up @@ -151,8 +160,6 @@ impl RoomService {
/// # Arguments
///
/// * `token` - The token to use to connect to the room
/// * `width` - The width of the video track
/// * `height` - The height of the video track
/// * `event_loop_proxy` - The event loop proxy to send events to
///
/// # Returns
Expand All @@ -162,20 +169,14 @@ impl RoomService {
pub fn create_room(
&self,
token: String,
width: u32,
height: u32,
event_loop_proxy: EventLoopProxy<UserEvent>,
use_av1: bool,
) -> Result<(), RoomServiceError> {
log::info!("create_room: {width:?}, {height:?}");
log::info!("create_room");
let res = self
.service_command_tx
.send(RoomServiceCommand::CreateRoom {
token,
width,
height,
event_loop_proxy,
use_av1,
});
if let Err(e) = res {
return Err(RoomServiceError::CreateRoom(format!(
Expand All @@ -194,6 +195,49 @@ impl RoomService {
}
}

/// Publishes a video track, this will block until the room is created.
///
/// # Arguments
///
/// * `width` - The width of the video track
/// * `height` - The height of the video track
/// * `use_av1` - If av1 codec is being used
///
/// # Returns
///
/// * `Ok(())` - The track was published successfully
/// * `Err(())` - The track was not published successfully
pub fn publish_track(
&self,
width: u32,
height: u32,
use_av1: bool
) -> Result<(), RoomServiceError> {
log::info!("publish_track: {width:?}, {height:?}");
let res = self
.service_command_tx
.send(RoomServiceCommand::PublishTrack {
width,
height,
use_av1,
});
if let Err(e) = res {
return Err(RoomServiceError::PublishTrack(format!(
"Failed to send command: {e:?}"
)));
}
let res = self.service_command_res_rx.recv();
match res {
Ok(RoomServiceCommandResult::Success) => Ok(()),
Ok(RoomServiceCommandResult::Failure) => Err(RoomServiceError::PublishTrack(
"Failed to publish track".to_string(),
)),
Err(e) => Err(RoomServiceError::PublishTrack(format!(
"Failed to receive result: {e:?}"
))),
}
}

/// Destroys the current room connection.
pub fn destroy_room(&self) {
log::info!("destroy_room");
Expand Down Expand Up @@ -311,9 +355,11 @@ impl RoomService {
///
/// # Commands Handled
///
/// * `CreateRoom` - Creates a new LiveKit room connection, publishes a video track,
/// and sets up event handling. If a room already exists, it will be closed first.
/// The video track is configured with VP9 codec and adaptive bitrate based on width.
/// * `CreateRoom` - Creates a new LiveKit room connection and sets up event handing.
/// If a room already exists, it will be closed first.
///
/// * `PublishTrack` - Publishes a video track. The video track is configured with
/// VP9 codec and adaptive bitrate based on width.
///
/// * `DestroyRoom` - Closes the current room connection and cleans up associated
/// resources including the buffer source.
Expand Down Expand Up @@ -348,10 +394,7 @@ async fn room_service_commands(
// TODO: Break this into create room and publish track commands
RoomServiceCommand::CreateRoom {
token,
width,
height,
event_loop_proxy,
use_av1,
} => {
{
let mut inner_room = inner.room.lock().await;
Expand Down Expand Up @@ -385,6 +428,25 @@ async fn room_service_commands(
/* Spawn thread for handling livekit data events. */
tokio::spawn(handle_room_events(rx, event_loop_proxy, user_sid));

let mut inner_room = inner.room.lock().await;
*inner_room = Some(room);
let res = tx.send(RoomServiceCommandResult::Success);
if let Err(e) = res {
log::error!("room_service_commands: Failed to send result: {e:?}");
}
}
RoomServiceCommand::PublishTrack {
width,
height,
use_av1,
} => {
let mut inner_room = inner.room.lock().await;
if inner_room.is_none() {
log::warn!("room_service_commands: Room doesn't exist.");
continue
}
let room = inner_room.as_ref().unwrap();

let buffer_source = NativeVideoSource::new(VideoResolution { width, height });
let track = LocalVideoTrack::create_video_track(
VIDEO_TRACK_NAME,
Expand Down Expand Up @@ -422,6 +484,7 @@ async fn room_service_commands(
},
)
.await;

if let Err(e) = res {
log::error!("room_service_command: Failed to publish track: {e:?}");
let res = tx.send(RoomServiceCommandResult::Failure);
Expand All @@ -431,10 +494,9 @@ async fn room_service_commands(
continue;
}

let mut inner_room = inner.room.lock().await;
*inner_room = Some(room);
let mut inner_buffer_source = inner.buffer_source.lock().unwrap();
*inner_buffer_source = Some(buffer_source);

let res = tx.send(RoomServiceCommandResult::Success);
if let Err(e) = res {
log::error!("room_service_commands: Failed to send result: {e:?}");
Expand Down