7
7
import java .security .cert .Certificate ;
8
8
import java .security .cert .CertificateParsingException ;
9
9
import java .security .cert .X509Certificate ;
10
+ import java .util .Collection ;
11
+ import java .util .List ;
12
+
13
+ import org .slf4j .Logger ;
14
+ import org .slf4j .LoggerFactory ;
10
15
11
16
import static org .apache .commons .lang3 .StringUtils .startsWith ;
12
17
13
18
public class SpiffePrincipalBuilder implements KafkaPrincipalBuilder {
19
+ private static final Logger LOG = LoggerFactory .getLogger (SpiffePrincipalBuilder .class );
20
+
21
+ private static final String SPIFFE_TYPE = "SPIFFE" ;
22
+
14
23
public KafkaPrincipal build (AuthenticationContext context ) {
15
24
if (context instanceof PlaintextAuthenticationContext ) {
16
25
return KafkaPrincipal .ANONYMOUS ;
@@ -30,14 +39,27 @@ public KafkaPrincipal build(AuthenticationContext context) {
30
39
return KafkaPrincipal .ANONYMOUS ;
31
40
}
32
41
X509Certificate cert = (X509Certificate ) peerCerts [0 ];
33
- return cert .getSubjectAlternativeNames ().stream ()
34
- .map (san -> (String ) san .get (1 ))
35
- .filter (uri -> startsWith (uri , "spiffe://" ))
36
- .findFirst ()
37
- .map (s -> new KafkaPrincipal (KafkaPrincipal .USER_TYPE , s ))
38
- .orElse (KafkaPrincipal .ANONYMOUS );
42
+
43
+ Collection <List <?>> sanCollection = cert .getSubjectAlternativeNames ();
44
+ KafkaPrincipal principal ;
45
+
46
+ if (sanCollection != null ) {
47
+ principal = sanCollection .stream ()
48
+ .map (san -> (String ) san .get (1 ))
49
+ .filter (uri -> startsWith (uri , "spiffe://" ))
50
+ .findFirst ()
51
+ .map (s -> new KafkaPrincipal (SPIFFE_TYPE , s ))
52
+ .orElse (new KafkaPrincipal (KafkaPrincipal .USER_TYPE , cert .getSubjectX500Principal ().getName ()));
53
+ } else {
54
+ principal = new KafkaPrincipal (KafkaPrincipal .USER_TYPE , cert .getSubjectX500Principal ().getName ());
55
+ }
56
+
57
+ LOG .debug ("PrincipalBuilder found principal: {}" , principal .toString ());
58
+
59
+ return principal ;
39
60
} catch (SSLPeerUnverifiedException | CertificateParsingException se ) {
61
+ LOG .warn ("Unhandled exception: " + se .toString ());
40
62
return KafkaPrincipal .ANONYMOUS ;
41
63
}
42
64
}
43
- }
65
+ }
0 commit comments