Skip to content

Commit 9f0bf34

Browse files
authored
Vineyard: from unordered chunks. (#3324)
* Vineyard: from unordered chunks. Signed-off-by: Tao He <sighingnow@gmail.com> * Skip coverage check for those guard code. Signed-off-by: Tao He <sighingnow@gmail.com> Signed-off-by: Tao He <sighingnow@gmail.com>
1 parent 181fe51 commit 9f0bf34

File tree

2 files changed

+54
-16
lines changed

2 files changed

+54
-16
lines changed

mars/dataframe/datasource/from_vineyard.py

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,8 @@ def execute(cls, ctx, op):
142142
dtypes.append(dtype)
143143
dtypes = pd.Series(dtypes, index=columns)
144144
chunk_index = (
145-
chunk_meta["partition_index_row_"],
146-
chunk_meta["partition_index_column_"],
145+
chunk_meta.get("partition_index_row_", -1),
146+
chunk_meta.get("partition_index_column_", -1),
147147
)
148148
# chunk: (chunk_id, worker_address, dtype, shape, index, columns)
149149
chunks.append(
@@ -173,7 +173,13 @@ def __init__(self, vineyard_socket=None, object_id=None, **kw):
173173
super().__init__(vineyard_socket=vineyard_socket, object_id=object_id, **kw)
174174

175175
def __call__(self, meta):
176-
return self.new_dataframe([meta])
176+
return self.new_dataframe(
177+
[meta],
178+
shape=meta.shape,
179+
dtypes=meta.dtypes,
180+
index_value=meta.index_value,
181+
columns_value=meta.columns_value,
182+
)
177183

178184
@classmethod
179185
def tile(cls, op):
@@ -182,21 +188,37 @@ def tile(cls, op):
182188

183189
ctx = get_context()
184190

185-
in_chunk_keys = [chunk.key for chunk in op.inputs[0].chunks]
186191
out_chunks = []
187192
chunk_map = dict()
188193
dtypes, columns = None, None
189-
for chunk, infos in zip(
190-
op.inputs[0].chunks, ctx.get_chunks_result(in_chunk_keys)
191-
):
194+
195+
in_chunk_keys = [chunk.key for chunk in op.inputs[0].chunks]
196+
in_chunk_results = ctx.get_chunks_result(in_chunk_keys)
197+
198+
# check if chunk indexes has unknown value
199+
has_unknown_chunk_index = False
200+
for infos in in_chunk_results:
201+
for _, info in infos.iterrows(): # pragma: no cover
202+
if len(info["index"]) == 0 or -1 in info["index"]:
203+
has_unknown_chunk_index = True
204+
break
205+
206+
# assume chunks are row-splitted if chunk index is unknown
207+
chunk_location = 0
208+
209+
for chunk, infos in zip(op.inputs[0].chunks, in_chunk_results):
192210
for _, info in infos.iterrows():
193211
chunk_op = op.copy().reset_key()
194212
chunk_op.object_id = info["id"]
195213
chunk_op.expect_worker = info["worker_address"]
196214
dtypes = info["dtypes"]
197215
columns = info["columns"]
198216
shape = info["shape"]
199-
chunk_index = info["index"]
217+
if has_unknown_chunk_index: # pragma: no cover
218+
chunk_index = (chunk_location, 0)
219+
chunk_location += 1
220+
else:
221+
chunk_index = info["index"]
200222
chunk_map[chunk_index] = info["shape"]
201223
out_chunk = chunk_op.new_chunk(
202224
[chunk],
@@ -251,7 +273,7 @@ def from_vineyard(df, vineyard_socket=None):
251273
gpu=None,
252274
)
253275
meta = metaop(
254-
shape=(np.nan,),
276+
shape=(np.nan, np.nan),
255277
dtypes=pd.Series([]),
256278
index_value=parse_index(pd.Index([])),
257279
columns_value=parse_index(pd.Index([])),

mars/tensor/datasource/from_vineyard.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ def execute(cls, ctx, op):
9595
chunk_meta["value_type_"], chunk_meta.get("value_type_meta_", None)
9696
)
9797
shape = tuple(json.loads(chunk_meta["shape_"]))
98-
chunk_index = tuple(json.loads(chunk_meta["partition_index_"]))
98+
chunk_index = tuple(json.loads(chunk_meta.get("partition_index_", "[]")))
9999
# chunk: (chunk_id, worker_address, dtype, shape, index)
100100
chunks.append(
101101
(repr(chunk_meta.id), ctx.worker_address, dtype, shape, chunk_index)
@@ -119,7 +119,7 @@ def __init__(self, vineyard_socket=None, object_id=None, **kw):
119119
super().__init__(vineyard_socket=vineyard_socket, object_id=object_id, **kw)
120120

121121
def __call__(self, meta):
122-
return self.new_tensor([meta], shape=(np.nan,))
122+
return self.new_tensor([meta], shape=meta.shape, dtype=meta.dtype)
123123

124124
@classmethod
125125
def tile(cls, op):
@@ -128,20 +128,35 @@ def tile(cls, op):
128128

129129
ctx = get_context()
130130

131-
in_chunk_keys = [chunk.key for chunk in op.inputs[0].chunks]
132131
out_chunks = []
133132
chunk_map = dict()
134133
dtype = None
135-
for chunk, infos in zip(
136-
op.inputs[0].chunks, ctx.get_chunks_result(in_chunk_keys)
137-
):
134+
in_chunk_keys = [chunk.key for chunk in op.inputs[0].chunks]
135+
in_chunk_results = ctx.get_chunks_result(in_chunk_keys)
136+
137+
# check if chunk indexes has unknown value
138+
has_unknown_chunk_index = False
139+
for infos in in_chunk_results:
140+
for info in infos[0]: # pragma: no cover
141+
if len(info[4]) == 0 or -1 in info[4]:
142+
has_unknown_chunk_index = True
143+
break
144+
145+
# assume chunks are row-splitted if chunk index is unknown
146+
chunk_location = 0
147+
148+
for chunk, infos in zip(op.inputs[0].chunks, in_chunk_results):
138149
for info in infos[0]: # n.b. 1-element ndarray
139150
chunk_op = op.copy().reset_key()
140151
chunk_op.object_id = info[0]
141152
chunk_op.expect_worker = info[1]
142153
dtype = info[2]
143154
shape = info[3]
144-
chunk_index = info[4]
155+
if has_unknown_chunk_index: # pragma: no cover
156+
chunk_index = (chunk_location,)
157+
chunk_location += 1
158+
else:
159+
chunk_index = info[4]
145160
chunk_map[chunk_index] = info[3]
146161
out_chunk = chunk_op.new_chunk(
147162
[chunk], shape=shape, dtype=dtype, index=chunk_index
@@ -181,6 +196,7 @@ def fromvineyard(tensor, vineyard_socket=None):
181196
metaop = TensorFromVineyard(
182197
vineyard_socket=vineyard_socket,
183198
object_id=object_id,
199+
shape=(np.nan,),
184200
dtype=np.dtype("byte"),
185201
gpu=None,
186202
)

0 commit comments

Comments
 (0)