
Commit 991a851

Merge branch 'master' into lkchen-ray_data_llm
Signed-off-by: Linkun Chen <github@lkchen.net>
2 parents 8f9d4e6 + cb5e33f commit 991a851

41 files changed: +677, -533 lines

.buildkite/others.rayci.yml

Lines changed: 8 additions & 0 deletions
@@ -19,6 +19,14 @@ steps:
     job_env: oss-ci-base_test-py3.11
     depends_on: oss-ci-base_test-multipy

+  - label: ":tapioca: build: uv pip compile LLM dependencies"
+    key: uv_pip_compile_llm_dependencies
+    instance_type: small
+    command: ./ci/test_compile_llm_requirements.sh
+    soft_fail: true
+    job_env: oss-ci-base_test-py3.11
+    depends_on: oss-ci-base_test-multipy
+
   # docs
   - name: doctestbuild
     wanda: ci/docker/doctest.build.wanda.yaml

ci/compile_llm_requirements.sh

Lines changed: 19 additions & 8 deletions
@@ -3,21 +3,31 @@
 set -euo pipefail

 PYTHON_CODE="$(python -c "import sys; v=sys.version_info; print(f'py{v.major}{v.minor}')")"
+if [[ "${PYTHON_CODE}" != "py311" ]]; then
+    echo "--- Python version is not 3.11"
+    echo "--- Current Python version: ${PYTHON_CODE}"
+    exit 1
+fi

 for CUDA_CODE in cpu cu121 cu124 ; do
     PYTHON_CUDA_CODE="${PYTHON_CODE}_${CUDA_CODE}"

     echo "--- Compile dependencies for ${PYTHON_CODE}_${CUDA_CODE}"

-    PIP_COMPILE=(
-        pip-compile -v --generate-hashes --strip-extras
+    UV_PIP_COMPILE=(
+        uv pip compile -v --generate-hashes --strip-extras
         --unsafe-package ray
         # The version we use on python 3.9 is not installable on python 3.11
         --unsafe-package grpcio-tools
         # setuptools should not be pinned.
         --unsafe-package setuptools
+        --index-url "https://pypi.org/simple"
         --extra-index-url "https://download.pytorch.org/whl/${CUDA_CODE}"
-        --find-links "https://data.pyg.org/whl/torch-2.3.0+${CUDA_CODE}.html"
+        --find-links "https://data.pyg.org/whl/torch-2.5.1+${CUDA_CODE}.html"
+        --index-strategy unsafe-best-match
+        --no-strip-markers
+        --emit-index-url
+        --emit-find-links
     )

     mkdir -p /tmp/ray-deps
@@ -32,7 +42,7 @@ for CUDA_CODE in cpu cu121 cu124 ; do
     #
     # Needs to use the exact torch version.
     echo "--- Compile ray base test dependencies"
-    "${PIP_COMPILE[@]}" \
+    "${UV_PIP_COMPILE[@]}" \
         -c "/tmp/ray-deps/requirements_compiled.txt" \
         "python/requirements.txt" \
         "python/requirements/cloud-requirements.txt" \
@@ -41,7 +51,7 @@ for CUDA_CODE in cpu cu121 cu124 ; do

     # Second, expand it into LLM test dependencies
     echo "--- Compile LLM test dependencies"
-    "${PIP_COMPILE[@]}" \
+    "${UV_PIP_COMPILE[@]}" \
         -c "python/requirements_compiled_ray_test_${PYTHON_CUDA_CODE}.txt" \
         "python/requirements.txt" \
         "python/requirements/cloud-requirements.txt" \
@@ -53,7 +63,7 @@ for CUDA_CODE in cpu cu121 cu124 ; do
     # Third, extract the ray base dependencies from ray base test dependencies.
     # TODO(aslonnie): This should be used for installing ray in the container images.
     echo "--- Compile ray base test dependencies"
-    "${PIP_COMPILE[@]}" \
+    "${UV_PIP_COMPILE[@]}" \
         -c "python/requirements_compiled_ray_test_${PYTHON_CUDA_CODE}.txt" \
         "python/requirements.txt" \
         -o "python/requirements_compiled_ray_${PYTHON_CUDA_CODE}.txt"
@@ -62,10 +72,11 @@ for CUDA_CODE in cpu cu121 cu124 ; do
     # which is also an expansion of the ray base dependencies.
     # TODO(aslonnie): This should be used for installing ray[llm] in the container images.
     echo "--- Compile LLM dependencies"
-    "${PIP_COMPILE[@]}" \
+    "${UV_PIP_COMPILE[@]}" \
         -c "python/requirements_compiled_rayllm_test_${PYTHON_CUDA_CODE}.txt" \
         "python/requirements.txt" \
         "python/requirements/llm/llm-requirements.txt" \
         -o "python/requirements_compiled_rayllm_${PYTHON_CUDA_CODE}.txt"
-
 done
+
+echo "--- Done"

ci/pipeline/determine_tests_to_run.py

Lines changed: 2 additions & 0 deletions
@@ -11,6 +11,7 @@

 _ALL_TAGS = set(
     """
+    always
     lint python cpp core_cpp java workflow accelerated_dag dashboard
     data serve ml tune train llm rllib rllib_gpu rllib_directly
     linux_wheels macos_wheels docker doc python_dependencies tools
@@ -189,6 +190,7 @@ def match_tags(self, changed_file: str) -> Tuple[Set[str], bool]:

         tags: Set[str] = set()

+        tags.add("always")
         tags.add("lint")

         def _emit(line: str):

ci/pipeline/test_conditional_testing.py

Lines changed: 1 addition & 1 deletion
@@ -153,7 +153,7 @@ def __init__(self, file: str, tags: Set[str]):
         )
         tags = output.split()

-        want = test_case.tags
+        want = set(list(test_case.tags) + ["always"])
         assert want == set(tags), f"file {test_case.file}, want {want}, got {tags}"

ci/test_compile_llm_requirements.sh

Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
+#!/bin/bash
+
+set -e
+
+# Install uv and set up Python
+pip install uv
+uv python install 3.11
+uv python pin 3.11
+
+# Create a temporary directory for backup files and setup cleanup trap
+TEMP_DIR=$(mktemp -d)
+cleanup() {
+    echo "Cleaning up temporary directory: $TEMP_DIR"
+    rm -rf "$TEMP_DIR"
+}
+trap cleanup EXIT
+
+echo "Created temporary directory: $TEMP_DIR"
+
+# Create backup copies of req files to reference to
+cp ./python/requirements_compiled_rayllm_py311_cpu.txt "$TEMP_DIR/requirements_compiled_rayllm_py311_cpu_backup.txt"
+cp ./python/requirements_compiled_rayllm_py311_cu121.txt "$TEMP_DIR/requirements_compiled_rayllm_py311_cu121_backup.txt"
+cp ./python/requirements_compiled_rayllm_py311_cu124.txt "$TEMP_DIR/requirements_compiled_rayllm_py311_cu124_backup.txt"
+
+./ci/compile_llm_requirements.sh
+
+# Copy files to artifact mount on Buildkite
+cp ./python/requirements_compiled_rayllm_py311_cpu.txt /artifact-mount/
+cp ./python/requirements_compiled_rayllm_py311_cu121.txt /artifact-mount/
+cp ./python/requirements_compiled_rayllm_py311_cu124.txt /artifact-mount/
+
+# Check all files and print if files are not up to date
+FAILED=0
+for VARIANT in cpu cu121 cu124; do
+    diff --color -u ./python/requirements_compiled_rayllm_py311_${VARIANT}.txt "$TEMP_DIR/requirements_compiled_rayllm_py311_${VARIANT}_backup.txt" || {
+        echo "requirements_compiled_rayllm_py311_${VARIANT}.txt is not up to date. Please download it from Artifacts tab and git push the changes."
+        FAILED=1
+    }
+done
+if [[ $FAILED -eq 1 ]]; then
+    exit 1
+fi

python/ray/_private/state_api_test_utils.py

Lines changed: 11 additions & 0 deletions
@@ -29,6 +29,7 @@
     PredicateType,
     SupportedFilterType,
 )
+import ray._private.test_utils as test_utils


 @dataclass
@@ -113,6 +114,16 @@ def invoke_state_api(
     return res


+def invoke_state_api_n(*args, **kwargs):
+    def verify():
+        NUM_API_CALL_SAMPLES = 10
+        for _ in range(NUM_API_CALL_SAMPLES):
+            invoke_state_api(*args, **kwargs)
+        return True
+
+    test_utils.wait_for_condition(verify, retry_interval_ms=2000, timeout=30)
+
+
 def aggregate_perf_results(state_stats: StateAPIStats = GLOBAL_STATE_STATS):
     """Aggregate stats of state API calls
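
The new `invoke_state_api_n` helper wraps repeated state API verification in `wait_for_condition`, so transient mismatches are retried instead of failing a test on the first attempt. A minimal sketch of the same retry pattern using Ray's public `list_actors` state API (the actor class and expected count below are illustrative, not part of this commit):

```python
import ray
from ray._private.test_utils import wait_for_condition
from ray.util.state import list_actors

ray.init()


@ray.remote
class Worker:
    def ping(self):
        return "pong"


actors = [Worker.remote() for _ in range(2)]
ray.get([a.ping.remote() for a in actors])


def enough_actors_listed() -> bool:
    # Verification callback: True once the state API reports both actors.
    return len(list_actors()) >= 2


# Re-run the check every 2 seconds for up to 30 seconds, mirroring the
# retry_interval_ms / timeout values invoke_state_api_n passes through.
wait_for_condition(enough_actors_listed, retry_interval_ms=2000, timeout=30)
```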

python/ray/data/_internal/datasource/databricks_uc_datasource.py

Lines changed: 4 additions & 1 deletion
@@ -37,7 +37,10 @@ def __init__(
         self.schema = schema
         self.query = query

-        url_base = f"https://{self.host}/api/2.0/sql/statements/"
+        if not host.startswith(("http://", "https://")):
+            self.host = f"https://{host}"
+
+        url_base = f"{self.host}/api/2.0/sql/statements/"

         payload = json.dumps(
             {
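
The change above lets the Databricks datasource accept a `host` value with or without an explicit scheme. A standalone sketch of the same normalization (the helper name is illustrative, not part of the datasource):

```python
def statements_url_base(host: str) -> str:
    # Prepend https:// only when the caller omitted a scheme, so
    # "example.cloud.databricks.com" and "https://example.cloud.databricks.com"
    # resolve to the same SQL statements endpoint.
    if not host.startswith(("http://", "https://")):
        host = f"https://{host}"
    return f"{host}/api/2.0/sql/statements/"


assert statements_url_base("example.cloud.databricks.com") == \
    statements_url_base("https://example.cloud.databricks.com")
```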

python/ray/data/_internal/planner/plan_udf_map_op.py

Lines changed: 40 additions & 32 deletions
@@ -376,7 +376,7 @@ def _generate_transform_fn_for_map_batches(
 ) -> MapTransformCallable[DataBatch, DataBatch]:
     if inspect.iscoroutinefunction(fn):
         # UDF is a callable class with async generator `__call__` method.
-        transform_fn = _generate_transform_fn_for_async_map_batches(fn)
+        transform_fn = _generate_transform_fn_for_async_map(fn, _validate_batch_output)

     else:

@@ -423,64 +423,66 @@ def transform_fn(
     return transform_fn


-def _generate_transform_fn_for_async_map_batches(
+def _generate_transform_fn_for_async_map(
     fn: UserDefinedFunction,
-) -> MapTransformCallable[DataBatch, DataBatch]:
-    def transform_fn(
-        input_iterable: Iterable[DataBatch], _: TaskContext
-    ) -> Iterable[DataBatch]:
+    validate_fn,
+) -> MapTransformCallable:
+    # Generates a transform function for asynchronous mapping of items (either batches or rows)
+    # using a user-defined function (UDF). This consolidated function handles both asynchronous
+    # batch processing and asynchronous flat mapping (e.g., rows) based on the provided UDF.
+    def transform_fn(input_iterable: Iterable, _: TaskContext) -> Iterable:
         # Use a queue to store outputs from async generator calls.
-        # We will put output batches into this queue from async
+        # We will put output items into this queue from async
         # generators, and in the main event loop, yield them from
         # the queue as they become available.
-        output_batch_queue = queue.Queue()
+        output_item_queue = queue.Queue()
         # Sentinel object to signal the end of the async generator.
         sentinel = object()

-        async def process_batch(batch: DataBatch):
+        async def process_item(item):
             try:
-                output_batch_iterator = await fn(batch)
+                output_item_iterator = await fn(item)
                 # As soon as results become available from the async generator,
                 # put them into the result queue so they can be yielded.
-                async for output_batch in output_batch_iterator:
-                    output_batch_queue.put(output_batch)
+                async for output_item in output_item_iterator:
+                    output_item_queue.put(output_item)
             except Exception as e:
-                output_batch_queue.put(
+                output_item_queue.put(
                     e
                 )  # Put the exception into the queue to signal an error

-        async def process_all_batches():
+        async def process_all_items():
             try:
                 loop = ray.data._map_actor_context.udf_map_asyncio_loop
-                tasks = [loop.create_task(process_batch(x)) for x in input_iterable]
+                tasks = [loop.create_task(process_item(x)) for x in input_iterable]

                 ctx = ray.data.DataContext.get_current()
                 if ctx.execution_options.preserve_order:
                     for task in tasks:
-                        await task()
+                        await task
                 else:
                     for task in asyncio.as_completed(tasks):
                         await task
             finally:
-                output_batch_queue.put(sentinel)
+                output_item_queue.put(sentinel)

-        # Use the existing event loop to create and run Tasks to process each batch
+        # Use the existing event loop to create and run Tasks to process each item
         loop = ray.data._map_actor_context.udf_map_asyncio_loop
-        asyncio.run_coroutine_threadsafe(process_all_batches(), loop)
+        asyncio.run_coroutine_threadsafe(process_all_items(), loop)

         # Yield results as they become available.
         while True:
-            # Here, `out_batch` is a one-row output batch
+            # Here, `out_item` is a one-row output item
             # from the async generator, corresponding to a
-            # single row from the input batch.
-            out_batch = output_batch_queue.get()
-            if out_batch is sentinel:
+            # single row from the input item.
+            out_item = output_item_queue.get()
+            if out_item is sentinel:
                 # Break out of the loop when the sentinel is received.
                 break
-            if isinstance(out_batch, Exception):
-                raise out_batch
-            _validate_batch_output(out_batch)
-            yield out_batch
+            if isinstance(out_item, Exception):
+                raise out_item
+            validate_fn(out_item)
+            yield out_item

     return transform_fn

@@ -511,11 +513,17 @@ def transform_fn(rows: Iterable[Row], _: TaskContext) -> Iterable[Row]:
 def _generate_transform_fn_for_flat_map(
     fn: UserDefinedFunction,
 ) -> MapTransformCallable[Row, Row]:
-    def transform_fn(rows: Iterable[Row], _: TaskContext) -> Iterable[Row]:
-        for row in rows:
-            for out_row in fn(row):
-                _validate_row_output(out_row)
-                yield out_row
+    if inspect.iscoroutinefunction(fn):
+        # UDF is a callable class with async generator `__call__` method.
+        transform_fn = _generate_transform_fn_for_async_map(fn, _validate_row_output)
+
+    else:
+
+        def transform_fn(rows: Iterable[Row], _: TaskContext) -> Iterable[Row]:
+            for row in rows:
+                for out_row in fn(row):
+                    _validate_row_output(out_row)
+                    yield out_row

     return transform_fn
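
Because the async transform now takes the validator as a parameter, the same code path backs both `map_batches` and `flat_map` when the UDF is a callable class with an async generator `__call__` method. A minimal sketch of the kind of UDF this enables for `flat_map` (the class, dataset, and row schema are illustrative, not from this commit):

```python
import asyncio

import ray


class AsyncRowExpander:
    # Async generator __call__: each input row fans out into several
    # output rows without blocking the actor on every await.
    async def __call__(self, row):
        for i in range(row["copies"]):
            await asyncio.sleep(0)  # stand-in for real async I/O
            yield {"id": row["id"], "copy": i}


ds = ray.data.from_items([{"id": 0, "copies": 2}, {"id": 1, "copies": 3}])
# Callable-class UDFs run inside long-lived actors; concurrency sets the pool size.
out = ds.flat_map(AsyncRowExpander, concurrency=1)
print(out.take_all())  # 5 rows in total
```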

python/ray/data/dataset.py

Lines changed: 9 additions & 8 deletions
@@ -1373,12 +1373,6 @@ def repartition(
         """Repartition the :class:`Dataset` into exactly this number of
         :ref:`blocks <dataset_concept>`.

-        When `target_num_rows_per_block` is set, it repartitions :class:`Dataset`
-        to honor target number of rows per :ref:`blocks <dataset_concept>`. Note
-        that the system will internally figure out the number of rows per
-        :ref:`blocks <dataset_concept>` for optimal execution, based on the
-        `target_num_rows_per_block`.
-
         This method can be useful to tune the performance of your pipeline. To learn
         more, see :ref:`Advanced: Performance Tips and Tuning <data_performance_tips>`.
@@ -1408,9 +1402,16 @@ def repartition(

         Args:
             num_blocks: Number of blocks after repartitioning.
-            target_num_rows_per_block: The target number of rows per block to
+            target_num_rows_per_block: [Experimental] The target number of rows per block to
                 repartition. Note that either `num_blocks` or
-                `target_num_rows_per_block` must be set, but not both.
+                `target_num_rows_per_block` must be set, but not both. When
+                `target_num_rows_per_block` is set, it only repartitions
+                :class:`Dataset` :ref:`blocks <dataset_concept>` that are larger than
+                `target_num_rows_per_block`. Note that the system will internally
+                figure out the number of rows per :ref:`blocks <dataset_concept>` for
+                optimal execution, based on the `target_num_rows_per_block`. This is
+                the current behavior because of the implementation and may change in
+                the future.
             shuffle: Whether to perform a distributed shuffle during the
                 repartition. When shuffle is enabled, each output block
                 contains a subset of data rows from each input block, which
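
A short usage sketch of the two mutually exclusive modes described in the updated docstring (dataset size and targets are illustrative):

```python
import ray

ds = ray.data.range(100_000)

# Exactly 10 output blocks.
by_count = ds.repartition(num_blocks=10)

# Experimental: split only blocks larger than roughly 5,000 rows and let
# Ray Data choose the final block count; num_blocks must be left unset here.
by_rows = ds.repartition(target_num_rows_per_block=5_000)
```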

python/ray/data/random_access_dataset.py

Lines changed: 4 additions & 2 deletions
@@ -234,8 +234,10 @@ def multiget(self, block_indices, keys):
             col = block[self.key_field]
             indices = np.searchsorted(col, keys)
             acc = BlockAccessor.for_block(block)
-            result = [acc._get_row(i) for i in indices]
-            # assert result == [self._get(i, k) for i, k in zip(block_indices, keys)]
+            result = [
+                acc._get_row(i) if k1.as_py() == k2 else None
+                for i, k1, k2 in zip(indices, col.take(indices), keys)
+            ]
         else:
             result = [self._get(i, k) for i, k in zip(block_indices, keys)]
         self.total_time += time.perf_counter() - start
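
The `multiget` fix guards against `np.searchsorted` returning an insertion point for keys that are not actually present in the block: a row is only returned when the key at the located index matches, otherwise `None`. A small standalone illustration of the failure mode being fixed, using plain NumPy and PyArrow rather than the dataset code itself (values are illustrative):

```python
import numpy as np
import pyarrow as pa

col = pa.array([1, 3, 5, 7])          # sorted key column of one block
keys = [3, 4, 7]                      # 4 is absent from the block

indices = np.searchsorted(col, keys)  # -> [1, 2, 3]; missing key 4 maps to 5's slot
# Old behavior: blindly fetch rows at these indices, returning the row for
# key 5 where None was expected. New behavior: compare keys before using them.
result = [
    int(k1.as_py()) if k1.as_py() == k2 else None
    for k1, k2 in zip(col.take(indices), keys)
]
print(result)  # [3, None, 7]
```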
