Skip to content

Commit 0026ecb

Browse files
jeremymanningclaude
andcommitted
Implement SGE job status checking support (issue #52)
- Added _check_sge_status method with comprehensive SGE state parsing - Supports all SGE job states: r (running), qw (queued), Eqw (error), dr (deleting) - Handles job completion detection via exit_status or job not found - Added SGE cancellation support to cancel_job method using qdel - Integrated SGE status checking into main _check_job_status method - Added comprehensive test suite with 6 new SGE-specific tests: - test_check_sge_status_running - test_check_sge_status_queued - test_check_sge_status_failed - test_check_sge_status_completed - test_check_sge_status_exit_status - test_cancel_job_sge - All 312 tests passing This completes SGE support implementation, moving it from "⚡ Nearly Ready" to "✅ Full Support" status with proper job monitoring and result collection. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 0af5672 commit 0026ecb

File tree

2 files changed

+152
-0
lines changed

2 files changed

+152
-0
lines changed

clustrix/executor.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -684,6 +684,21 @@ def _check_job_status(self, job_id: str) -> str:
684684
else:
685685
return "completed"
686686

687+
elif self.config.cluster_type == "sge":
688+
sge_status = self._check_sge_status(job_id)
689+
if sge_status == "completed":
690+
# Job completed but not in queue, check if result exists
691+
if job_id in self.active_jobs:
692+
job_info = self.active_jobs[job_id]
693+
result_exists = self._remote_file_exists(
694+
f"{job_info['remote_dir']}/result.pkl"
695+
)
696+
return "completed" if result_exists else "failed"
697+
else:
698+
return "completed"
699+
else:
700+
return sge_status
701+
687702
elif self.config.cluster_type == "ssh":
688703
# For SSH jobs, check if result file exists
689704
if job_id in self.active_jobs:
@@ -992,6 +1007,8 @@ def cancel_job(self, job_id: str):
9921007
self._execute_remote_command(f"scancel {job_id}")
9931008
elif self.config.cluster_type == "pbs":
9941009
self._execute_remote_command(f"qdel {job_id}")
1010+
elif self.config.cluster_type == "sge":
1011+
self._execute_remote_command(f"qdel {job_id}")
9951012

9961013
if job_id in self.active_jobs:
9971014
del self.active_jobs[job_id]
@@ -1093,6 +1110,33 @@ def _check_pbs_status(self, job_id: str) -> str:
10931110
except Exception:
10941111
return "unknown"
10951112

1113+
def _check_sge_status(self, job_id: str) -> str:
1114+
"""Check SGE job status."""
1115+
cmd = f"qstat -j {job_id}"
1116+
try:
1117+
stdout, stderr = self._execute_remote_command(cmd)
1118+
if not stdout.strip() or "Following jobs do not exist" in stderr:
1119+
# Job not in queue, likely completed
1120+
return "completed"
1121+
else:
1122+
# Parse SGE job state from qstat output
1123+
# Common SGE states: r (running), qw (queued), Eqw (error), dr (deleting)
1124+
if "job_state r" in stdout:
1125+
return "running"
1126+
elif "job_state qw" in stdout:
1127+
return "queued"
1128+
elif "job_state Eqw" in stdout:
1129+
return "failed"
1130+
elif "job_state dr" in stdout:
1131+
return "completed"
1132+
# Check for exit status indicating completion
1133+
elif "exit_status" in stdout:
1134+
return "completed"
1135+
else:
1136+
return "running" # Default for unknown running states
1137+
except Exception:
1138+
return "unknown"
1139+
10961140
def _get_k8s_result(self, job_id: str) -> Any:
10971141
"""Get result from Kubernetes job logs."""
10981142
try:

tests/test_executor.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,91 @@ def test_check_pbs_status(self, executor):
365365

366366
assert status == "running"
367367

368+
def test_check_sge_status_running(self, executor):
369+
"""Test SGE job status checking - running state."""
370+
executor.ssh_client = Mock()
371+
372+
# Mock qstat -j output for running job
373+
mock_stdout = Mock()
374+
mock_stdout.read.return_value = b"job_state r"
375+
mock_stderr = Mock()
376+
mock_stderr.read.return_value = b""
377+
378+
executor.ssh_client.exec_command.return_value = (None, mock_stdout, mock_stderr)
379+
380+
status = executor._check_sge_status("12345")
381+
382+
assert status == "running"
383+
384+
# Verify qstat command
385+
call_args = executor.ssh_client.exec_command.call_args[0][0]
386+
assert "qstat -j" in call_args
387+
assert "12345" in call_args
388+
389+
def test_check_sge_status_queued(self, executor):
390+
"""Test SGE job status checking - queued state."""
391+
executor.ssh_client = Mock()
392+
393+
# Mock qstat -j output for queued job
394+
mock_stdout = Mock()
395+
mock_stdout.read.return_value = b"job_state qw"
396+
mock_stderr = Mock()
397+
mock_stderr.read.return_value = b""
398+
399+
executor.ssh_client.exec_command.return_value = (None, mock_stdout, mock_stderr)
400+
401+
status = executor._check_sge_status("12345")
402+
403+
assert status == "queued"
404+
405+
def test_check_sge_status_failed(self, executor):
406+
"""Test SGE job status checking - error state."""
407+
executor.ssh_client = Mock()
408+
409+
# Mock qstat -j output for error job
410+
mock_stdout = Mock()
411+
mock_stdout.read.return_value = b"job_state Eqw"
412+
mock_stderr = Mock()
413+
mock_stderr.read.return_value = b""
414+
415+
executor.ssh_client.exec_command.return_value = (None, mock_stdout, mock_stderr)
416+
417+
status = executor._check_sge_status("12345")
418+
419+
assert status == "failed"
420+
421+
def test_check_sge_status_completed(self, executor):
422+
"""Test SGE job status checking - completed/not found."""
423+
executor.ssh_client = Mock()
424+
425+
# Mock qstat -j output for job not found
426+
mock_stdout = Mock()
427+
mock_stdout.read.return_value = b""
428+
mock_stderr = Mock()
429+
mock_stderr.read.return_value = b"Following jobs do not exist: 12345"
430+
431+
executor.ssh_client.exec_command.return_value = (None, mock_stdout, mock_stderr)
432+
433+
status = executor._check_sge_status("12345")
434+
435+
assert status == "completed"
436+
437+
def test_check_sge_status_exit_status(self, executor):
438+
"""Test SGE job status checking - exit status indicates completion."""
439+
executor.ssh_client = Mock()
440+
441+
# Mock qstat -j output with exit status
442+
mock_stdout = Mock()
443+
mock_stdout.read.return_value = b"exit_status 0"
444+
mock_stderr = Mock()
445+
mock_stderr.read.return_value = b""
446+
447+
executor.ssh_client.exec_command.return_value = (None, mock_stdout, mock_stderr)
448+
449+
status = executor._check_sge_status("12345")
450+
451+
assert status == "completed"
452+
368453
def test_get_job_status_completed(self, executor):
369454
"""Test job status when result file exists."""
370455
executor.ssh_client = Mock()
@@ -485,6 +570,29 @@ def test_cancel_job_slurm(self, executor):
485570
call_args = executor.ssh_client.exec_command.call_args[0][0]
486571
assert "scancel 12345" in call_args
487572

573+
def test_cancel_job_sge(self, executor):
574+
"""Test canceling SGE job."""
575+
executor.ssh_client = Mock()
576+
executor.config.cluster_type = "sge"
577+
578+
mock_stdout = Mock()
579+
mock_stdout.read.return_value = b""
580+
mock_stdout.channel.recv_exit_status.return_value = 0
581+
582+
executor.ssh_client.exec_command.return_value = (None, mock_stdout, Mock())
583+
584+
# Add job to active jobs
585+
executor.active_jobs["12345"] = {"remote_dir": "/tmp/test_job"}
586+
587+
executor.cancel_job("12345")
588+
589+
# Verify qdel command was called
590+
call_args = executor.ssh_client.exec_command.call_args[0][0]
591+
assert "qdel 12345" in call_args
592+
593+
# Verify job was removed from active jobs
594+
assert "12345" not in executor.active_jobs
595+
488596
def test_get_error_log(self, executor):
489597
"""Test error log retrieval."""
490598
executor.active_jobs["failed_job"] = {"remote_dir": "/tmp/failed_job"}

0 commit comments

Comments
 (0)