Skip to content

Commit 17ccb7b

Browse files
feat: prover redis cleanup job (#1852)
* redis queue cleanup job * format * format
1 parent a0e8c0e commit 17ccb7b

File tree

5 files changed

+453
-13
lines changed

5 files changed

+453
-13
lines changed

forester/tests/e2e_v1_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ async fn test_e2e_v1() {
234234
work_report_sender2,
235235
));
236236

237-
const EXPECTED_EPOCHS: u64 = 3; // We expect to process 2 epochs (0 and 1)
237+
const EXPECTED_EPOCHS: u64 = 2; // We expect to process 2 epochs (0 and 1)
238238

239239
let mut processed_epochs = HashSet::new();
240240
let mut total_processed = 0;

forester/tests/e2e_v2_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ use crate::test_utils::{get_registration_phase_start_slot, init, wait_for_slot};
6464
mod test_utils;
6565

6666
const MINT_TO_NUM: u64 = 5;
67-
const DEFAULT_TIMEOUT_SECONDS: u64 = 60 * 15;
67+
const DEFAULT_TIMEOUT_SECONDS: u64 = 60 * 5;
6868
const COMPUTE_BUDGET_LIMIT: u32 = 1_000_000;
6969

7070
#[derive(Debug, Clone, Copy, PartialEq)]
@@ -741,7 +741,7 @@ async fn execute_test_transactions<R: Rpc + Indexer + MerkleTreeExt, I: Indexer>
741741
address_v1_counter: &mut u64,
742742
address_v2_counter: &mut u64,
743743
) {
744-
let mut iterations = 10;
744+
let mut iterations = 4;
745745
if is_v2_state_test_enabled() {
746746
let batch_size =
747747
get_state_v2_batch_size(rpc, &env.v2_state_trees[0].merkle_tree).await as usize;

prover/server/main.go

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -970,6 +970,22 @@ func startCleanupRoutines(redisQueue *server.RedisQueue) {
970970
logging.Logger().Info().Msg("Startup cleanup of old proof requests completed")
971971
}
972972

973+
if err := redisQueue.CleanupStuckProcessingJobs(); err != nil {
974+
logging.Logger().Error().
975+
Err(err).
976+
Msg("Failed to cleanup stuck processing jobs on startup")
977+
} else {
978+
logging.Logger().Info().Msg("Startup cleanup of stuck processing jobs completed")
979+
}
980+
981+
if err := redisQueue.CleanupOldFailedJobs(); err != nil {
982+
logging.Logger().Error().
983+
Err(err).
984+
Msg("Failed to cleanup old failed jobs on startup")
985+
} else {
986+
logging.Logger().Info().Msg("Startup cleanup of old failed jobs completed")
987+
}
988+
973989
if err := redisQueue.CleanupOldResults(); err != nil {
974990
logging.Logger().Error().
975991
Err(err).
@@ -978,6 +994,31 @@ func startCleanupRoutines(redisQueue *server.RedisQueue) {
978994
logging.Logger().Info().Msg("Startup cleanup of old results completed")
979995
}
980996

997+
if err := redisQueue.CleanupOldResultKeys(); err != nil {
998+
logging.Logger().Error().
999+
Err(err).
1000+
Msg("Failed to cleanup old result keys on startup")
1001+
} else {
1002+
logging.Logger().Info().Msg("Startup cleanup of old result keys completed")
1003+
}
1004+
1005+
go func() {
1006+
processingTicker := time.NewTicker(10 * time.Second)
1007+
defer processingTicker.Stop()
1008+
1009+
logging.Logger().Info().Msg("Started stuck processing jobs cleanup routine (every 10 seconds)")
1010+
1011+
for range processingTicker.C {
1012+
if err := redisQueue.CleanupStuckProcessingJobs(); err != nil {
1013+
logging.Logger().Error().
1014+
Err(err).
1015+
Msg("Failed to cleanup stuck processing jobs")
1016+
} else {
1017+
logging.Logger().Debug().Msg("Stuck processing jobs cleanup completed")
1018+
}
1019+
}
1020+
}()
1021+
9811022
// Start cleanup for old proof requests (every 10 minutes)
9821023
go func() {
9831024
requestTicker := time.NewTicker(10 * time.Minute)
@@ -996,12 +1037,30 @@ func startCleanupRoutines(redisQueue *server.RedisQueue) {
9961037
}
9971038
}()
9981039

999-
// Start less frequent cleanup for old results (every 1 hour)
1040+
// Start cleanup for old failed jobs (every 30 minutes)
1041+
go func() {
1042+
failedTicker := time.NewTicker(30 * time.Minute)
1043+
defer failedTicker.Stop()
1044+
1045+
logging.Logger().Info().Msg("Started old failed jobs cleanup routine (every 30 minutes)")
1046+
1047+
for range failedTicker.C {
1048+
if err := redisQueue.CleanupOldFailedJobs(); err != nil {
1049+
logging.Logger().Error().
1050+
Err(err).
1051+
Msg("Failed to cleanup old failed jobs")
1052+
} else {
1053+
logging.Logger().Debug().Msg("Old failed jobs cleanup completed")
1054+
}
1055+
}
1056+
}()
1057+
1058+
// Start cleanup for old results (every 30 minutes)
10001059
go func() {
1001-
resultTicker := time.NewTicker(1 * time.Hour)
1060+
resultTicker := time.NewTicker(30 * time.Minute)
10021061
defer resultTicker.Stop()
10031062

1004-
logging.Logger().Info().Msg("Started old results cleanup routine (every 1 hour)")
1063+
logging.Logger().Info().Msg("Started old results cleanup routine (every 30 minutes)")
10051064

10061065
for range resultTicker.C {
10071066
if err := redisQueue.CleanupOldResults(); err != nil {
@@ -1011,6 +1070,14 @@ func startCleanupRoutines(redisQueue *server.RedisQueue) {
10111070
} else {
10121071
logging.Logger().Debug().Msg("Old results cleanup completed")
10131072
}
1073+
1074+
if err := redisQueue.CleanupOldResultKeys(); err != nil {
1075+
logging.Logger().Error().
1076+
Err(err).
1077+
Msg("Failed to cleanup old result keys")
1078+
} else {
1079+
logging.Logger().Debug().Msg("Old result keys cleanup completed")
1080+
}
10141081
}
10151082
}()
10161083
}

0 commit comments

Comments
 (0)