Skip to content
This repository was archived by the owner on Jun 11, 2025. It is now read-only.

Rudy/fhevm listener auto notify directly on db #354

Merged
merged 1 commit into from
May 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions fhevm-engine/coprocessor/src/tests/operators_from_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,6 @@ async fn test_fhe_binary_operands_events() -> Result<(), Box<dyn std::error::Err
listener_event_to_db
.insert_tfhe_event(&tfhe_event(op_event))
.await?;
listener_event_to_db.notify_scheduler().await;

cases.push((op, output_handle));
}
Expand Down Expand Up @@ -429,7 +428,6 @@ async fn test_fhe_unary_operands_events() -> Result<(), Box<dyn std::error::Erro
listener_event_to_db
.insert_tfhe_event(&tfhe_event(op_event))
.await?;
listener_event_to_db.notify_scheduler().await;
wait_until_all_ciphertexts_computed(&app).await?;

let decrypt_request = vec![output_handle.to_vec()];
Expand Down Expand Up @@ -551,7 +549,6 @@ async fn test_fhe_if_then_else_events() -> Result<(), Box<dyn std::error::Error>
},
)))
.await?;
listener_event_to_db.notify_scheduler().await;
wait_until_all_ciphertexts_computed(&app).await?;
let decrypt_request = vec![output_handle.to_vec()];
let resp = decrypt_ciphertexts(&pool, 1, decrypt_request).await?;
Expand Down Expand Up @@ -626,7 +623,6 @@ async fn test_fhe_cast_events() -> Result<(), Box<dyn std::error::Error>> {
})))
.await?;

listener_event_to_db.notify_scheduler().await;
wait_until_all_ciphertexts_computed(&app).await?;
let decrypt_request = vec![output_handle.to_vec()];
let resp = decrypt_ciphertexts(&pool, 1, decrypt_request).await?;
Expand Down Expand Up @@ -713,7 +709,6 @@ async fn test_fhe_rand_events() -> Result<(), Box<dyn std::error::Error>> {
)))
.await?;

listener_event_to_db.notify_scheduler().await;
wait_until_all_ciphertexts_computed(&app).await?;

let decrypt_request = vec![
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- Create function to notify on work updates
CREATE OR REPLACE FUNCTION notify_work_available()
RETURNS trigger AS $$
BEGIN
-- Notify all listeners of work_updated channel
NOTIFY work_available;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Create trigger to fire once per statement on computations inserts
CREATE TRIGGER work_updated_trigger_from_computations_insertions
AFTER INSERT
ON computations
FOR EACH STATEMENT
EXECUTE FUNCTION notify_work_available();
10 changes: 4 additions & 6 deletions fhevm-engine/fhevm-listener/src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,10 @@ pub async fn main(args: Args) {
// TODO: filter on contract address if known
println!("TFHE {event:#?}");
if let Some(ref mut db) = db {
match db.insert_tfhe_event(&event).await {
Ok(_) => db.notify_scheduler().await,
Err(err) => {
block_error_event_fthe += 1;
eprintln!("Error inserting tfhe event: {err}")
}
let res = db.insert_tfhe_event(&event).await;
if let Err(err) = res {
block_error_event_fthe += 1;
eprintln!("Error inserting tfhe event: {err}");
}
}
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ pub type ClearConst = Uint<256, 4>;
const MAX_RETRIES_FOR_NOTIFY: usize = 5;
pub const EVENT_PBS_COMPUTATIONS: &str = "event_pbs_computations";
pub const EVENT_ALLOWED_HANDLE: &str = "event_allowed_handle";
pub const EVENT_WORK_AVAILABLE: &str = "work_available";

pub fn retry_on_sqlx_error(err: &SqlxError) -> bool {
match err {
Expand Down Expand Up @@ -374,10 +373,6 @@ impl Database {
}
}

pub async fn notify_scheduler(&mut self) {
self.notify_database(EVENT_WORK_AVAILABLE).await
}

/// Handles all types of ACL events
pub async fn handle_acl_event(
&mut self,
Expand Down
Loading