1
+ """Validation module for helm-values-manager."""
2
+ import json
3
+ import os
4
+ from pathlib import Path
5
+ from typing import Dict , List , Any , Optional , Set
6
+
7
+ from rich .console import Console
8
+ from rich .text import Text
9
+
10
+ from helm_values_manager .models import Schema , SchemaValue
11
+
12
+ console = Console ()
13
+
14
+
15
+ class ErrorMessage (Text ):
16
+ """Error message styled text."""
17
+ def __init__ (self , text : str ):
18
+ super ().__init__ (text , style = "bold red" )
19
+
20
+
21
+ class WarningMessage (Text ):
22
+ """Warning message styled text."""
23
+ def __init__ (self , text : str ):
24
+ super ().__init__ (text , style = "bold yellow" )
25
+
26
+
27
+ class ValidationError :
28
+ """Represents a single validation error."""
29
+
30
+ def __init__ (self , context : str , message : str , env : Optional [str ] = None ):
31
+ self .context = context
32
+ self .message = message
33
+ self .env = env
34
+
35
+ def __str__ (self ):
36
+ if self .env :
37
+ return f"[{ self .env } ] { self .context } : { self .message } "
38
+ return f"{ self .context } : { self .message } "
39
+
40
+
41
+ class Validator :
42
+ """Validates schema and values files."""
43
+
44
+ def __init__ (self , schema_path : Path , values_base_path : Optional [Path ] = None ):
45
+ self .schema_path = schema_path
46
+ self .values_base_path = values_base_path or Path ("." )
47
+ self .errors : List [ValidationError ] = []
48
+
49
+ def validate_all (self , env : Optional [str ] = None ) -> bool :
50
+ """Validate schema and optionally values for specific environment."""
51
+ self .errors = []
52
+
53
+ # Validate schema
54
+ self ._validate_schema ()
55
+
56
+ # Validate values
57
+ if env :
58
+ self ._validate_values_for_env (env )
59
+ else :
60
+ # Validate all environments
61
+ self ._validate_all_values ()
62
+
63
+ return len (self .errors ) == 0
64
+
65
+ def _validate_schema (self ):
66
+ """Validate schema structure and integrity."""
67
+ try :
68
+ if not self .schema_path .exists ():
69
+ self .errors .append (ValidationError ("Schema" , f"File not found: { self .schema_path } " ))
70
+ return
71
+
72
+ with open (self .schema_path ) as f :
73
+ data = json .load (f )
74
+
75
+ schema = Schema (** data )
76
+ except json .JSONDecodeError as e :
77
+ self .errors .append (ValidationError ("Schema" , f"Invalid JSON: { e } " ))
78
+ return
79
+ except Exception as e :
80
+ self .errors .append (ValidationError ("Schema" , f"Invalid schema: { e } " ))
81
+ return
82
+
83
+ # Check schema version
84
+ if schema .version != "1.0" :
85
+ self .errors .append (ValidationError ("Schema" , f"Unsupported version: { schema .version } " ))
86
+
87
+ # Validate each entry
88
+ seen_keys : Set [str ] = set ()
89
+ seen_paths : Set [str ] = set ()
90
+
91
+ for entry in schema .values :
92
+ # Check for duplicate keys
93
+ if entry .key in seen_keys :
94
+ self .errors .append (ValidationError ("Schema" , f"Duplicate key: { entry .key } " ))
95
+ seen_keys .add (entry .key )
96
+
97
+ # Check for duplicate paths
98
+ if entry .path in seen_paths :
99
+ self .errors .append (ValidationError ("Schema" , f"Duplicate path: { entry .path } " ))
100
+ seen_paths .add (entry .path )
101
+
102
+ # Validate path format (alphanumeric + dots)
103
+ if not all (c .isalnum () or c in '.-_' for c in entry .path ):
104
+ self .errors .append (ValidationError ("Schema" , f"Invalid path format: { entry .path } " ))
105
+
106
+ # Validate type
107
+ valid_types = ["string" , "number" , "boolean" , "array" , "object" ]
108
+ if entry .type not in valid_types :
109
+ self .errors .append (ValidationError ("Schema" , f"Invalid type for { entry .key } : { entry .type } " ))
110
+
111
+ # Validate default value type if present
112
+ if entry .default is not None :
113
+ if not self ._validate_value_type (entry .default , entry .type ):
114
+ self .errors .append (
115
+ ValidationError ("Schema" , f"Default value type mismatch for { entry .key } " )
116
+ )
117
+
118
+ def _validate_values_for_env (self , env : str ):
119
+ """Validate values for a specific environment."""
120
+ values_file = self .values_base_path / f"values-{ env } .json"
121
+
122
+ if not values_file .exists ():
123
+ # Not an error if no values file exists
124
+ return
125
+
126
+ try :
127
+ with open (values_file ) as f :
128
+ values = json .load (f )
129
+ except json .JSONDecodeError as e :
130
+ self .errors .append (ValidationError ("Values" , f"Invalid JSON in { values_file } : { e } " , env ))
131
+ return
132
+
133
+ env_values = values .get (env , {})
134
+
135
+ # Load schema
136
+ try :
137
+ if not self .schema_path .exists ():
138
+ return
139
+
140
+ with open (self .schema_path ) as f :
141
+ data = json .load (f )
142
+
143
+ schema = Schema (** data )
144
+ except Exception :
145
+ # Schema errors already reported
146
+ return
147
+
148
+ # Create lookup maps
149
+ schema_map = {entry .key : entry for entry in schema .values }
150
+
151
+ # Check each value in the file
152
+ for key , value in env_values .items ():
153
+ if key not in schema_map :
154
+ self .errors .append (ValidationError ("Values" , f"Unknown key: { key } " , env ))
155
+ continue
156
+
157
+ entry = schema_map [key ]
158
+
159
+ # Validate based on whether it's a secret
160
+ if entry .sensitive :
161
+ if not self ._validate_secret_structure (value ):
162
+ self .errors .append (
163
+ ValidationError ("Values" , f"Invalid secret structure for { key } " , env )
164
+ )
165
+ else :
166
+ # Validate environment variable exists
167
+ if isinstance (value , dict ) and value .get ("type" ) == "env" :
168
+ env_var = value .get ("name" , "" )
169
+ if env_var and not os .environ .get (env_var ):
170
+ # This is a warning, not an error
171
+ console .print (WarningMessage (
172
+ f"Environment variable not found: { env_var } (key: { key } , env: { env } )"
173
+ ))
174
+ else :
175
+ # Validate value type
176
+ if not self ._validate_value_type (value , entry .type ):
177
+ self .errors .append (
178
+ ValidationError ("Values" , f"Type mismatch for { key } : expected { entry .type } " , env )
179
+ )
180
+
181
+ # Check for missing required values
182
+ for entry in schema .values :
183
+ if entry .required and entry .key not in env_values and entry .default is None :
184
+ self .errors .append (
185
+ ValidationError ("Values" , f"Missing required value: { entry .key } " , env )
186
+ )
187
+
188
+ def _validate_all_values (self ):
189
+ """Validate values for all environments found."""
190
+ # Find all values files
191
+ pattern = "values-*.json"
192
+ for values_file in self .values_base_path .glob (pattern ):
193
+ # Extract environment from filename
194
+ env = values_file .stem .replace ("values-" , "" )
195
+ self ._validate_values_for_env (env )
196
+
197
+ def _validate_value_type (self , value : Any , expected_type : str ) -> bool :
198
+ """Check if value matches expected type."""
199
+ if expected_type == "string" :
200
+ return isinstance (value , str )
201
+ elif expected_type == "number" :
202
+ return isinstance (value , (int , float )) and not isinstance (value , bool )
203
+ elif expected_type == "boolean" :
204
+ return isinstance (value , bool )
205
+ elif expected_type == "array" :
206
+ return isinstance (value , list )
207
+ elif expected_type == "object" :
208
+ return isinstance (value , dict )
209
+ return False
210
+
211
+ def _validate_secret_structure (self , value : Any ) -> bool :
212
+ """Validate secret value structure."""
213
+ if not isinstance (value , dict ):
214
+ return False
215
+
216
+ if "type" not in value :
217
+ return False
218
+
219
+ # Currently only support 'env' type
220
+ if value ["type" ] == "env" :
221
+ return "name" in value and isinstance (value ["name" ], str )
222
+
223
+ # Unknown type
224
+ self .errors .append (
225
+ ValidationError ("Values" , f"Unsupported secret type: { value ['type' ]} " )
226
+ )
227
+ return False
228
+
229
+ def print_errors (self ):
230
+ """Print all validation errors."""
231
+ if not self .errors :
232
+ return
233
+
234
+ console .print (ErrorMessage ("Validation failed:" ))
235
+ for error in self .errors :
236
+ console .print (f" - { error } " )
237
+
238
+
239
+ def validate_command (
240
+ schema_path : Path ,
241
+ values_base_path : Optional [Path ] = None ,
242
+ env : Optional [str ] = None
243
+ ) -> bool :
244
+ """Run validation and report results."""
245
+ validator = Validator (schema_path , values_base_path )
246
+
247
+ if validator .validate_all (env ):
248
+ if env :
249
+ console .print (f"✅ Validation passed for environment: { env } " )
250
+ else :
251
+ console .print ("✅ All validations passed" )
252
+ return True
253
+ else :
254
+ validator .print_errors ()
255
+ return False
0 commit comments