diff --git a/fhevm-engine/coprocessor/src/tests/operators_from_events.rs b/fhevm-engine/coprocessor/src/tests/operators_from_events.rs index 50521313..b3718177 100644 --- a/fhevm-engine/coprocessor/src/tests/operators_from_events.rs +++ b/fhevm-engine/coprocessor/src/tests/operators_from_events.rs @@ -324,7 +324,6 @@ async fn test_fhe_binary_operands_events() -> Result<(), Box Result<(), Box Result<(), Box }, ))) .await?; - listener_event_to_db.notify_scheduler().await; wait_until_all_ciphertexts_computed(&app).await?; let decrypt_request = vec![output_handle.to_vec()]; let resp = decrypt_ciphertexts(&pool, 1, decrypt_request).await?; @@ -626,7 +623,6 @@ async fn test_fhe_cast_events() -> Result<(), Box> { }))) .await?; - listener_event_to_db.notify_scheduler().await; wait_until_all_ciphertexts_computed(&app).await?; let decrypt_request = vec![output_handle.to_vec()]; let resp = decrypt_ciphertexts(&pool, 1, decrypt_request).await?; @@ -713,7 +709,6 @@ async fn test_fhe_rand_events() -> Result<(), Box> { ))) .await?; - listener_event_to_db.notify_scheduler().await; wait_until_all_ciphertexts_computed(&app).await?; let decrypt_request = vec![ diff --git a/fhevm-engine/fhevm-db/migrations/20250303135355_fhevm_listner_auto_notify.sql b/fhevm-engine/fhevm-db/migrations/20250303135355_fhevm_listner_auto_notify.sql new file mode 100644 index 00000000..6cf3ecbd --- /dev/null +++ b/fhevm-engine/fhevm-db/migrations/20250303135355_fhevm_listner_auto_notify.sql @@ -0,0 +1,16 @@ +-- Create function to notify on work updates +CREATE OR REPLACE FUNCTION notify_work_available() + RETURNS trigger AS $$ +BEGIN + -- Notify all listeners of work_updated channel + NOTIFY work_available; + RETURN NULL; +END; +$$ LANGUAGE plpgsql; + +-- Create trigger to fire once per statement on computations inserts +CREATE TRIGGER work_updated_trigger_from_computations_insertions + AFTER INSERT + ON computations + FOR EACH STATEMENT + EXECUTE FUNCTION notify_work_available(); diff --git a/fhevm-engine/fhevm-listener/src/cmd/mod.rs b/fhevm-engine/fhevm-listener/src/cmd/mod.rs index 585c2e87..6f2232dd 100644 --- a/fhevm-engine/fhevm-listener/src/cmd/mod.rs +++ b/fhevm-engine/fhevm-listener/src/cmd/mod.rs @@ -318,12 +318,10 @@ pub async fn main(args: Args) { // TODO: filter on contract address if known println!("TFHE {event:#?}"); if let Some(ref mut db) = db { - match db.insert_tfhe_event(&event).await { - Ok(_) => db.notify_scheduler().await, - Err(err) => { - block_error_event_fthe += 1; - eprintln!("Error inserting tfhe event: {err}") - } + let res = db.insert_tfhe_event(&event).await; + if let Err(err) = res { + block_error_event_fthe += 1; + eprintln!("Error inserting tfhe event: {err}"); } } continue; diff --git a/fhevm-engine/fhevm-listener/src/database/tfhe_event_propagate.rs b/fhevm-engine/fhevm-listener/src/database/tfhe_event_propagate.rs index a3832d12..f91f980c 100644 --- a/fhevm-engine/fhevm-listener/src/database/tfhe_event_propagate.rs +++ b/fhevm-engine/fhevm-listener/src/database/tfhe_event_propagate.rs @@ -28,7 +28,6 @@ pub type ClearConst = Uint<256, 4>; const MAX_RETRIES_FOR_NOTIFY: usize = 5; pub const EVENT_PBS_COMPUTATIONS: &str = "event_pbs_computations"; pub const EVENT_ALLOWED_HANDLE: &str = "event_allowed_handle"; -pub const EVENT_WORK_AVAILABLE: &str = "work_available"; pub fn retry_on_sqlx_error(err: &SqlxError) -> bool { match err { @@ -374,10 +373,6 @@ impl Database { } } - pub async fn notify_scheduler(&mut self) { - self.notify_database(EVENT_WORK_AVAILABLE).await - } - /// Handles all types of ACL events pub async fn handle_acl_event( &mut self,