Skip to content

Commit cd22c4c

Browse files
fyrestone刘宝
andauthored
Optimize serializable memory (#3120)
* Optimize serializable memory * Fix lint * Fix * Optimize * Rename attr_name to name of Field * Fix * Fix * Fix benchmark * Add asv bench * Optimize copy * Improve coverage * Fix Co-authored-by: 刘宝 <po.lb@antfin.com>
1 parent 18a3af8 commit cd22c4c

File tree

10 files changed

+202
-165
lines changed

10 files changed

+202
-165
lines changed

benchmarks/asv_bench/benchmarks/graph_assigner.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import random
16+
import tracemalloc
1617

1718
import mars.tensor as mt
1819
import mars.dataframe as md
@@ -30,6 +31,7 @@ class ChunkGraphAssignerSuite:
3031
repeat = 10
3132

3233
def setup(self):
34+
tracemalloc.start()
3335
random.seed()
3436

3537
num_rows = 10000
@@ -45,6 +47,8 @@ def setup(self):
4547
graph = TileableGraph([merged_df.data])
4648
next(TileableGraphBuilder(graph).build())
4749
self.chunk_graph = next(ChunkGraphBuilder(graph, fuse_enabled=False).build())
50+
self.mem_size, self.mem_peak = tracemalloc.get_traced_memory()
51+
tracemalloc.stop()
4852

4953
def time_assigner(self):
5054
start_ops = list(GraphAnalyzer._iter_start_ops(self.chunk_graph))
@@ -55,3 +59,26 @@ def time_assigner(self):
5559
assigner = GraphAssigner(self.chunk_graph, start_ops, band_resource)
5660
assigned_result = assigner.assign(current_assign)
5761
assert len(assigned_result) == len(start_ops)
62+
63+
def peakmem_setup(self):
64+
"""peakmem includes the memory used by setup.
65+
Peakmem benchmarks measure the maximum amount of RAM used by a
66+
function. However, this maximum also includes the memory used
67+
by ``setup`` (as of asv 0.2.1; see [1]_)
68+
Measuring an empty peakmem function might allow us to disambiguate
69+
between the memory used by setup and the memory used by slic (see
70+
``peakmem_slic_basic``, below).
71+
References
72+
----------
73+
.. [1]: https://asv.readthedocs.io/en/stable/writing_benchmarks.html#peak-memory
74+
"""
75+
pass
76+
77+
def mem_chunk_graph(self):
78+
return self.chunk_graph
79+
80+
def track_traced_mem_size(self):
81+
return self.mem_size
82+
83+
def track_traced_mem_peak(self):
84+
return self.mem_peak

benchmarks/asv_bench/benchmarks/serialize.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
Timedelta64Field,
4141
TupleField,
4242
DictField,
43+
Complex64Field,
44+
Complex128Field,
4345
)
4446
from mars.services.subtask import Subtask, SubtaskResult, SubtaskStatus
4547
from mars.services.task import new_task_id
@@ -72,11 +74,13 @@ class MySerializable(Serializable):
7274
_int64_val = Int64Field("f3")
7375
_float32_val = Float32Field("f4")
7476
_float64_val = Float64Field("f5")
75-
_string_val = StringField("f6")
76-
_datetime64_val = Datetime64Field("f7")
77-
_timedelta64_val = Timedelta64Field("f8")
78-
_datatype_val = DataTypeField("f9")
79-
_slice_val = SliceField("f10")
77+
_complex64_val = Complex64Field("f6")
78+
_complex128_val = Complex128Field("f7")
79+
_string_val = StringField("f8")
80+
_datetime64_val = Datetime64Field("f9")
81+
_timedelta64_val = Timedelta64Field("f10")
82+
_datatype_val = DataTypeField("f11")
83+
_slice_val = SliceField("f12")
8084
_list_val = ListField("list_val", FieldTypes.int64)
8185
_tuple_val = TupleField("tuple_val", FieldTypes.string)
8286
_dict_val = DictField("dict_val", FieldTypes.string, FieldTypes.bytes)

mars/core/base.py

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -55,16 +55,21 @@ def _copy_tags_(self):
5555
return getattr(cls, member)
5656
except AttributeError:
5757
slots = sorted(
58-
f.attr_name
59-
for k, f in self._FIELDS.items()
60-
if k not in self._no_copy_attrs_
58+
f.name for k, f in self._FIELDS.items() if k not in self._no_copy_attrs_
6159
)
6260
setattr(cls, member, slots)
6361
return slots
6462

6563
@property
6664
def _values_(self):
67-
return [self._FIELD_VALUES.get(k) for k in self._copy_tags_]
65+
values = []
66+
fields = self._FIELDS
67+
for k in self._copy_tags_:
68+
try:
69+
values.append(fields[k].get(self))
70+
except AttributeError:
71+
values.append(None)
72+
return values
6873

6974
def __mars_tokenize__(self):
7075
try:
@@ -91,19 +96,18 @@ def copy(self):
9196
return self.copy_to(type(self)(_key=self.key))
9297

9398
def copy_to(self, target: "Base"):
94-
new_values = dict()
95-
values = self._FIELD_VALUES
96-
for k in self._FIELDS:
97-
if k in self._no_copy_attrs_:
99+
target_fields = target._FIELDS
100+
no_copy_attrs = self._no_copy_attrs_
101+
for k, field in self._FIELDS.items():
102+
if k in no_copy_attrs:
103+
continue
104+
try:
105+
# Slightly faster than getattr.
106+
value = field.__get__(self, k)
107+
target_fields[k].set(target, value)
108+
except AttributeError:
98109
continue
99-
if k in values:
100-
new_values[k] = values[k]
101-
else:
102-
try:
103-
new_values[k] = getattr(self, k)
104-
except AttributeError:
105-
continue
106-
target._FIELD_VALUES.update(new_values)
110+
107111
return target
108112

109113
def copy_from(self, obj):
@@ -119,12 +123,14 @@ def id(self):
119123

120124
def to_kv(self, exclude_fields: Tuple[str], accept_value_types: Tuple[Type]):
121125
fields = self._FIELDS
122-
field_values = self._FIELD_VALUES
123-
return {
124-
fields[attr_name].tag: value
125-
for attr_name, value in field_values.items()
126-
if attr_name not in exclude_fields and isinstance(value, accept_value_types)
127-
}
126+
kv = {}
127+
no_value = object()
128+
for name, field in fields.items():
129+
if name not in exclude_fields:
130+
value = getattr(self, name, no_value)
131+
if value is not no_value and isinstance(value, accept_value_types):
132+
kv[field.tag] = value
133+
return kv
128134

129135

130136
def buffered_base(func):

mars/dataframe/arithmetic/tests/test_arithmetic.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1523,12 +1523,15 @@ def test_arithmetic_lazy_chunk_meta():
15231523
df2 = tile(df2)
15241524

15251525
chunk = df2.chunks[0].data
1526-
assert chunk._FIELD_VALUES.get("_dtypes") is None
1526+
assert chunk._FIELDS["_dtypes"].get(chunk) is None
15271527
pd.testing.assert_series_equal(chunk.dtypes, df.dtypes)
1528-
assert chunk._FIELD_VALUES.get("_index_value") is None
1528+
assert chunk._FIELDS["_dtypes"].get(chunk) is not None
1529+
assert chunk._FIELDS["_index_value"].get(chunk) is None
15291530
pd.testing.assert_index_equal(chunk.index_value.to_pandas(), pd.RangeIndex(3))
1530-
assert chunk._FIELD_VALUES.get("_columns_value") is None
1531+
assert chunk._FIELDS["_index_value"].get(chunk) is not None
1532+
assert chunk._FIELDS["_columns_value"].get(chunk) is None
15311533
pd.testing.assert_index_equal(chunk.columns_value.to_pandas(), pd.RangeIndex(3))
1534+
assert chunk._FIELDS["_columns_value"].get(chunk) is not None
15321535

15331536

15341537
def test_datetime_arithmetic():

mars/dataframe/core.py

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -505,15 +505,16 @@ def _gen_chunk_dtypes(instance: Chunk, index: int) -> Optional[pd.Series]:
505505
cache[tileable_key, index] = dtypes
506506
return dtypes
507507

508-
def __get__(self, instance, owner):
508+
def __get__(self, instance, owner=None):
509509
if not issubclass(owner, LazyMetaChunkData): # pragma: no cover
510510
return super().__get__(instance, owner)
511511

512-
values = instance._FIELD_VALUES
513-
dtypes = values.get("_dtypes", None)
514-
if dtypes is not None:
515-
# been set before
516-
return dtypes
512+
try:
513+
value = self.get(instance, owner)
514+
if value is not None:
515+
return value
516+
except AttributeError: # pragma: no cover
517+
pass
517518

518519
if instance.index is None:
519520
return super().__get__(instance, owner)
@@ -522,7 +523,7 @@ def __get__(self, instance, owner):
522523
index = instance.index[1]
523524
dtypes = self._gen_chunk_dtypes(instance, index)
524525
# cache dtypes
525-
values["_dtypes"] = dtypes
526+
self.set(instance, dtypes)
526527
return dtypes
527528

528529

@@ -557,15 +558,16 @@ def _gen_chunk_index_value(instance: Chunk, index: int) -> Optional[IndexValue]:
557558
cache[tileable_key, index] = index_value
558559
return index_value
559560

560-
def __get__(self, instance, owner):
561+
def __get__(self, instance, owner=None):
561562
if not issubclass(owner, LazyMetaChunkData): # pragma: no cover
562563
return super().__get__(instance, owner)
563564

564-
values = instance._FIELD_VALUES
565-
index_value = values.get("_index_value", None)
566-
if index_value is not None:
567-
# been set before
568-
return index_value
565+
try:
566+
value = self.get(instance, owner)
567+
if value is not None:
568+
return value
569+
except AttributeError: # pragma: no cover
570+
pass
569571

570572
if instance.index is None:
571573
return super().__get__(instance, owner)
@@ -574,7 +576,7 @@ def __get__(self, instance, owner):
574576
index = instance.index[0]
575577
index_value = self._gen_chunk_index_value(instance, index)
576578
# cache index_value
577-
values["_index_value"] = index_value
579+
self.set(instance, index_value)
578580
return index_value
579581

580582

@@ -605,15 +607,16 @@ def _gen_chunk_columns_value(instance: Chunk, index: int) -> Optional[IndexValue
605607
cache[tileable_key, index] = columns_value
606608
return columns_value
607609

608-
def __get__(self, instance, owner):
610+
def __get__(self, instance, owner=None):
609611
if not issubclass(owner, LazyMetaChunkData): # pragma: no cover
610612
return super().__get__(instance, owner)
611613

612-
values = instance._FIELD_VALUES
613-
columns_value = values.get("_columns_value", None)
614-
if columns_value is not None:
615-
# been set before
616-
return columns_value
614+
try:
615+
value = self.get(instance, owner)
616+
if value is not None:
617+
return value
618+
except AttributeError: # pragma: no cover
619+
pass
617620

618621
if instance.index is None:
619622
return super().__get__(instance, owner)
@@ -622,7 +625,7 @@ def __get__(self, instance, owner):
622625
index = instance.index[1]
623626
columns_value = self._gen_chunk_columns_value(instance, index)
624627
# cache columns_value
625-
values["_columns_value"] = columns_value
628+
self.set(instance, columns_value)
626629
return columns_value
627630

628631

mars/dataframe/indexing/tests/test_indexing.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -940,16 +940,20 @@ def test_getitem_lazy_chunk_meta():
940940
df2 = tile(df2)
941941

942942
chunk = df2.chunks[0].data
943-
assert chunk._FIELD_VALUES.get("_dtypes") is None
943+
assert chunk._FIELDS["_dtypes"].get(chunk) is None
944944
pd.testing.assert_series_equal(chunk.dtypes, df.dtypes[[0, 2]])
945-
assert chunk._FIELD_VALUES.get("_index_value") is None
945+
assert chunk._FIELDS["_dtypes"].get(chunk) is not None
946+
assert chunk._FIELDS["_index_value"].get(chunk) is None
946947
pd.testing.assert_index_equal(chunk.index_value.to_pandas(), pd.RangeIndex(3))
947-
assert chunk._FIELD_VALUES.get("_columns_value") is None
948+
assert chunk._FIELDS["_index_value"].get(chunk) is not None
949+
assert chunk._FIELDS["_columns_value"].get(chunk) is None
948950
pd.testing.assert_index_equal(chunk.columns_value.to_pandas(), pd.Index([0, 2]))
951+
assert chunk._FIELDS["_columns_value"].get(chunk) is not None
949952

950953
df2 = df[2]
951954
df2 = tile(df2)
952955

953956
chunk = df2.chunks[0].data
954-
assert chunk._FIELD_VALUES.get("_index_value") is None
957+
assert chunk._FIELDS["_index_value"].get(chunk) is None
955958
pd.testing.assert_index_equal(chunk.index_value.to_pandas(), pd.RangeIndex(3))
959+
assert chunk._FIELDS["_index_value"].get(chunk) is not None

0 commit comments

Comments
 (0)