Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ pub enum ServerError {
RoomServiceNotFound,
#[error("Failed to create Livekit room")]
RoomCreationError,
#[error("Failed to publish track")]
PublishTrackError,
#[error("Failed to set overlay window fullscreen")]
FullscreenError,
#[error("Failed to create stream for screen share")]
Expand Down Expand Up @@ -315,21 +317,27 @@ impl<'a> Application<'a> {
}

let room_service = self.room_service.as_mut().unwrap();
let res = room_service.create_room(
screenshare_input.token,
let res = room_service.create_room(screenshare_input.token, self.event_loop_proxy.clone());
if let Err(error) = res {
log::error!("screenshare: error creating room: {error:?}");
drop(screen_capturer);
self.stop_screenshare();
return Err(ServerError::RoomCreationError);
}
log::info!("screenshare: room created");

let res = room_service.publish_track(
extent.width as u32,
extent.height as u32,
self.event_loop_proxy.clone(),
screenshare_input.use_av1,
);
if let Err(error) = res {
log::error!("screenshare: error creating room: {error:?}");
log::error!("screenshare: error publishing track: {error:?}");
drop(screen_capturer);
self.stop_screenshare();
return Err(ServerError::RoomCreationError);
}
log::info!("screenshare: room created");

log::info!("screenshare: track published");
let buffer_source = room_service.get_buffer_source();
screen_capturer.set_buffer_source(buffer_source);

Expand Down
97 changes: 79 additions & 18 deletions core/src/room_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ const WIDTH_THRESHOLD_2560: u32 = 2560;
enum RoomServiceCommand {
CreateRoom {
token: String,
event_loop_proxy: EventLoopProxy<UserEvent>,
},
PublishTrack {
width: u32,
height: u32,
event_loop_proxy: EventLoopProxy<UserEvent>,
use_av1: bool,
},
PublishSharerLocation(f64, f64, bool),
Expand All @@ -64,6 +66,8 @@ enum RoomServiceCommandResult {
pub enum RoomServiceError {
#[error("Failed to create room: {0}")]
CreateRoom(String),
#[error("Failed to publish track: {0}")]
PublishTrack(String),
}

/*
Expand Down Expand Up @@ -151,8 +155,6 @@ impl RoomService {
/// # Arguments
///
/// * `token` - The token to use to connect to the room
/// * `width` - The width of the video track
/// * `height` - The height of the video track
/// * `event_loop_proxy` - The event loop proxy to send events to
///
/// # Returns
Expand All @@ -162,20 +164,14 @@ impl RoomService {
pub fn create_room(
&self,
token: String,
width: u32,
height: u32,
event_loop_proxy: EventLoopProxy<UserEvent>,
use_av1: bool,
) -> Result<(), RoomServiceError> {
log::info!("create_room: {width:?}, {height:?}");
log::info!("create_room");
let res = self
.service_command_tx
.send(RoomServiceCommand::CreateRoom {
token,
width,
height,
event_loop_proxy,
use_av1,
});
if let Err(e) = res {
return Err(RoomServiceError::CreateRoom(format!(
Expand All @@ -194,6 +190,49 @@ impl RoomService {
}
}

/// Publishes a video track, this will block until the room is created.
///
/// # Arguments
///
/// * `width` - The width of the video track
/// * `height` - The height of the video track
/// * `use_av1` - If av1 codec is being used
///
/// # Returns
///
/// * `Ok(())` - The track was published successfully
/// * `Err(())` - The track was not published successfully
pub fn publish_track(
&self,
width: u32,
height: u32,
use_av1: bool,
) -> Result<(), RoomServiceError> {
log::info!("publish_track: {width:?}, {height:?}");
let res = self
.service_command_tx
.send(RoomServiceCommand::PublishTrack {
width,
height,
use_av1,
});
if let Err(e) = res {
return Err(RoomServiceError::PublishTrack(format!(
"Failed to send command: {e:?}"
)));
}
let res = self.service_command_res_rx.recv();
match res {
Ok(RoomServiceCommandResult::Success) => Ok(()),
Ok(RoomServiceCommandResult::Failure) => Err(RoomServiceError::PublishTrack(
"Failed to publish track".to_string(),
)),
Err(e) => Err(RoomServiceError::PublishTrack(format!(
"Failed to receive result: {e:?}"
))),
}
}

/// Destroys the current room connection.
pub fn destroy_room(&self) {
log::info!("destroy_room");
Expand Down Expand Up @@ -311,9 +350,11 @@ impl RoomService {
///
/// # Commands Handled
///
/// * `CreateRoom` - Creates a new LiveKit room connection, publishes a video track,
/// and sets up event handling. If a room already exists, it will be closed first.
/// The video track is configured with VP9 codec and adaptive bitrate based on width.
/// * `CreateRoom` - Creates a new LiveKit room connection and sets up event handing.
/// If a room already exists, it will be closed first.
///
/// * `PublishTrack` - Publishes a video track. The video track is configured with
/// VP9 codec and adaptive bitrate based on width.
///
/// * `DestroyRoom` - Closes the current room connection and cleans up associated
/// resources including the buffer source.
Expand Down Expand Up @@ -348,10 +389,7 @@ async fn room_service_commands(
// TODO: Break this into create room and publish track commands
RoomServiceCommand::CreateRoom {
token,
width,
height,
event_loop_proxy,
use_av1,
} => {
{
let mut inner_room = inner.room.lock().await;
Expand Down Expand Up @@ -385,6 +423,29 @@ async fn room_service_commands(
/* Spawn thread for handling livekit data events. */
tokio::spawn(handle_room_events(rx, event_loop_proxy, user_sid));

let mut inner_room = inner.room.lock().await;
*inner_room = Some(room);
let res = tx.send(RoomServiceCommandResult::Success);
if let Err(e) = res {
log::error!("room_service_commands: Failed to send result: {e:?}");
}
}
RoomServiceCommand::PublishTrack {
width,
height,
use_av1,
} => {
let inner_room = inner.room.lock().await;
if inner_room.is_none() {
log::error!("room_service_commands: Room doesn't exist.");
let res = tx.send(RoomServiceCommandResult::Failure);
if let Err(e) = res {
log::error!("room_service_commands: Failed to send result: {e:?}");
}
continue;
}
let room = inner_room.as_ref().unwrap();

let buffer_source = NativeVideoSource::new(VideoResolution { width, height });
let track = LocalVideoTrack::create_video_track(
VIDEO_TRACK_NAME,
Expand Down Expand Up @@ -422,6 +483,7 @@ async fn room_service_commands(
},
)
.await;

if let Err(e) = res {
log::error!("room_service_command: Failed to publish track: {e:?}");
let res = tx.send(RoomServiceCommandResult::Failure);
Expand All @@ -431,10 +493,9 @@ async fn room_service_commands(
continue;
}

let mut inner_room = inner.room.lock().await;
*inner_room = Some(room);
let mut inner_buffer_source = inner.buffer_source.lock().unwrap();
*inner_buffer_source = Some(buffer_source);

let res = tx.send(RoomServiceCommandResult::Success);
if let Err(e) = res {
log::error!("room_service_commands: Failed to send result: {e:?}");
Expand Down