1
- from typing import Any
1
+ from deepdiff . helper import JSON , SummaryNodeType
2
2
from deepdiff .serialization import json_dumps
3
3
4
4
5
- def _truncate (s , max_len ) :
5
+ def _truncate (s : str , max_len : int ) -> str :
6
6
"""
7
7
Truncate string s to max_len characters.
8
8
If possible, keep the first (max_len-5) characters, then '...' then the last 2 characters.
@@ -12,165 +12,126 @@ def _truncate(s, max_len):
12
12
if max_len <= 5 :
13
13
return s [:max_len ]
14
14
return s [:max_len - 5 ] + "..." + s [- 2 :]
15
+ # Helpers for weight-based summarization of JSON-like data follow.
15
16
16
- class JSONNode :
17
- def __init__ (self , data : Any , key = None ):
18
- """
19
- Build a tree node for the JSON data.
20
- If this node is a child of a dict, key is its key name.
21
- """
22
- self .key = key
23
- self .children_list : list [JSONNode ] = []
24
- self .children_dict : list [tuple [Any , JSONNode ]] = []
25
- self .value : str = ""
26
- if isinstance (data , dict ):
27
- self .type = "dict"
28
- # Preserve insertion order: list of (key, child) pairs.
29
- for k , v in data .items ():
30
- child = JSONNode (v , key = k )
31
- self .children_dict .append ((k , child ))
32
- elif isinstance (data , list ):
33
- self .type = "list"
34
- self .children_list = [JSONNode (item ) for item in data ]
17
+
18
+ # Function to calculate node weights recursively
19
def calculate_weights(node):
    """
    Recursively measure a JSON-like value and build its weighted tree.

    Returns a ``(weight, structure)`` pair, where ``structure`` is a
    ``(SummaryNodeType member, payload)`` tuple:

    * dict: payload maps every key to ``(key_weight, child_weight,
      child_structure)``; a key weighs ``len(key)``.
    * list: payload is a list of ``(0, child_weight, child_structure)``;
      list positions carry no weight.
    * leaf: payload is the original value. Strings weigh their length,
      ints the length of ``str(value)``, floats the length of the value
      rounded to two decimals, ``None`` weighs 1 and anything else 0.
    """
    if isinstance(node, dict):
        weighted_items = {}
        running_total = 0
        for key, value in node.items():
            key_weight = len(key)
            value_weight, value_structure = calculate_weights(value)
            weighted_items[key] = (key_weight, value_weight, value_structure)
            running_total += key_weight + value_weight
        return running_total, (SummaryNodeType.dict, weighted_items)

    if isinstance(node, list):
        # The leading 0 is the (always zero) weight of the list index.
        weighted_elements = [(0, *calculate_weights(element)) for element in node]
        elements_total = sum(element_weight for _, element_weight, _ in weighted_elements)
        return elements_total, (SummaryNodeType.list, weighted_elements)

    if isinstance(node, str):
        leaf_weight = len(node)
    elif isinstance(node, (int, float)):
        # ints (bool included) are measured verbatim, floats after rounding.
        printable = node if isinstance(node, int) else round(node, 2)
        leaf_weight = len(str(printable))
    else:
        leaf_weight = 1 if node is None else 0
    return leaf_weight, (SummaryNodeType.leaf, node)
54
+
55
+ # Shrinking helpers: reduce a weighted tree so that it fits within a weight budget.
56
+ # (Balanced variant: a single branch may use at most balance_threshold * max_weight of its parent's budget.)
57
+
58
+ # Balanced algorithm (simplified version):
59
def shrink_tree_balanced(node_structure, max_weight: int, balance_threshold: float) -> tuple[JSON, float]:
    """
    Shrink a weighted tree so that its total weight fits within ``max_weight``.

    Parameters:
        node_structure: a ``(SummaryNodeType member, payload)`` tuple in the
            shape produced by ``calculate_weights``.
        max_weight: weight budget available to this node.
        balance_threshold: fraction of ``max_weight`` that a single child
            branch of a dict or list may consume.

    Returns:
        ``(shrunk_value, weight)``. ``(None, 0)`` means nothing could be kept
        for this node. A JSON ``null`` leaf that fits is returned as
        ``(None, 1)``; callers tell the two apart by the weight.

    Notes:
        * Children are visited heaviest-first, so the keys / elements of the
          result follow descending weight rather than the input order.
        * A list gets a trailing ``"..."`` element once its budget is
          (nearly) used up.
        * Numbers that do not fit are cut textually and re-parsed, so the
          returned number may differ from the original value.
    """
    node_type, node_info = node_structure

    if node_type is SummaryNodeType.leaf:
        leaf_value = node_info
        leaf_weight, _ = calculate_weights(leaf_value)
        if leaf_weight <= max_weight:
            return leaf_value, leaf_weight

        if isinstance(leaf_value, str):
            truncated_value = _truncate(leaf_value, max_weight)
            return truncated_value, len(truncated_value)
        if isinstance(leaf_value, (int, float)):
            truncated_str = str(leaf_value)[:max_weight]
            # Prefer a numeric result; fall back to the raw text when the
            # cut-off digits no longer parse (e.g. "-" or "").
            try:
                return int(truncated_str), len(truncated_str)
            except ValueError:
                try:
                    return float(truncated_str), len(truncated_str)
                except ValueError:
                    return truncated_str, len(truncated_str)
        if leaf_value is None:
            return None, (1 if max_weight >= 1 else 0)
        # Any other leaf type that does not fit is dropped.
        return None, 0

    if node_type is SummaryNodeType.dict:
        shrunk_dict = {}
        total_weight = 0
        sorted_children = sorted(node_info.items(), key=lambda x: x[1][0] + x[1][1], reverse=True)

        for k, (edge_w, _, child_struct) in sorted_children:
            allowed_branch_weight = min(max_weight * balance_threshold, max_weight - total_weight)
            if allowed_branch_weight <= edge_w:
                # Not even the key fits into this branch's share.
                continue

            remaining_weight = int(allowed_branch_weight - edge_w)
            shrunk_child, shrunk_weight = shrink_tree_balanced(child_struct, remaining_weight, balance_threshold)
            # A kept null leaf comes back as (None, 1); only (None, 0) means
            # "nothing fit", so test the weight as well as the value.
            if shrunk_child is not None or shrunk_weight > 0:
                # edge_w is len(k) for trees built by calculate_weights, so
                # this slice keeps the whole key in that case.
                shrunk_dict[k[:edge_w]] = shrunk_child
                total_weight += edge_w + shrunk_weight

            if total_weight >= max_weight:
                break

        if not shrunk_dict:
            return None, 0
        return shrunk_dict, total_weight

    if node_type is SummaryNodeType.list:
        shrunk_list = []
        total_weight = 0
        sorted_children = sorted(node_info, key=lambda x: x[0] + x[1], reverse=True)

        for edge_w, _, child_struct in sorted_children:
            allowed_branch_weight = int(min(max_weight * balance_threshold, max_weight - total_weight))
            shrunk_child, shrunk_weight = shrink_tree_balanced(child_struct, allowed_branch_weight, balance_threshold)
            # Same null-vs-dropped distinction as in the dict branch.
            if shrunk_child is not None or shrunk_weight > 0:
                shrunk_list.append(shrunk_child)
                total_weight += shrunk_weight
            if total_weight >= max_weight - 1:
                shrunk_list.append("...")
                break

        if not shrunk_list:
            return None, 0
        return shrunk_list, total_weight

    return None, 0
124
+
125
+
126
def greedy_tree_summarization_balanced(json_data: JSON, max_weight: int, balance_threshold=0.6) -> JSON:
    """
    Return ``json_data`` reduced to at most ``max_weight``.

    The weight of the input is measured with ``calculate_weights``. Data that
    already fits is returned unchanged (the very same object); otherwise the
    weighted tree is handed to ``shrink_tree_balanced`` and only the shrunk
    value of its result is returned.
    """
    full_weight, structure = calculate_weights(json_data)
    if full_weight > max_weight:
        return shrink_tree_balanced(structure, max_weight, balance_threshold)[0]
    return json_data
132
+
133
+
134
def summarize(data: JSON, max_length: int = 200, balance_threshold: float = 0.6) -> str:
    """
    Summarize ``data`` and serialize the summary with ``json_dumps``.

    ``max_length`` is forwarded as the ``max_weight`` budget and
    ``balance_threshold`` as the per-branch share to
    ``greedy_tree_summarization_balanced``.
    """
    summary_tree = greedy_tree_summarization_balanced(data, max_length, balance_threshold)
    return json_dumps(summary_tree)
0 commit comments