Skip to content

Commit f3900eb

Browse files
authored
Merge pull request #124 from experimaestro/feature/file-based-progress
Feature/file based progress
2 parents f45a079 + 3e8e2a5 commit f3900eb

File tree

7 files changed

+1603
-16
lines changed

7 files changed

+1603
-16
lines changed

src/experimaestro/cli/__init__.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,6 @@
2020
logging.basicConfig(level=logging.INFO)
2121

2222

23-
def pass_cfg(f):
24-
"""Pass configuration information"""
25-
26-
@click.pass_context
27-
def new_func(ctx, *args, **kwargs):
28-
return ctx.invoke(f, ctx.obj, *args, **kwargs)
29-
30-
return update_wrapper(new_func, f)
31-
32-
3323
def check_xp_path(ctx, self, path: Path):
3424
if not (path / ".__experimaestro__").is_file():
3525
cprint(f"{path} is not an experimaestro working directory", "red")
@@ -142,7 +132,6 @@ def diff(path: Path):
142132
"""Show the reason of the identifier change for a job"""
143133
from experimaestro.tools.jobs import load_job
144134
from experimaestro import Config
145-
from experimaestro.core.objects import ConfigWalkContext
146135

147136
_, job = load_job(path / "params.json", discard_id=False)
148137
_, new_job = load_job(path / "params.json")
@@ -290,6 +279,16 @@ def get_command(self, ctx, name):
290279
cli.add_command(Launchers("connectors", help="Connector specific commands"))
291280
cli.add_command(Launchers("tokens", help="Token specific commands"))
292281

282+
# Import and add progress commands
283+
from .progress import progress as progress_cli
284+
285+
cli.add_command(progress_cli)
286+
287+
# Import and add jobs commands
288+
from .jobs import jobs as jobs_cli
289+
290+
cli.add_command(jobs_cli)
291+
293292

294293
@cli.group()
295294
@click.option("--workdir", type=Path, default=None)

src/experimaestro/cli/progress.py

Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
"""Simplified CLI commands for managing and viewing progress files"""
2+
3+
import time
4+
from datetime import datetime
5+
from pathlib import Path
6+
from typing import Optional, Dict
7+
8+
import click
9+
from termcolor import colored
10+
11+
try:
12+
from tqdm import tqdm
13+
14+
TQDM_AVAILABLE = True
15+
except ImportError:
16+
TQDM_AVAILABLE = False
17+
18+
from experimaestro.progress import ProgressEntry, ProgressFileReader
19+
from experimaestro.settings import find_workspace
20+
from . import cli
21+
22+
23+
@click.option("--workspace", default="", help="Experimaestro workspace")
@click.option("--workdir", type=Path, default=None)
@cli.group()
@click.pass_context
def progress(
    ctx,
    workdir: Optional[Path],
    workspace: Optional[str],
):
    """Progress tracking commands"""
    # NOTE: the docstring above is what click shows as the group's help text.
    # Resolve the workspace once here and stash it on the shared context
    # object so that every sub-command can read it from ``ctx.obj.workspace``.
    resolved_workspace = find_workspace(workdir=workdir, workspace=workspace)
    ctx.obj.workspace = resolved_workspace
34+
35+
36+
def format_timestamp(timestamp: float) -> str:
    """Render a POSIX timestamp as ``YYYY-MM-DD HH:MM:SS`` in local time.

    Args:
        timestamp: Seconds since the epoch.

    Returns:
        The formatted date/time string (sub-second precision is dropped).
    """
    moment = datetime.fromtimestamp(timestamp)
    # A datetime format spec inside an f-string is forwarded to strftime.
    return f"{moment:%Y-%m-%d %H:%M:%S}"
40+
41+
42+
@click.argument("jobid", type=str)
@progress.command()
@click.pass_context
def show(ctx, jobid: str):
    """Show current progress state (default command)

    JOBID format: task_name/task_hash
    """
    # NOTE: the docstring above is the click help text for this command.
    try:
        task_name, task_hash = jobid.split("/")
    except ValueError:
        # Anything other than exactly one "/" cannot be unpacked in two parts
        raise click.ClickException("JOBID must be in format task_name/task_hash")

    job_dir = ctx.obj.workspace.path / "jobs" / task_name / task_hash
    if not job_dir.exists():
        raise click.ClickException(f"Job directory not found: {job_dir}")

    raw_progress = ProgressFileReader(job_dir).get_current_progress()

    # Drop the EOJ marker (level -1). A missing/empty result and a result
    # holding only the marker are reported with the same message.
    levels = {
        level: entry for level, entry in (raw_progress or {}).items() if level != -1
    }
    if not levels:
        click.echo("No progress information available")
        return

    click.echo(f"Progress for job {jobid}")
    click.echo("=" * 80)

    # One text line per level (lowest level first), followed by its
    # last-update time.
    for level, entry in sorted(levels.items()):
        indent = " " * level
        progress_pct = f"{entry.progress * 100:5.1f}%"
        desc = entry.desc or f"Level {level}"
        timestamp = format_timestamp(entry.timestamp)

        # Finished levels are shown in green, unfinished ones in yellow
        color = "green" if entry.progress >= 1.0 else "yellow"
        click.echo(colored(f"{indent}L{level}: {progress_pct} - {desc}", color))
        click.echo(colored(f"{indent} Last updated: {timestamp}", "cyan"))
89+
90+
91+
def create_progress_bar(
    level: int,
    desc: str,
    progress: float = 0.0,
) -> "tqdm":
    """Create a tqdm bar for one progress level, laid out dashboard-style.

    The return annotation is deliberately a string: ``tqdm`` is an optional
    import in this module, and an unquoted annotation is evaluated when the
    function is defined, which would raise ``NameError`` at import time
    whenever tqdm is not installed.

    Args:
        level: Nesting level of the bar; drives the label, the tqdm
            ``position`` and the bar colour.
        desc: Text displayed after the bar (truncated to 50 characters).
            When empty, ``"Level <level>"`` is shown instead.
        progress: Initial completion as a fraction; it is scaled by 100
            because the bar is created with ``total=100``.

    Returns:
        The newly created tqdm progress bar.

    Raises:
        NameError: If called while tqdm is not installed (callers are
            expected to check ``TQDM_AVAILABLE`` first).
    """
    # Nested levels get a tree-like prefix in front of their label
    if level > 0:
        indent = " " * (level - 1) + "└─ "
    else:
        indent = ""
    label = f"{indent}L{level}"

    # Cycle through a fixed palette so that neighbouring levels differ
    colors = ["blue", "yellow", "magenta", "cyan", "white"]
    bar_color = colors[level % len(colors)]

    unit = desc[:50] if desc else f"Level {level}"
    ncols = 100
    wbar = 50

    # Shrink the bar by the width of the indent so that the right-hand
    # ends of all bars line up whatever their nesting level
    bar_width = wbar - len(indent)
    bar_format = f"{{desc}}: {{percentage:3.0f}}%|{{bar:{bar_width}}}| {{unit}}"

    return tqdm(
        total=100,
        desc=label,
        position=level,
        leave=True,
        bar_format=bar_format,
        ncols=ncols,
        unit=unit,
        colour=bar_color,
        initial=progress * 100,
    )
121+
122+
123+
def _update_progress_display(
124+
reader: ProgressFileReader, progress_bars: Dict[int, tqdm]
125+
) -> bool:
126+
"""Update the tqdm progress bars in dashboard style"""
127+
current_state: Dict[int, ProgressEntry] = {
128+
k: v for k, v in reader.get_current_state().items() if k != -1
129+
}
130+
131+
if not current_state:
132+
click.echo("No progress information available yet...")
133+
return False
134+
135+
# Update existing bars and create new ones
136+
for _level, entry in current_state.items():
137+
progress_val = entry.progress * 100
138+
desc = entry.desc or f"Level {entry.level}"
139+
140+
if entry.level not in progress_bars:
141+
progress_bars[entry.level] = create_progress_bar(
142+
entry.level, desc, progress_val
143+
)
144+
145+
bar = progress_bars[entry.level]
146+
bar.unit = desc[:50]
147+
bar.n = progress_val
148+
149+
bar.refresh()
150+
151+
# Remove bars for levels that no longer exist
152+
levels_to_remove = set(progress_bars.keys()) - set(current_state.keys())
153+
for level in levels_to_remove:
154+
progress_bars[level].close()
155+
del progress_bars[level]
156+
157+
return True
158+
159+
160+
@click.argument("jobid", type=str)
@click.option("--refresh-rate", "-r", default=0.5, help="Refresh rate in seconds")
@progress.command()
@click.pass_context
def live(ctx, jobid: str, refresh_rate: float):
    """Show live progress with tqdm-style bars

    JOBID format: task_name/task_hash
    """
    # NOTE: the docstring above is the click help text for this command.
    #
    # Monitoring stops when (a) the reader reports the job as done, (b) every
    # displayed bar has reached 100%, or (c) the user presses Ctrl+C.
    # Completion is checked on every refresh: a job that ends before
    # reaching 100% (e.g. on failure) would otherwise be monitored forever.
    if not TQDM_AVAILABLE:
        click.echo("tqdm is not available. Install with: pip install tqdm")
        click.echo("Falling back to basic display...")
        ctx.invoke(show, jobid=jobid)
        return

    try:
        task_name, task_hash = jobid.split("/")
    except ValueError:
        raise click.ClickException("JOBID must be in format task_name/task_hash")

    workspace = ctx.obj.workspace
    task_path = workspace.path / "jobs" / task_name / task_hash

    if not task_path.exists():
        raise click.ClickException(f"Job directory not found: {task_path}")

    reader = ProgressFileReader(task_path)
    progress_bars: Dict[int, tqdm] = {}

    def cleanup_bars():
        """Close and forget all progress bars (safe to call repeatedly)"""
        for bar in progress_bars.values():
            bar.close()
        progress_bars.clear()

    click.echo(f"Live progress for job {jobid}")
    click.echo("Press Ctrl+C to stop")
    click.echo("=" * 80)

    try:
        if not _update_progress_display(reader, progress_bars):
            click.echo("No progress information available yet...")

        while True:
            time.sleep(refresh_rate)

            # Refresh first so that the last reported state is displayed
            # before any of the exit conditions below is evaluated
            _update_progress_display(reader, progress_bars)

            # Job finished, whatever the progress values it reported
            if reader.is_done():
                # Close the bars first so the message is printed below them
                cleanup_bars()
                click.echo("\nJob completed!")
                break

            # Every displayed bar is at 100%
            if progress_bars and all(bar.n >= 100 for bar in progress_bars.values()):
                cleanup_bars()
                click.echo("\nAll progress completed!")
                break

    except KeyboardInterrupt:
        click.echo("\nStopped monitoring progress")
    finally:
        # Also covers the Ctrl+C and error paths
        cleanup_bars()
222+
223+
224+
@progress.command(name="list")
@click.pass_context
def list_jobs(ctx):
    """List all jobs with progress information"""
    # NOTE: the docstring above is the click help text for this command.
    jobs_root = ctx.obj.workspace.path / "jobs"
    if not jobs_root.exists():
        click.echo("No jobs directory found")
        return

    # Lazily walk <jobs>/<task>/<job>, ignoring anything that is not a directory
    job_dirs = (
        job_dir
        for task_dir in jobs_root.iterdir()
        if task_dir.is_dir()
        for job_dir in task_dir.iterdir()
        if job_dir.is_dir()
    )

    for job_dir in job_dirs:
        progress_dir = job_dir / ".experimaestro"
        if not progress_dir.exists():
            continue

        # Only jobs with at least one progress file are listed
        if next(progress_dir.glob("progress-*.jsonl"), None) is None:
            continue

        job_id = f"{job_dir.parent.name}/{job_dir.name}"
        state = ProgressFileReader(job_dir).get_current_state()
        if not state:
            click.echo(f"{job_id:50} No progress data")
            continue

        # Level 0 carries the overall progress of the job
        overall = state.get(0)
        if not overall:
            click.echo(f"{job_id:50} No level 0 progress")
            continue

        progress_pct = f"{overall.progress * 100:5.1f}%"
        desc = f"{overall.desc}" if overall.desc else ""
        color = "green" if overall.progress >= 1.0 else "yellow"
        click.echo(colored(f"{job_id:50} - {progress_pct} - {desc}", color))

src/experimaestro/notifications.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
from .utils import logger
1414
from experimaestro.taskglobals import Env as TaskEnv
15+
from .progress import FileBasedProgressReporter
1516

1617
# --- Progress and other notifications
1718

@@ -41,7 +42,7 @@ def report(self):
4142
return result
4243

4344
def __repr__(self) -> str:
44-
return f"[{self.level}] {self.desc} {int(self.progress*1000)/10}%"
45+
return f"[{self.level}] {self.desc} {int(self.progress * 1000) / 10}%"
4546

4647

4748
class ListenerInformation:
@@ -79,10 +80,14 @@ def __init__(self, path: Path):
7980
self.progress_threshold = 0.01
8081
self.cv = threading.Condition()
8182

83+
# File-based progress reporter
84+
self.file_reporter = FileBasedProgressReporter(task_path=path)
85+
8286
def stop(self):
8387
self.stopping = True
8488
with self.cv:
85-
self.cv.notifyAll()
89+
# self.cv.notifyAll()
90+
self.cv.notify_all()
8691

8792
@staticmethod
8893
def isfatal_httperror(e: Exception, info: ListenerInformation) -> bool:
@@ -186,14 +191,16 @@ def eoj(self):
186191
try:
187192
with urlopen(url) as _:
188193
logger.debug(
189-
"EOJ botification sent for %s",
194+
"EOJ notification sent for %s",
190195
baseurl,
191196
)
192197
except Exception:
193198
logger.warning(
194199
"Could not report EOJ",
195200
)
196201

202+
self.file_reporter.eoj()
203+
197204
def set_progress(
198205
self, progress: float, level: int, desc: Optional[str], console=False
199206
):
@@ -212,6 +219,8 @@ def set_progress(
212219
self.levels[level].desc = desc
213220
self.levels[level].progress = progress
214221

222+
self.file_reporter.set_progress(progress, level, desc)
223+
215224
self.cv.notify_all()
216225

217226
INSTANCE: ClassVar[Optional["Reporter"]] = None

0 commit comments

Comments
 (0)