1- import uuid
2- from typing import Any , Dict , List , Optional , Tuple
1+ from typing import Any , List , Optional
32
43import structlog
54from presidio_analyzer import AnalyzerEngine
65from presidio_anonymizer import AnonymizerEngine
76
87from codegate .db .models import AlertSeverity
98from codegate .pipeline .base import PipelineContext
9+ from codegate .pipeline .sensitive_data .session_store import SessionStore
1010
1111logger = structlog .get_logger ("codegate.pii.analyzer" )
1212
1313
14- class PiiSessionStore :
15- """
16- A class to manage PII (Personally Identifiable Information) session storage.
17-
18- Attributes:
19- session_id (str): The unique identifier for the session. If not provided, a new UUID
20- is generated. mappings (Dict[str, str]): A dictionary to store mappings between UUID
21- placeholders and PII.
22-
23- Methods:
24- add_mapping(pii: str) -> str:
25- Adds a PII string to the session store and returns a UUID placeholder for it.
26-
27- get_pii(uuid_placeholder: str) -> str:
28- Retrieves the PII string associated with the given UUID placeholder. If the placeholder
29- is not found, returns the placeholder itself.
30- """
31-
32- def __init__ (self , session_id : str = None ):
33- self .session_id = session_id or str (uuid .uuid4 ())
34- self .mappings : Dict [str , str ] = {}
35-
36- def add_mapping (self , pii : str ) -> str :
37- uuid_placeholder = f"<{ str (uuid .uuid4 ())} >"
38- self .mappings [uuid_placeholder ] = pii
39- return uuid_placeholder
40-
41- def get_pii (self , uuid_placeholder : str ) -> str :
42- return self .mappings .get (uuid_placeholder , uuid_placeholder )
43-
44-
4514class PiiAnalyzer :
4615 """
4716 PiiAnalyzer class for analyzing and anonymizing text containing PII.
@@ -52,12 +21,12 @@ class PiiAnalyzer:
5221 Get or create the singleton instance of PiiAnalyzer.
5322 analyze:
5423 text (str): The text to analyze for PII.
55- Tuple[str, List[Dict[str, Any]], PiiSessionStore ]: The anonymized text, a list of
24+ Tuple[str, List[Dict[str, Any]], SessionStore ]: The anonymized text, a list of
5625 found PII details, and the session store.
5726 entities (List[str]): The PII entities to analyze for.
5827 restore_pii:
5928 anonymized_text (str): The text with anonymized PII.
60- session_store (PiiSessionStore ): The PiiSessionStore used for anonymization.
29+ session_store (SessionStore ): The SessionStore used for anonymization.
6130 str: The text with original PII restored.
6231 """
6332
@@ -95,13 +64,11 @@ def __init__(self):
9564 # Create analyzer with custom NLP engine
9665 self .analyzer = AnalyzerEngine (nlp_engine = nlp_engine )
9766 self .anonymizer = AnonymizerEngine ()
98- self .session_store = PiiSessionStore ()
67+ self .session_store = SessionStore ()
9968
10069 PiiAnalyzer ._instance = self
10170
102- def analyze (
103- self , text : str , context : Optional [PipelineContext ] = None
104- ) -> Tuple [str , List [Dict [str , Any ]], PiiSessionStore ]:
71+ def analyze (self , text : str , context : Optional [PipelineContext ] = None ) -> List :
10572 # Prioritize credit card detection first
10673 entities = [
10774 "PHONE_NUMBER" ,
@@ -125,81 +92,30 @@ def analyze(
12592 language = "en" ,
12693 score_threshold = 0.3 , # Lower threshold to catch more potential matches
12794 )
95+ return analyzer_results
12896
129- # Track found PII
130- found_pii = []
131-
132- # Only anonymize if PII was found
133- if analyzer_results :
134- # Log each found PII instance and anonymize
135- anonymized_text = text
136- for result in analyzer_results :
137- pii_value = text [result .start : result .end ]
138- uuid_placeholder = self .session_store .add_mapping (pii_value )
139- pii_info = {
140- "type" : result .entity_type ,
141- "value" : pii_value ,
142- "score" : result .score ,
143- "start" : result .start ,
144- "end" : result .end ,
145- "uuid_placeholder" : uuid_placeholder ,
146- }
147- found_pii .append (pii_info )
148- anonymized_text = anonymized_text .replace (pii_value , uuid_placeholder )
149-
150- # Log each PII detection with its UUID mapping
151- logger .info (
152- "PII detected and mapped" ,
153- pii_type = result .entity_type ,
154- score = f"{ result .score :.2f} " ,
155- uuid = uuid_placeholder ,
156- # Don't log the actual PII value for security
157- value_length = len (pii_value ),
158- session_id = self .session_store .session_id ,
159- )
160-
161- # Log summary of all PII found in this analysis
162- if found_pii and context :
163- # Create notification string for alert
164- notify_string = (
165- f"**PII Detected** 🔒\n "
166- f"- Total PII Found: { len (found_pii )} \n "
167- f"- Types Found: { ', ' .join (set (p ['type' ] for p in found_pii ))} \n "
168- )
169- context .add_alert (
170- self ._name ,
171- trigger_string = notify_string ,
172- severity_category = AlertSeverity .CRITICAL ,
173- )
174-
175- logger .info (
176- "PII analysis complete" ,
177- total_pii_found = len (found_pii ),
178- pii_types = [p ["type" ] for p in found_pii ],
179- session_id = self .session_store .session_id ,
180- )
181-
182- # Return the anonymized text, PII details, and session store
183- return anonymized_text , found_pii , self .session_store
184-
185- # If no PII found, return original text, empty list, and session store
186- return text , [], self .session_store
187-
def restore_pii(self, session_id: str, anonymized_text: str) -> str:
    """
    Swap PII placeholders in the given text back to their original values.

    The placeholder-to-PII mapping is looked up on ``self.session_store`` via
    ``get_by_session_id(session_id)``. Every occurrence of each mapping key
    found in the text is replaced with the corresponding mapping value.

    Args:
        session_id (str): Identifier used to look up the stored mapping.
        anonymized_text (str): The text containing placeholders for PII.

    Returns:
        str: The text with placeholders replaced. If the lookup yields nothing
        (a falsy result), a warning is logged and the input text is returned
        unchanged.
    """
    placeholder_map = self.session_store.get_by_session_id(session_id)

    if placeholder_map:
        restored_text = anonymized_text
        # Apply substitutions in the mapping's own iteration order.
        for placeholder, pii_value in placeholder_map.items():
            restored_text = restored_text.replace(placeholder, pii_value)
        return restored_text

    # Nothing stored for this session: leave the text as it is.
    logger.warning(
        "No active PII session found for given session ID. Unable to restore PII."
    )
    return anonymized_text
0 commit comments