Skip to content

[CI] Tests for graph comparison between vllm and AFTU #286

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
markers: "cpu and cb"
flags: "--timeout=300 --forked"
- name: "worker and utils"
markers: "not e2e"
markers: "not e2e and not aftu"
flags: "--timeout=300"

name: "${{ matrix.test_suite.name }} (${{ matrix.vllm_version.name }})"
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ markers = [
"multi: Tests that require >1 cards",
"utils: Tests for utility functions",
"worker: Tests for worker logic",
"aftu: Tests to compare graphs from aiu-fms-testing-utils",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to avoid creating more custom markers unless it's completely necessary. (Unrelated but it looks like utils and worker are unused and we should delete them as well)

These tests seem to be important to catch problems early so I do want them running with our default set of markers if possible

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. My intention with that was to make it easy to deselect these tests when aftu and torch_sendnn are not installed in the environment; for instance, we cannot run these tests in GH Actions.
What should I do? Let pytest collect these tests, check inside each test whether everything is set up to run, and skip them otherwise?

]
# --8<-- [end:test-markers-definition]

Expand All @@ -150,6 +151,7 @@ dev = [
"pytest-timeout==2.3.1",
"requests==2.32.3",
"sentence-transformers==3.4.1",
"aiu_fms_testing_utils@git+https://github.com/foundation-model-stack/aiu-fms-testing-utils.git#1a77f630104a5661fff554164c1e536ea08393e3"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering if we need to update the uv.lock file as well 🤔

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

]
lint = [
"clang-format==18.1.5",
Expand Down
160 changes: 160 additions & 0 deletions tests/aftu/graph_compare_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import difflib
import os
import re
import shutil
import tempfile
from collections.abc import Iterator
from glob import iglob
from os import path
from subprocess import PIPE, Popen
from typing import Optional

from vllm.model_executor.model_loader.weight_utils import (
download_weights_from_hf)


def load_graph_to_compare(file_path):
    """Read a compiler graph dump and normalize run-specific fields.

    Graph dumps contain values that differ between otherwise identical
    runs (object ids, raw pointers, tensor payloads, symbolic-shape
    numbering). Those are rewritten to stable placeholders so two dumps
    can be compared textually.

    Args:
        file_path: Path to a ``*.ops`` graph dump file.

    Returns:
        The file content with volatile fields replaced by placeholders
        and symbolic shapes (``s<N>``) reindexed to ``S#<i>`` following
        the sorted order of the symbol names.
    """
    with open(file_path) as file:
        content = file.read()

    # Replace id: <number> with id: ###
    content = re.sub(r'id: \d+', 'id: ###', content)

    # Replace ptr: <pointer> with ptr: xxxx
    content = re.sub(r'ptr: 0x[0-9a-fA-F]{12}', 'ptr: xxxx', content)

    # Replace raw tensor payloads, which differ between runs
    content = re.sub(r'values: ([0-9a-fA-F]{2}\s*)+', 'values: $$', content)

    # Find symbolic shape names ("s<N>") appearing as whole words
    # followed by whitespace or a comma, e.g. ' s1 ', ' s1,', ' s1 s2 '.
    matched_symbols = re.findall(r'\b(s\d+)[\s,]', content)

    # Reindex symbols using the sorted order of their names.
    symbol_map = {s: i for i, s in enumerate(sorted(set(matched_symbols)))}

    # Rewrite all symbols in a single pass. A naive str.replace() per
    # symbol would corrupt longer names (replacing 's1' also clobbers
    # the prefix of 's10'), so substitute whole words via a callback.
    content = re.sub(
        r'\bs\d+\b',
        lambda m: f'S#{symbol_map[m.group(0)]}'
        if m.group(0) in symbol_map else m.group(0), content)

    return content


def collect_graph_files(input_dir: str) -> dict[str, tuple[str, str]]:
    """Collect G1 graph dumps under ``<input_dir>/export_dtcompiler``.

    Assumes ``input_dir`` contains an ``export_dtcompiler`` folder where
    the dump files live. G2 dumps (``*.g2.ops``) are excluded.

    Returns:
        Mapping from the numeric suffix of each dump file name (the
        part after "dump") to a ``(filepath, normalized_content)``
        tuple, where the content is normalized by
        ``load_graph_to_compare``.
    """
    pattern = path.join(input_dir, "export_dtcompiler", "*/*.ops")

    filemap: dict[str, tuple[str, str]] = {}
    for filepath in iglob(pattern):
        # G2 graphs are not part of the comparison.
        if filepath.endswith(".g2.ops"):
            continue
        # The numeric part after "dump" identifies the graph.
        key = filepath.split("dump")[-1]
        filemap[key] = (filepath, load_graph_to_compare(filepath))

    return filemap


def diff_graph(a_filepath, a_file, b_filepath, b_file) -> Iterator[str]:
    """Yield a unified diff between two normalized graph dump contents.

    ``a_filepath``/``b_filepath`` are used only as the diff header
    labels; ``a_file``/``b_file`` are the full text contents.
    """
    a_lines = a_file.split("\n")
    b_lines = b_file.split("\n")
    return difflib.unified_diff(a_lines,
                                b_lines,
                                fromfile=a_filepath,
                                tofile=b_filepath)


def get_aftu_script_dir() -> str:
    """Locate the AFTU ``scripts/`` directory containing inference.py.

    Installing aiu-fms-testing-utils from the git URL does not ship the
    ``scripts/`` directory (inference.py included), so this function
    makes a best-effort attempt to find it. Setting the
    ``VLLM_SPYRE_TEST_AFTU_SCRIPTS_DIR`` environment variable to point
    at the directory explicitly is less error-prone.

    Returns:
        Path to the AFTU scripts directory.

    Raises:
        RuntimeError: if the package is installed but not from a local
            clone of the aiu-fms-testing-utils repository, so the
            scripts directory cannot be located.
    """
    script_dir = os.environ.get("VLLM_SPYRE_TEST_AFTU_SCRIPTS_DIR", "")

    if script_dir:
        return script_dir

    # Let's look for it... assuming it is installed as local,
    # i.e. git clone ... && uv pip install -e . [--no-deps]
    import aiu_fms_testing_utils
    module_dir = path.dirname(aiu_fms_testing_utils.__file__)
    repo_dir = path.dirname(module_dir)

    # Sanity-check that this really is the repository checkout. Raise
    # (instead of assert, which is stripped under -O) with a hint.
    if path.basename(repo_dir) != "aiu-fms-testing-utils":
        raise RuntimeError(
            f"Cannot locate the AFTU scripts dir from {repo_dir!r}; set "
            "VLLM_SPYRE_TEST_AFTU_SCRIPTS_DIR explicitly")

    return path.join(repo_dir, "scripts")


def compare_graphs(a_map: dict[str, tuple[str, str]],
                   b_map: dict[str, tuple[str, str]]) -> bool:
    """Compare two graph maps produced by ``collect_graph_files``.

    Prints a (truncated) unified diff for every graph that differs.

    Args:
        a_map: Mapping of graph key -> (filepath, normalized content).
        b_map: Mapping of graph key -> (filepath, normalized content).

    Returns:
        True when both maps contain the same graph keys and every
        graph's normalized content is identical, False otherwise.
    """
    # Limit printed diff output per file to keep test logs readable.
    max_diff_lines = 20

    are_graphs_similar = True

    # Graphs present in only one of the two runs are a mismatch by
    # themselves; report them instead of raising a KeyError below.
    only_a = a_map.keys() - b_map.keys()
    only_b = b_map.keys() - a_map.keys()
    if only_a or only_b:
        print("Graph sets differ!", "only in a:", sorted(only_a),
              "only in b:", sorted(only_b))
        are_graphs_similar = False

    for k in sorted(a_map.keys() & b_map.keys()):
        a_filename, a_filedata = a_map[k]
        b_filename, b_filedata = b_map[k]

        diff = list(diff_graph(a_filename, a_filedata, b_filename,
                               b_filedata))
        if diff:
            print("Found difference!", a_filename, b_filename)
            for line in diff[:max_diff_lines]:
                print(line)
            omitted = len(diff) - max_diff_lines
            if omitted > 0:
                print(f"[...] Omitted {omitted} lines")
            are_graphs_similar = False

    return are_graphs_similar


def run_inference_py_and_get_graphs(
    inference_py_args: list[str],
    extra_env: Optional[dict[str,
                             str]] = None) -> dict[str, tuple[str, str]]:
    """Run AFTU's inference.py in a temp dir and collect its graph dumps.

    Args:
        inference_py_args: Full argv for the subprocess, typically
            ``[sys.executable, "scripts/inference.py", ...]`` relative
            to the temporary working directory.
        extra_env: Optional environment overrides merged on top of the
            current environment.

    Returns:
        Graph map from ``collect_graph_files`` over the dumps produced
        in the temporary directory.

    Raises:
        RuntimeError: if inference.py exits with a non-zero status.
    """
    with tempfile.TemporaryDirectory() as tmpdir:

        env = os.environ.copy()
        env.update({
            "DEE_DUMP_GRAPHS": "aftu",
            # Disable the cache so the graphs are always (re)generated.
            "TORCH_SENDNN_CACHE_ENABLE": "0"
        })
        if extra_env:
            env.update(extra_env)

        # inference.py is not shipped with the AFTU wheel: copy the
        # scripts dir into the temp cwd so relative paths resolve.
        script_dir = get_aftu_script_dir()
        shutil.copytree(script_dir, os.path.join(tmpdir, "scripts"))

        process = Popen(inference_py_args,
                        stdout=PIPE,
                        stderr=PIPE,
                        env=env,
                        cwd=tmpdir)

        _stdout, stderr = process.communicate()

        # A failed run would otherwise silently yield an empty graph map
        # and a confusing comparison failure later; fail fast instead.
        if process.returncode != 0:
            raise RuntimeError(
                f"inference.py exited with code {process.returncode}:\n"
                f"{stderr.decode(errors='replace')}")

        aftu_graphs = collect_graph_files(tmpdir)

    return aftu_graphs


def get_model_path(model_name_or_path):
    """Resolve a model name or path to a local filesystem path.

    A path that already exists as a local directory is returned
    unchanged; otherwise the weights are resolved through the HF cache
    (downloading them if needed).
    """
    if os.path.isdir(model_name_or_path):
        return model_name_or_path

    # Not a local directory: get the location of the model from the
    # HF cache.
    return download_weights_from_hf(
        model_name_or_path=model_name_or_path,
        cache_dir=None,
        allow_patterns=["*.safetensors", "*.bin", "*.pt"])
176 changes: 176 additions & 0 deletions tests/aftu/test_compare_graphs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
"""Compare graphs generated by vLLM-Spyre vs AFTU

Run `python -m pytest tests/aftu/test_compare_graphs.py`.
"""

import os
import sys
import tempfile

import pytest
from graph_compare_utils import (collect_graph_files, compare_graphs,
get_model_path,
run_inference_py_and_get_graphs)
from spyre_util import (generate_spyre_vllm_output, get_chicken_soup_prompts,
get_spyre_model_list)
from vllm import SamplingParams


@pytest.mark.aftu
@pytest.mark.parametrize("model", get_spyre_model_list())
@pytest.mark.parametrize("backend", ["sendnn"])
@pytest.mark.parametrize("max_num_seqs", [2, 4],
                         ids=lambda val: f"max_num_seqs({val})")
def test_compare_graphs_cb(
    model: str,
    backend: str,
    max_num_seqs: int,
    monkeypatch: pytest.MonkeyPatch,
    runtime_xfail,
):
    """Compare the continuous-batching graphs generated by vLLM-Spyre
    against the graphs produced by AFTU's inference.py for the same
    model and batch configuration."""

    if max_num_seqs > 2 and backend == "sendnn":
        runtime_xfail("CB failures expected for batch size > 2")

    # --- AFTU side: run inference.py and collect its graph dumps.
    max_model_len = 256
    model_path = get_model_path(model)

    inference_py_args = [
        sys.executable, "scripts/inference.py", "--architecture",
        "hf_configured", "--model_path", model_path, "--variant", model_path,
        "--tokenizer", model_path, "--unfuse_weights", "--model_source", "hf",
        "--device_type", "aiu", "--compile", "--default_dtype", "fp16",
        "--compile_dynamic", "--min_pad_length", "64", "--max_new_tokens", "5",
        "--batch_size",
        str(max_num_seqs), "--compile_dynamic_sendnn", "--attention_type=paged"
    ]

    extra_env = {
        "VLLM_DT_MAX_CONTEXT_LEN": str(max_model_len),
        "VLLM_DT_MAX_BATCH_SIZE": str(max_num_seqs)
    }
    aftu_graphs = run_inference_py_and_get_graphs(inference_py_args, extra_env)

    # --- vLLM side: generate output and collect the dumped graphs.
    prompts = get_chicken_soup_prompts(4)

    max_new_tokens = 20

    monkeypatch.setenv("DEE_DUMP_GRAPHS", "vllm_static")
    # Disable cache to produce the graphs
    monkeypatch.setenv("TORCH_SENDNN_CACHE_ENABLE", "0")
    vllm_sampling_params = SamplingParams(
        max_tokens=max_new_tokens,
        temperature=0,
        logprobs=0,  # return logprobs of generated tokens only
        ignore_eos=True)

    original_cwd = os.getcwd()
    # Run from a temp dir so the graph dumps land in a clean location.
    with tempfile.TemporaryDirectory() as tmpdir:
        try:
            os.chdir(tmpdir)

            generate_spyre_vllm_output(model=model,
                                       prompts=prompts,
                                       max_model_len=max_model_len,
                                       block_size=256,
                                       sampling_params=vllm_sampling_params,
                                       tensor_parallel_size=1,
                                       backend=backend,
                                       max_num_seqs=max_num_seqs,
                                       use_cb=True,
                                       monkeypatch=monkeypatch)

            vllm_graphs = collect_graph_files(tmpdir)
        finally:
            # Restore the cwd before tmpdir is removed (and on errors),
            # so TemporaryDirectory cleanup does not run with the cwd
            # still inside the directory being deleted.
            os.chdir(original_cwd)

    assert compare_graphs(vllm_graphs, aftu_graphs)


@pytest.mark.aftu
@pytest.mark.parametrize("model", get_spyre_model_list())
@pytest.mark.parametrize("warmup_shape",
                         [(64, 5, 1), (64, 5, 2),
                          (64, 5, 4)])  # (prompt_length/new_tokens/batch_size)
@pytest.mark.parametrize("backend", ["sendnn"])
def test_compare_graphs_static_batching(
    model: str,
    warmup_shape: tuple[int, int, int],
    backend: str,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Compare the static-batching graphs generated by vLLM-Spyre
    against the graphs produced by AFTU's inference.py for the same
    model and warmup shape."""

    # AFTU side: run inference.py with the warmup shape's new-token
    # count and batch size, and collect the graph dumps it produces.
    model_path = get_model_path(model)

    inference_py_args = [
        sys.executable,
        "scripts/inference.py",
        "--architecture",
        "hf_configured",
        "--model_path",
        model_path,
        "--variant",
        model_path,
        "--tokenizer",
        model_path,
        "--unfuse_weights",
        "--model_source",
        "hf",
        "--device_type",
        "aiu",
        "--compile",
        "--default_dtype",
        "fp16",
        "--compile_dynamic",
        "--min_pad_length",
        "64",
        "--max_new_tokens",
        str(warmup_shape[1]),  # new_tokens from the warmup shape
        "--batch_size",
        str(warmup_shape[2]),  # batch_size from the warmup shape
    ]

    aftu_graphs = run_inference_py_and_get_graphs(inference_py_args)

    # VLLM side: generate output with matching parameters and collect
    # the graphs dumped by the sendnn backend.
    prompts = get_chicken_soup_prompts(4)

    max_new_tokens = warmup_shape[1]

    monkeypatch.setenv("DEE_DUMP_GRAPHS", "vllm_static")
    # Disable cache to produce the graphs
    monkeypatch.setenv("TORCH_SENDNN_CACHE_ENABLE", "0")
    vllm_sampling_params = SamplingParams(max_tokens=max_new_tokens,
                                          temperature=0,
                                          logprobs=0,
                                          ignore_eos=True)

    original_cwd = os.getcwd()
    try:
        # Change to temp dir to set the test environment clean
        with tempfile.TemporaryDirectory() as tmpdir:
            os.chdir(tmpdir)

            generate_spyre_vllm_output(model=model,
                                       prompts=prompts,
                                       warmup_shapes=[warmup_shape],
                                       max_model_len=2048,
                                       block_size=2048,
                                       sampling_params=vllm_sampling_params,
                                       tensor_parallel_size=1,
                                       backend=backend,
                                       monkeypatch=monkeypatch)

            vllm_graphs = collect_graph_files(tmpdir)
    finally:
        # Restore in case of exception
        os.chdir(original_cwd)

    # The normalized graph dumps from both runs must match.
    assert compare_graphs(vllm_graphs, aftu_graphs)
Loading