Skip to content

Commit ebb6b1d

Browse files
feat: specialized queue workers for prover server (#1778)
* Add specialized queue workers and Docker deployment support
  - Replace generic queue workers with circuit-specific workers (update, append, address-append)
  - Add Docker deployment files (Dockerfile.light, docker-compose.yml)
  - Implement separate cleanup routines for old requests (30min) and results (1000+ items)
  - Add comprehensive Redis queue test suite with 1000+ lines of tests
  - Remove priority system and queue-workers flag in favor of circuit-based routing
  - Always route batch operations to queues to prevent cross-contamination
  - Enhanced failed job status reporting with detailed error information
  - Fix SERVER_MODE environment variable typo
* Add Redis service and tests to prover workflow
* Remove Redis connectivity check from prover tests
1 parent 83cf1c1 commit ebb6b1d

File tree

8 files changed

+1552
-127
lines changed

8 files changed

+1552
-127
lines changed

.github/workflows/prover-test.yml

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,16 @@ jobs:
2626
if: github.event.pull_request.draft == false
2727
runs-on: buildjet-8vcpu-ubuntu-2204
2828
timeout-minutes: 120
29+
services:
30+
redis:
31+
image: redis:7-alpine
32+
ports:
33+
- 6379:6379
34+
options: >-
35+
--health-cmd "redis-cli ping"
36+
--health-interval 10s
37+
--health-timeout 5s
38+
--health-retries 5
2939
steps:
3040
- name: Checkout sources
3141
uses: actions/checkout@v4
@@ -68,6 +78,44 @@ jobs:
6878
cd prover/server
6979
go test ./prover -timeout 60m
7080
81+
- name: Redis Queue tests
82+
env:
83+
TEST_REDIS_URL: redis://localhost:6379/15
84+
run: |
85+
cd prover/server
86+
go test -v -run TestRedis -timeout 10m
87+
88+
- name: Queue cleanup tests
89+
env:
90+
TEST_REDIS_URL: redis://localhost:6379/15
91+
run: |
92+
cd prover/server
93+
go test -v -run TestCleanup -timeout 5m
94+
95+
- name: Worker selection tests
96+
run: |
97+
cd prover/server
98+
go test -v -run TestWorkerSelection -timeout 5m
99+
100+
- name: Batch operations queue routing tests
101+
run: |
102+
cd prover/server
103+
go test -v -run TestBatchOperations -timeout 5m
104+
105+
- name: Queue processing flow tests
106+
env:
107+
TEST_REDIS_URL: redis://localhost:6379/15
108+
run: |
109+
cd prover/server
110+
go test -v -run TestJobProcessingFlow -timeout 5m
111+
112+
- name: Failed job status tests
113+
env:
114+
TEST_REDIS_URL: redis://localhost:6379/15
115+
run: |
116+
cd prover/server
117+
go test -v -run TestFailedJobStatus -timeout 5m
118+
71119
- name: Lightweight integration tests
72120
if: ${{ github.event.pull_request.base.ref == 'main' }}
73121
run: |

prover/server/Dockerfile.light

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Build stage: compiles the light-prover binary using the Go 1.20.3 Alpine image.
FROM golang:1.20.3-alpine AS builder
2+
3+
WORKDIR /app
4+
5+
# Copy the module files before the rest of the source so the dependency
# download/verify step below can be cached separately from source changes.
COPY go.mod go.sum ./
6+
RUN go mod download && go mod verify
7+
8+
COPY . .
9+
10+
# Build with cgo disabled and write the binary to /usr/local/bin/light-prover.
ENV CGO_ENABLED=0
11+
RUN go build -v -o /usr/local/bin/light-prover .
12+
13+
# Empty directory that is copied into the runtime image as /proving-keys below.
RUN mkdir -p /tmp/empty_proving_keys
14+
15+
# Runtime stage: distroless Debian 11 base image, nonroot variant.
FROM gcr.io/distroless/base-debian11:nonroot
16+
17+
# Only the compiled binary is taken from the build stage.
COPY --from=builder /usr/local/bin/light-prover /usr/local/bin/light-prover
18+
19+
WORKDIR /proving-keys
20+
21+
# Populate /proving-keys with the (empty) directory, owned by the nonroot user.
# NOTE(review): presumably real keys are mounted over this path at run time — confirm.
COPY --chown=nonroot:nonroot --from=builder /tmp/empty_proving_keys /proving-keys/
22+
23+
WORKDIR /
24+
25+
# Runs "light-prover start" by default; CMD can be overridden to pass other arguments.
ENTRYPOINT [ "light-prover" ]
26+
CMD [ "start" ]

prover/server/docker-compose.yml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Compose stack with two services: a Redis instance and the prover.
services:
2+
# Redis service, published on host port 6379.
redis:
3+
image: redis:7.4.4-alpine3.21
4+
container_name: redis
5+
ports:
6+
- "6379:6379"
7+
restart: unless-stopped
8+
9+
# Prover service, published on host port 3001.
prover:
10+
image: sergeytimoshin/prover-light:1.0.0
11+
container_name: prover
12+
ports:
13+
- "3001:3001"
14+
# Mounts the local ./proving-keys directory read-only at /proving-keys/.
volumes:
15+
- ./proving-keys:/proving-keys/:ro
16+
# Starts the prover in forester-test run mode, pointing it at the redis service by name.
command: >
17+
start
18+
--run-mode forester-test
19+
--redis-url=redis://redis:6379
20+
# Short-form depends_on: controls start order only, it does not wait for Redis to be ready.
depends_on:
21+
- redis
22+
restart: unless-stopped

prover/server/main.go

Lines changed: 92 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -444,11 +444,6 @@ func runCli() {
444444
Usage: "Run only HTTP server (no queue workers)",
445445
Value: false,
446446
},
447-
&cli.IntFlag{
448-
Name: "queue-workers",
449-
Usage: "Number of queue worker goroutines",
450-
Value: 1,
451-
},
452447
},
453448
Action: func(context *cli.Context) error {
454449
if context.Bool("json-logging") {
@@ -481,14 +476,13 @@ func runCli() {
481476

482477
queueOnly := context.Bool("queue-only")
483478
serverOnly := context.Bool("server-only")
484-
numWorkers := context.Int("queue-workers")
485479

486480
enableQueue := redisURL != "" && !serverOnly
487481
enableServer := !queueOnly
488482

489483
if os.Getenv("QUEUE_MODE") == "true" {
490484
enableQueue = true
491-
if os.Getenv("SERVER _MODE") != "true" {
485+
if os.Getenv("SERVER_MODE") != "true" {
492486
enableServer = false
493487
}
494488
}
@@ -497,10 +491,9 @@ func runCli() {
497491
Bool("enable_queue", enableQueue).
498492
Bool("enable_server", enableServer).
499493
Str("redis_url", redisURL).
500-
Int("queue_workers", numWorkers).
501494
Msg("Starting ZK Prover service")
502495

503-
var workers []*server.QueueWorker
496+
var workers []server.QueueWorker
504497
var redisQueue *server.RedisQueue
505498
var instance server.RunningJob
506499

@@ -514,22 +507,49 @@ func runCli() {
514507
return fmt.Errorf("failed to connect to Redis: %w", err)
515508
}
516509

517-
startResultCleanup(redisQueue)
510+
startCleanupRoutines(redisQueue)
518511

519512
if stats, err := redisQueue.GetQueueStats(); err == nil {
520513
logging.Logger().Info().Interface("initial_queue_stats", stats).Msg("Redis connection successful")
521514
}
522515

523-
if numWorkers <= 0 {
524-
numWorkers = 1
516+
logging.Logger().Info().Msg("Starting queue workers")
517+
518+
startAllWorkers := runMode == prover.Forester || runMode == prover.ForesterTest
519+
520+
var workersStarted []string
521+
522+
// Start update worker for batch-update circuits or forester modes
523+
if startAllWorkers || containsCircuit(circuits, "update") || containsCircuit(circuits, "update-test") {
524+
updateWorker := server.NewUpdateQueueWorker(redisQueue, psv1, psv2)
525+
workers = append(workers, updateWorker)
526+
go updateWorker.Start()
527+
workersStarted = append(workersStarted, "update")
525528
}
526529

527-
logging.Logger().Info().Int("workers", numWorkers).Msg("Starting queue workers")
530+
// Start append worker for batch-append circuits or forester modes
531+
if startAllWorkers || containsCircuit(circuits, "append-with-proofs") || containsCircuit(circuits, "append-with-proofs-test") {
532+
appendWorker := server.NewAppendQueueWorker(redisQueue, psv1, psv2)
533+
workers = append(workers, appendWorker)
534+
go appendWorker.Start()
535+
workersStarted = append(workersStarted, "append")
536+
}
528537

529-
for i := 0; i < numWorkers; i++ {
530-
worker := server.NewQueueWorker(i+1, redisQueue, psv1, psv2)
531-
workers = append(workers, worker)
532-
go worker.Start()
538+
// Start address append worker for address-append circuits or forester modes
539+
if startAllWorkers || containsCircuit(circuits, "address-append") || containsCircuit(circuits, "address-append-test") {
540+
addressAppendWorker := server.NewAddressAppendQueueWorker(redisQueue, psv1, psv2)
541+
workers = append(workers, addressAppendWorker)
542+
go addressAppendWorker.Start()
543+
workersStarted = append(workersStarted, "address-append")
544+
}
545+
546+
if len(workersStarted) == 0 {
547+
logging.Logger().Warn().Msg("No queue workers started - no matching circuits found")
548+
} else {
549+
logging.Logger().Info().
550+
Strs("workers_started", workersStarted).
551+
Bool("forester_mode", startAllWorkers).
552+
Msg("Queue workers started")
533553
}
534554
}
535555

@@ -931,17 +951,68 @@ func debugProvingSystemKeys(keysDirPath string, runMode prover.RunMode, circuits
931951
}
932952
}
933953

934-
func startResultCleanup(redisQueue *server.RedisQueue) {
954+
func startCleanupRoutines(redisQueue *server.RedisQueue) {
955+
logging.Logger().Info().Msg("Running immediate cleanup on startup")
956+
957+
if err := redisQueue.CleanupOldRequests(); err != nil {
958+
logging.Logger().Error().
959+
Err(err).
960+
Msg("Failed to cleanup old proof requests on startup")
961+
} else {
962+
logging.Logger().Info().Msg("Startup cleanup of old proof requests completed")
963+
}
964+
965+
if err := redisQueue.CleanupOldResults(); err != nil {
966+
logging.Logger().Error().
967+
Err(err).
968+
Msg("Failed to cleanup old results on startup")
969+
} else {
970+
logging.Logger().Info().Msg("Startup cleanup of old results completed")
971+
}
972+
973+
// Start cleanup for old proof requests (every 10 minutes)
974+
go func() {
975+
requestTicker := time.NewTicker(10 * time.Minute)
976+
defer requestTicker.Stop()
977+
978+
logging.Logger().Info().Msg("Started old proof requests cleanup routine (every 10 minutes)")
979+
980+
for range requestTicker.C {
981+
if err := redisQueue.CleanupOldRequests(); err != nil {
982+
logging.Logger().Error().
983+
Err(err).
984+
Msg("Failed to cleanup old proof requests")
985+
} else {
986+
logging.Logger().Debug().Msg("Old proof requests cleanup completed")
987+
}
988+
}
989+
}()
990+
991+
// Start less frequent cleanup for old results (every 1 hour)
935992
go func() {
936-
ticker := time.NewTicker(1 * time.Hour)
937-
defer ticker.Stop()
993+
resultTicker := time.NewTicker(1 * time.Hour)
994+
defer resultTicker.Stop()
995+
996+
logging.Logger().Info().Msg("Started old results cleanup routine (every 1 hour)")
938997

939-
for range ticker.C {
998+
for range resultTicker.C {
940999
if err := redisQueue.CleanupOldResults(); err != nil {
9411000
logging.Logger().Error().
9421001
Err(err).
9431002
Msg("Failed to cleanup old results")
1003+
} else {
1004+
logging.Logger().Debug().Msg("Old results cleanup completed")
9441005
}
9451006
}
9461007
}()
9471008
}
1009+
1010+
// containsCircuit reports whether circuits contains an element exactly equal
// to circuit. The comparison is a plain string equality check, so it is
// case-sensitive and does not match prefixes or substrings. A nil or empty
// slice always yields false.
func containsCircuit(circuits []string, circuit string) bool {
	for _, c := range circuits {
		if c == circuit {
			return true
		}
	}
	return false
}

0 commit comments

Comments
 (0)