1
1
# -*- coding: utf-8 -*-
2
2
from enum import Enum
3
3
from typing import Dict , List , Union
4
-
4
+ from loguru import logger
5
5
from email_validator import (
6
6
EmailNotValidError ,
7
7
EmailUndeliverableError ,
@@ -28,8 +28,8 @@ def validate_email_address(
28
28
timeout : int = 10 ,
29
29
dns_type : str = 'timeout' ,
30
30
) -> Dict [str , Union [str , bool , Dict [str , Union [str , bool , List [str ]]]]]:
31
- # Log the email being validated
32
- # logger.info (f"Validating email: {email }")
31
+
32
+ logger .debug (f"validate_email_address: { email } with params : { locals () } " )
33
33
valid : bool = False
34
34
35
35
dns_type = dns_type .lower ()
@@ -43,7 +43,7 @@ def validate_email_address(
43
43
dns_param = {"timeout" : timeout }
44
44
45
45
try :
46
- # Validate the email address, checking for deliverability
46
+
47
47
emailinfo = validate_email (
48
48
email ,
49
49
check_deliverability = check_deliverability ,
@@ -54,10 +54,9 @@ def validate_email_address(
54
54
globally_deliverable = globally_deliverable ,
55
55
** dns_param ,
56
56
)
57
- # Normalize the email address
57
+
58
58
email : str = emailinfo .normalized
59
59
60
- # Initialize an empty dictionary to store email information
61
60
email_dict : Dict [
62
61
str , Union [str , bool , Dict [str , Union [str , bool , List [str ]]]]
63
62
] = {
@@ -66,63 +65,26 @@ def validate_email_address(
66
65
"email_data" : None ,
67
66
}
68
67
email_data : Dict [str , Union [str , bool , List [str ]]] = {}
69
- # Populate the dictionary with attributes from the validated email,
70
- # if they exist
71
- # if hasattr(emailinfo, "original"):
72
- # email_data["original"] = emailinfo.original
73
- # if hasattr(emailinfo, "normalized"):
74
- # email_data["normalized"] = emailinfo.normalized
75
- # if hasattr(emailinfo, "local_part"):
76
- # email_data["local"] = emailinfo.local_part
77
- # if hasattr(emailinfo, "domain"):
78
- # email_data["domain"] = emailinfo.domain
79
- # if hasattr(emailinfo, "ascii_email"):
80
- # email_data["ascii_email"] = emailinfo.ascii_email
81
- # if hasattr(emailinfo, "ascii_local_part"):
82
- # email_data["ascii_local"] = emailinfo.ascii_local_part
83
- # if hasattr(emailinfo, "ascii_domain"):
84
- # email_data["ascii_domain"] = emailinfo.ascii_domain
85
- # if hasattr(emailinfo, "smtputf8"):
86
- # email_data["smtputf8"] = emailinfo.smtputf8
87
- # if hasattr(emailinfo, "domain_address"):
88
- # email_data["domain_address"] = emailinfo.domain_address
89
- # if hasattr(emailinfo, "display_name"):
90
- # email_data["display_name"] = emailinfo.display_name
68
+
91
69
if hasattr (emailinfo , "mx" ):
92
70
email_data ["mx" ] = emailinfo .mx
93
71
if emailinfo .mx is not None :
94
72
email_dict ["valid" ] = True
95
- # logger.info(f"Email is valid: {email}")
96
- # if hasattr(emailinfo, "mx_fallback_type"):
97
- # email_data["mx_fallback_type"] = emailinfo.mx_fallback_type
98
- # if hasattr(emailinfo, "spf"):
99
- # email_data["spf"] = emailinfo.spf
100
-
101
- # Log that the email is valid
102
- # logger.info(f"Email is valid: {email}")
103
- # Return a dictionary indicating that the email is valid, along with
104
- # the normalized email and the email information dictionary
73
+ logger .info (f"Email is valid: { email } " )
74
+ else :
75
+ if emailinfo .normalized is not None :
76
+ email_dict ["valid" ] = True
77
+ logger .info (F"Email no MX record found: { email } " )
78
+
105
79
email_dict ["email_data" ] = dict (sorted (vars (emailinfo ).items ()))
106
- # email_dict['edata'] = vars(emailinfo)
80
+
107
81
return email_dict
108
82
109
83
except EmailUndeliverableError as e :
110
- # Log the error if email deliverability fails
111
- # logger.error(f"Email deliverability failed for {email}: {str(e)}")
112
- # Return a dictionary indicating that the email is not deliverable,
113
- # along with the original email and the error message
114
84
return {"valid" : valid , "email" : email , "error" : str (e )}
115
85
except EmailNotValidError as e :
116
- # Log the error if email validation fails
117
- # logger.error(f"Email validation failed for {email}: {str(e)}")
118
- # Return a dictionary indicating that the email is not valid, along
119
- # with the original email and the error message
120
86
return {"valid" : valid , "email" : email , "error" : str (e )}
121
87
except Exception as e :
122
- # Log the error if an unexpected exception occurs
123
- # logger.error(f"An unexpected error occurred: {str(e)}")
124
- # Return a dictionary indicating that an unexpected error occurred,
125
- # along with the original email and the error message
126
88
return {"valid" : valid , "email" : email , "error" : str (e )}
127
89
128
90
@@ -155,14 +117,60 @@ def validate_email_address(
155
117
'this\\ still\\ "not\\ \\ allowed@example.com' , # even if escaped (preceded by a backslash), spaces, quotes, and backslashes must still be contained by quotes
156
118
"1234567890123456789012345678901234567890123456789012345678901234+x@example.com" , # local part is longer than 64 characters
157
119
"mike@google.com" ,
120
+ # Emails with empty local part
121
+ "@example.com" , # only valid if allow_empty_local is True
122
+
123
+ # Emails with non-ASCII characters
124
+ "üñîçøðé@example.com" , # only valid if allow_smtputf8 is True
125
+ "user@üñîçøðé.com" , # only valid if allow_smtputf8 is True
126
+
127
+ # Emails with quoted local part
128
+ '"john.doe"@example.com' , # only valid if allow_quoted_local is True
129
+ '"john..doe"@example.com' , # only valid if allow_quoted_local is True
130
+
131
+ # Emails with display name
132
+ 'John Doe <john@example.com>' , # only valid if allow_display_name is True
133
+
134
+ # Emails with domain literal
135
+ 'user@[192.0.2.1]' , # only valid if allow_domain_literal is True
136
+
137
+ # Emails with long local part
138
+ "a" * 65 + "@example.com" , # local part is longer than 64 characters
139
+
140
+ # Emails with invalid characters
141
+ "john doe@example.com" , # space is not allowed
142
+ "john@doe@example.com" , # only one @ is allowed
143
+ "john.doe@.com" , # domain can't start with a dot
144
+ "john.doe@example..com" , # domain can't have two consecutive dots
145
+ ]
146
+ # create a list of configurations
147
+ configurations = [
148
+ {"check_deliverability" : True , "test_environment" : False , "allow_smtputf8" : False , "allow_empty_local" : False , "allow_quoted_local" : False , "allow_display_name" : False , "allow_domain_literal" : False , "globally_deliverable" : None , "timeout" : 10 , "dns_type" : 'timeout' },
149
+ {"check_deliverability" : False , "test_environment" : True , "allow_smtputf8" : True , "allow_empty_local" : True , "allow_quoted_local" : True , "allow_display_name" : True , "allow_domain_literal" : True , "globally_deliverable" : None , "timeout" : 5 , "dns_type" : 'dns' },
150
+ # add more configurations here
158
151
]
159
152
160
153
import pprint
161
154
import time
162
155
163
- for email in email_addresses :
164
- t0 = time .time ()
165
- res = validate_email_address (email )
166
- if res ['valid' ]:
167
- pprint .pprint (res , indent = 4 )
168
- # print(f"Time taken: {time.time() - t0:.2f}")
156
+ t0 = time .time ()
157
+ validity = []
158
+ for _ in range (20 ):
159
+ for email in email_addresses :
160
+ for config in configurations :
161
+
162
+ res = validate_email_address (email , ** config )
163
+ # if res['valid']:
164
+ # pprint.pprint(res, indent=4)
165
+ # pprint.pprint(res, indent=4)
166
+ # print(f"Time taken: {time.time() - t0:.2f}")
167
+ # print(f"Email: {email} is valid: {res['valid']}")
168
+ # validity.append(f"Email: {email} is valid: {res['valid']}")
169
+ validity .append (res )
170
+ t1 = time .time ()
171
+ validity = sorted (validity , key = lambda x : x ['email' ])
172
+
173
+ for v in validity :
174
+ pprint .pprint (v , indent = 4 )
175
+
176
+ print (f"Time taken: { t1 - t0 :.2f} " )
0 commit comments