117 changes: 94 additions & 23 deletions benchmark/analysis-scripts/autogroup.py
@@ -14,10 +14,12 @@
 import warnings
 import statistics
 
+
 from tabulate import tabulate
 from collections import defaultdict
 from typing import Dict, Any, Optional, Tuple, List, Set, Union
 from omegaconf import OmegaConf
+from pathlib import Path
 
 
 def parse_hydra_config(iteration_dir: str) -> Dict[str, Any]:
@@ -152,17 +154,57 @@ def find_varying_parameters(all_configs: List[Dict[str, Any]]) -> Set[str]:
     return varying
 
 
+def find_multirun_dir(index: int = 0) -> str:
+    """Find the Nth latest directory in multirun (0=most recent, 1=previous, etc.)"""
+    if not Path('multirun').exists():
+        raise FileNotFoundError("multirun directory not found")
+
+    # This ensures that alphabetical sorting will correctly find the latest run
+    sorted_subdirs = sorted(
+        [(f"{p.parent.name} {p.name.replace('-', ':')}", str(p)) for p in Path('multirun').glob("*/*/")],
+        key=lambda x: x[0],
+        reverse=True,
+    )
+
+    if not sorted_subdirs:
+        raise FileNotFoundError("No experiment directories found in multirun")
+
+    try:
+        return sorted_subdirs[index][1]
+    except IndexError:
+        raise IndexError(f"Index {index} is out of range. Only {len(sorted_subdirs)} experiment directories found.")


 def main() -> None:
     parser = argparse.ArgumentParser(description='Print benchmark throughput data automatically grouped')
-    parser.add_argument('--base-dir', required=True, help='Base directory containing benchmark results')
+
+    # Positional argument for base directory (optional)
+    parser.add_argument('base_dir', nargs='?', help='Base directory containing benchmark results')
+
     parser.add_argument('--csv-output', help='Optional CSV file to write the results to')
+    parser.add_argument(
+        '--runs',
+        choices=['rep', 'all'],
+        help='Show run numbers in results (rep(resentative)=min/median/max, all=all runs)',
+    )
     args = parser.parse_args()
 
+    # Determine the base directory to use
+    if args.base_dir:
+        base_dir = args.base_dir
+    else:
+        try:
+            base_dir = find_multirun_dir()
+            print(f"Using inferred base directory: {base_dir}")
+        except (IndexError, FileNotFoundError) as e:
+            print(f"Error: {e}")
+            return
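
Note: the effect is that the results directory is now an optional positional argument; a minimal sketch of the fallback behavior (the parser below is a hypothetical trimmed-down version of the one above):

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('base_dir', nargs='?', help='optional positional argument')
print(parser.parse_args([]).base_dir)                # None -> triggers find_multirun_dir()
print(parser.parse_args(['results/run1']).base_dir)  # 'results/run1' -> used as-is
```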

     # List to store all results
     all_results = []
 
     # Process all iteration directories
-    for root, dirs, files in os.walk(args.base_dir):
+    for root, dirs, files in os.walk(base_dir):
         for dir_name in dirs:
             if dir_name.isdigit():
                 iteration_path = os.path.join(root, dir_name)
@@ -184,30 +226,52 @@ def main() -> None:
     grouped_results = defaultdict(list)
     for config, throughput, iter_num in all_results:
         key = tuple((param, str(config.get(param, 'N/A'))) for param in sorted(varying_params))
-        grouped_results[key].append(throughput)
+        grouped_results[key].append((throughput, iter_num))
 
-    # Aggregated results table
-    aggregated_headers = varying_params + [
-        "Count",
-        "Median (Gbps)",
-        "Std Dev (Gbps)",
-        "Min (Gbps)",
-        "Max (Gbps)",
-    ]
-    aggregated_rows = []
-    for config_key, throughputs in grouped_results.items():
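
Note: a minimal sketch of what the new grouping yields, with invented parameter names and values:

```python
from collections import defaultdict

varying_params = {"benchmark_type", "num_threads"}
all_results = [
    ({"benchmark_type": "read", "num_threads": 4}, 91.2, "0"),
    ({"benchmark_type": "read", "num_threads": 4}, 89.8, "1"),
    ({"benchmark_type": "read", "num_threads": 8}, 95.0, "2"),
]

grouped_results = defaultdict(list)
for config, throughput, iter_num in all_results:
    key = tuple((param, str(config.get(param, 'N/A'))) for param in sorted(varying_params))
    grouped_results[key].append((throughput, iter_num))

# Two groups; each value is now a list of (throughput, run number) pairs:
#   (('benchmark_type', 'read'), ('num_threads', '4')) -> [(91.2, '0'), (89.8, '1')]
#   (('benchmark_type', 'read'), ('num_threads', '8')) -> [(95.0, '2')]
```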

+    # Generate aggregated results table with optional Run Numbers column
+    results_headers = (
+        varying_params
+        + (["Run Numbers"] if args.runs else [])
+        + ["Count", "Max (Gbps)", "Median (Gbps)", "Min (Gbps)", "Std Dev (Gbps)"]
+    )
+
+    results_rows = []
+    for config_key, throughput_data in grouped_results.items():
+        throughputs, run_numbers = zip(*throughput_data)
+
         row = []
         for _, value in config_key:
             row.append(value)
 
+        # Add run numbers column if requested
+        if args.runs:
+            sorted_by_throughput = sorted(zip(throughputs, run_numbers), reverse=True)
[Contributor comment] Isn't zip(throughputs, run_numbers) equivalent to throughput_data?

+            if args.runs == "rep":
+                # Find min, max, and median run numbers based on throughput
+                min_run = sorted_by_throughput[-1][1]
+                max_run = sorted_by_throughput[0][1]
+                median_idx = len(sorted_by_throughput) // 2
+                median_run = sorted_by_throughput[median_idx][1]
+
+                selected_runs = [max_run, median_run, min_run]
+                # Remove duplicates while preserving order using dict.fromkeys()
+                # (insertion order is guaranteed since Python 3.7)
+                unique_runs = list(dict.fromkeys(selected_runs))
[Contributor comment] I think your previous approach was more readable 😅 Perhaps just move it to a function instead?
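
Note: a sketch of the refactor the reviewer suggests; the helper name pick_representative_runs is invented, not part of this PR:

```python
from typing import List, Tuple

def pick_representative_runs(sorted_by_throughput: List[Tuple[float, str]]) -> List[str]:
    """Given (throughput, run number) pairs sorted by throughput in descending
    order, return the run numbers of the max, median, and min throughputs,
    deduplicated while preserving that order."""
    max_run = sorted_by_throughput[0][1]
    median_run = sorted_by_throughput[len(sorted_by_throughput) // 2][1]
    min_run = sorted_by_throughput[-1][1]
    # dict.fromkeys() keeps insertion order (guaranteed since Python 3.7),
    # so duplicates collapse without reordering.
    return list(dict.fromkeys([max_run, median_run, min_run]))
```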


row.append(",".join(unique_runs))
else:
all_runs = [r for _, r in sorted_by_throughput]
[Contributor comment] Isn't this run_numbers?

row.append(",".join(all_runs))

row.append(len(throughputs))
row.append(f"{max(throughputs):.2f}")
row.append(f"{statistics.median(throughputs):.2f}")
row.append(f"{min(throughputs):.2f}")
if len(throughputs) > 1:
row.append(f"{statistics.stdev(throughputs):.2f}")
else:
row.append("N/A")
row.append(f"{min(throughputs):.2f}")
row.append(f"{max(throughputs):.2f}")
aggregated_rows.append(row)
results_rows.append(row)
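
Note: the "N/A" branch above exists because statistics.stdev() computes a sample standard deviation and needs at least two data points:

```python
import statistics

print(statistics.median([7.2]))  # 7.2: median is fine with a single run
try:
    statistics.stdev([7.2])
except statistics.StatisticsError as e:
    print(e)  # stdev requires at least two data points
```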

     # Custom sorting function for benchmark types
     def benchmark_type_sort_key(value: str) -> int:
@@ -217,7 +281,7 @@ def benchmark_type_sort_key(value: str) -> int:
     # Sort rows by all columns
     def sort_key(row: List[str]) -> List[Union[int, float, str]]:
         key_parts = []
-        for value, header in zip(row, aggregated_headers):
+        for value, header in zip(row, results_headers):
             if header == 'benchmark_type':
                 # Use custom ordering for benchmark type
                 key_parts.append(benchmark_type_sort_key(value))
@@ -231,17 +295,24 @@ def sort_key(row: List[str]) -> List[Union[int, float, str]]:
                 key_parts.append(str(value))
         return key_parts
 
-    aggregated_rows.sort(key=sort_key)
+    results_rows.sort(key=sort_key)
 
-    print("\nResults Summary:")
-    print(tabulate(aggregated_rows, headers=aggregated_headers, tablefmt="grid"))
+    # Display results
+    if args.runs == "all":
+        print("\nResults Summary (with all run numbers):")
+    elif args.runs == "rep":
+        print("\nResults Summary (with representative run numbers):")
+    else:
+        print("\nResults Summary:")
+
+    print(tabulate(results_rows, headers=results_headers, tablefmt="grid"))
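
Note: for reference, tabulate's "grid" format renders output like the following; the values are invented and the spacing approximate:

```python
from tabulate import tabulate

headers = ["benchmark_type", "Count", "Max (Gbps)"]
rows = [["read", 5, "95.00"], ["write", 5, "88.07"]]
print(tabulate(rows, headers=headers, tablefmt="grid"))
# +------------------+---------+--------------+
# | benchmark_type   |   Count | Max (Gbps)   |
# +==================+=========+==============+
# | read             |       5 | 95.00        |
# +------------------+---------+--------------+
# | write            |       5 | 88.07        |
# +------------------+---------+--------------+
```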

     # Write to CSV if requested
     if args.csv_output:
         with open(args.csv_output, 'w', newline='') as csvfile:
             writer = csv.writer(csvfile)
-            writer.writerow(aggregated_headers)
-            writer.writerows(aggregated_rows)
+            writer.writerow(results_headers)
+            writer.writerows(results_rows)
             print(f"\nResults written to CSV: {args.csv_output}")

