Skip to content

Commit 2ebfcf9

Browse files
author
Adam Gilat
committed
cleanup code/deps
1 parent 996763b commit 2ebfcf9

File tree

3 files changed

+111
-187
lines changed

3 files changed

+111
-187
lines changed

pom.xml

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,49 +20,34 @@
2020
</plugins>
2121
</build>
2222

23-
2423
<dependencies>
2524
<dependency>
2625
<groupId>org.apache.kafka</groupId>
2726
<artifactId>kafka_2.11</artifactId>
2827
<version>1.1.1</version>
2928
</dependency>
3029
<dependency>
31-
<groupId>org.apache.commons</groupId>
32-
<artifactId>commons-lang3</artifactId>
33-
<version>3.7</version>
34-
</dependency>
35-
<!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
36-
<dependency>
37-
<groupId>commons-io</groupId>
38-
<artifactId>commons-io</artifactId>
39-
<version>2.6</version>
30+
<groupId>com.google.code.findbugs</groupId>
31+
<artifactId>jsr305</artifactId>
32+
<version>3.0.2</version>
4033
</dependency>
41-
<!-- https://mvnrepository.com/artifact/junit/junit -->
4234
<dependency>
4335
<groupId>junit</groupId>
4436
<artifactId>junit</artifactId>
4537
<version>4.12</version>
4638
<scope>test</scope>
4739
</dependency>
48-
<!-- https://mvnrepository.com/artifact/org.easymock/easymock -->
4940
<dependency>
50-
<groupId>org.easymock</groupId>
51-
<artifactId>easymock</artifactId>
52-
<version>3.6</version>
41+
<groupId>org.mockito</groupId>
42+
<artifactId>mockito-core</artifactId>
43+
<version>2.20.1</version>
5344
<scope>test</scope>
5445
</dependency>
55-
<dependency>
56-
<groupId>org.slf4j</groupId>
57-
<artifactId>slf4j-api</artifactId>
58-
<version>1.7.5</version>
59-
60-
</dependency>
6146
<dependency>
6247
<groupId>org.slf4j</groupId>
6348
<artifactId>slf4j-simple</artifactId>
6449
<version>1.6.4</version>
65-
50+
<scope>test</scope>
6651
</dependency>
6752
</dependencies>
6853

Lines changed: 48 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
package io.okro.kafka;
22

3-
import org.apache.kafka.common.security.auth.*;
3+
import org.apache.kafka.common.security.auth.AuthenticationContext;
4+
import org.apache.kafka.common.security.auth.KafkaPrincipal;
5+
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
6+
import org.apache.kafka.common.security.auth.SslAuthenticationContext;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
49

10+
import javax.annotation.Nullable;
511
import javax.net.ssl.SSLPeerUnverifiedException;
612
import javax.net.ssl.SSLSession;
713
import java.security.cert.Certificate;
@@ -10,56 +16,64 @@
1016
import java.util.Collection;
1117
import java.util.List;
1218

13-
import org.slf4j.Logger;
14-
import org.slf4j.LoggerFactory;
15-
16-
import static org.apache.commons.lang3.StringUtils.startsWith;
17-
1819
public class SpiffePrincipalBuilder implements KafkaPrincipalBuilder {
1920
private static final Logger LOG = LoggerFactory.getLogger(SpiffePrincipalBuilder.class);
2021

2122
private static final String SPIFFE_TYPE = "SPIFFE";
2223

2324
public KafkaPrincipal build(AuthenticationContext context) {
24-
if (context instanceof PlaintextAuthenticationContext) {
25+
if (!(context instanceof SslAuthenticationContext)) {
26+
LOG.trace("non-SSL connection coerced to ANONYMOUS");
2527
return KafkaPrincipal.ANONYMOUS;
2628
}
2729

28-
if (!(context instanceof SslAuthenticationContext)) {
29-
throw new IllegalArgumentException("Unhandled authentication context type: " + context.getClass().getName());
30+
SSLSession session = ((SslAuthenticationContext) context).session();
31+
X509Certificate cert = firstX509(session);
32+
if (cert == null) {
33+
LOG.trace("first peer certificate missing / not x509");
34+
return KafkaPrincipal.ANONYMOUS;
3035
}
3136

32-
SSLSession sslSession = ((SslAuthenticationContext) context).session();
37+
String spiffeId = spiffeId(cert);
38+
if (spiffeId == null) {
39+
return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName());
40+
}
41+
42+
return new KafkaPrincipal(SPIFFE_TYPE, spiffeId);
43+
}
44+
45+
private @Nullable X509Certificate firstX509(SSLSession session) {
3346
try {
34-
Certificate[] peerCerts = sslSession.getPeerCertificates();
35-
if (peerCerts == null || peerCerts.length == 0) {
36-
return KafkaPrincipal.ANONYMOUS;
47+
Certificate[] peerCerts = session.getPeerCertificates();
48+
if (peerCerts.length == 0) {
49+
return null;
3750
}
38-
if (!(peerCerts[0] instanceof X509Certificate)) {
39-
return KafkaPrincipal.ANONYMOUS;
51+
Certificate first = peerCerts[0];
52+
if (!(first instanceof X509Certificate)) {
53+
return null;
4054
}
41-
X509Certificate cert = (X509Certificate) peerCerts[0];
42-
43-
Collection<List<?>> sanCollection = cert.getSubjectAlternativeNames();
44-
KafkaPrincipal principal;
55+
return (X509Certificate) first;
56+
} catch (SSLPeerUnverifiedException e) {
57+
LOG.warn("failed to extract certificate", e);
58+
return null;
59+
}
60+
}
4561

46-
if (sanCollection != null) {
47-
principal = sanCollection.stream()
48-
.map(san -> (String) san.get(1))
49-
.filter(uri -> startsWith(uri, "spiffe://"))
50-
.findFirst()
51-
.map(s -> new KafkaPrincipal(SPIFFE_TYPE, s))
52-
.orElse(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName()));
53-
} else {
54-
principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName());
62+
private @Nullable String spiffeId(X509Certificate cert) {
63+
try {
64+
Collection<List<?>> sans = cert.getSubjectAlternativeNames();
65+
if (sans == null) {
66+
return null;
5567
}
5668

57-
LOG.debug("PrincipalBuilder found principal: {}", principal.toString());
58-
59-
return principal;
60-
} catch (SSLPeerUnverifiedException | CertificateParsingException se) {
61-
LOG.warn("Unhandled exception: " + se.toString());
62-
return KafkaPrincipal.ANONYMOUS;
69+
return sans.stream()
70+
.map(san -> (String) san.get(1))
71+
.filter(uri -> uri.startsWith("spiffe://"))
72+
.findFirst()
73+
.orElse(null);
74+
} catch (CertificateParsingException e) {
75+
LOG.warn("failed to parse SAN", e);
76+
return null;
6377
}
6478
}
6579
}
Lines changed: 56 additions & 131 deletions
Original file line numberDiff line numberDiff line change
@@ -1,162 +1,87 @@
11
package io.okro.kafka;
22

3-
import java.util.concurrent.TimeUnit;
3+
import org.apache.kafka.common.security.auth.KafkaPrincipal;
4+
import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
5+
import org.apache.kafka.common.security.auth.SslAuthenticationContext;
6+
import org.junit.Test;
47

5-
import java.io.ByteArrayInputStream;
8+
import javax.net.ssl.SSLPeerUnverifiedException;
9+
import javax.net.ssl.SSLSession;
10+
import java.io.InputStream;
11+
import java.net.InetAddress;
12+
import java.net.UnknownHostException;
613
import java.security.cert.Certificate;
14+
import java.security.cert.CertificateException;
715
import java.security.cert.CertificateFactory;
816
import java.security.cert.X509Certificate;
9-
import java.net.InetAddress;
10-
import javax.net.ssl.SSLSession;
11-
12-
import org.apache.kafka.common.security.auth.*;
1317

14-
import org.apache.commons.io.IOUtils;
15-
16-
import org.easymock.EasyMock;
17-
import org.easymock.EasyMockSupport;
18-
import org.junit.Test;
1918
import static org.junit.Assert.assertEquals;
20-
import static org.junit.Assert.assertNotNull;
21-
22-
public class SpiffePrincipalBuilderTest extends EasyMockSupport {
19+
import static org.mockito.Mockito.mock;
20+
import static org.mockito.Mockito.when;
2321

24-
private X509Certificate getResourceAsCert(String resourcePath)
25-
throws java.io.IOException, java.security.cert.CertificateException {
22+
public class SpiffePrincipalBuilderTest {
2623

24+
private SslAuthenticationContext mockedSslContext(String certPath) throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
25+
// load cert
2726
ClassLoader classLoader = getClass().getClassLoader();
28-
try {
29-
// Read cert
30-
ByteArrayInputStream certInputStream =
31-
new ByteArrayInputStream(IOUtils.toByteArray(classLoader.getResourceAsStream(resourcePath)));
32-
33-
// Parse as X509 certificate
34-
CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
35-
return (X509Certificate) certificateFactory.generateCertificate(certInputStream);
36-
37-
} catch (java.io.IOException | java.security.cert.CertificateException e) {
38-
System.out.println("Problem with reading the certificate file. " + e.toString());
39-
throw e;
40-
}
27+
InputStream in = classLoader.getResourceAsStream(certPath);
28+
CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
29+
X509Certificate cert = (X509Certificate) certificateFactory.generateCertificate(in);
30+
31+
// mock ssl session
32+
SSLSession session = mock(SSLSession.class);
33+
when(session.getPeerCertificates()).thenReturn(new Certificate[]{cert});
34+
return new SslAuthenticationContext(session, InetAddress.getLocalHost());
4135
}
4236

37+
/**
38+
* X509 V3 with a SPIFFE-based SAN extension.
39+
* Should result in 'SPIFFE:[spiffe://uri]'
40+
*/
4341
@Test
44-
public void TestSubjectOnlyCert() {
45-
// Tests an X509 V1 certificate with no SAN extension
42+
public void TestSpiffeCert() throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
43+
SslAuthenticationContext context = mockedSslContext("spiffe-cert.pem");
44+
KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);
4645

47-
try {
48-
X509Certificate cert = getResourceAsCert("subject-only-cert.pem");
49-
50-
// Mock SSLSession getPeerCertificates(), we bypass alllll the handshake parts because... out of scope.
51-
SSLSession session = mock(SSLSession.class);
52-
EasyMock.expect(session.getPeerCertificates()).andReturn(new Certificate[] {cert});
53-
54-
replayAll();
55-
56-
// Build KafkaPrincipal
57-
SpiffePrincipalBuilder builder = new SpiffePrincipalBuilder();
58-
59-
KafkaPrincipal principal = builder.build(
60-
new SslAuthenticationContext(session, InetAddress.getLocalHost()));
61-
62-
// Identity type should be "User"
63-
assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
64-
65-
// Identity should be a string
66-
assertNotNull(principal.getName());
67-
68-
System.out.println("Principal: " + principal.toString());
69-
70-
} catch (java.io.IOException | java.security.cert.CertificateException e) {
71-
System.out.println("Problem with reading the certificate file. " + e.toString());
72-
}
46+
assertEquals("SPIFFE", principal.getPrincipalType());
47+
assertEquals(principal.getName(), "spiffe://srv1.okro.io");
7348
}
7449

50+
/**
51+
* X509 V1 certificate with no SAN extension.
52+
* Should fall back to 'User:CN=[CN]'
53+
*/
7554
@Test
76-
public void TestSpiffeCert() {
77-
// Tests an X509 V3 with SAN extension holding a SPIFFE ID
78-
79-
try {
80-
X509Certificate cert = getResourceAsCert("spiffe-cert.pem");
81-
82-
// Mock SSLSession getPeerCertificates(), we bypass alllll the handshake parts because... out of scope.
83-
SSLSession session = mock(SSLSession.class);
84-
EasyMock.expect(session.getPeerCertificates()).andReturn(new Certificate[] {cert});
55+
public void TestSubjectOnlyCert() throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
56+
SslAuthenticationContext context = mockedSslContext("subject-only-cert.pem");
57+
KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);
8558

86-
replayAll();
87-
88-
// Build KafkaPrincipal
89-
SpiffePrincipalBuilder builder = new SpiffePrincipalBuilder();
90-
91-
KafkaPrincipal principal = builder.build(
92-
new SslAuthenticationContext(session, InetAddress.getLocalHost()));
93-
94-
// Identity type should be "SPIFFE"
95-
assertEquals("SPIFFE", principal.getPrincipalType());
96-
97-
// Identity should be a string
98-
assertNotNull(principal.getName());
99-
100-
System.out.println("Principal: " + principal.toString());
101-
102-
} catch (java.io.IOException | java.security.cert.CertificateException e) {
103-
System.out.println("Problem with reading the certificate file. " + e.toString());
104-
}
59+
assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
60+
assertEquals(principal.getName(), "CN=srv2,OU=architects,O=okro.io,L=Tel-Aviv,ST=Tel-Aviv,C=IL");
10561
}
10662

63+
/**
64+
* X509 V3 with a non-SPIFFE SAN extension.
65+
* Should fall back to 'User:CN=[CN]'
66+
*/
10767
@Test
108-
public void TestSanNoSpiffeCert() {
109-
// Tests an X509 V3 with SAN extension holding a regular FQDN
110-
111-
try {
112-
X509Certificate cert = getResourceAsCert("san-no-spiffe-cert.pem");
113-
114-
// Mock SSLSession getPeerCertificates(), we bypass alllll the handshake parts because... out of scope.
115-
SSLSession session = mock(SSLSession.class);
116-
EasyMock.expect(session.getPeerCertificates()).andReturn(new Certificate[] {cert});
117-
118-
replayAll();
119-
120-
// Build KafkaPrincipal
121-
SpiffePrincipalBuilder builder = new SpiffePrincipalBuilder();
122-
123-
KafkaPrincipal principal = builder.build(
124-
new SslAuthenticationContext(session, InetAddress.getLocalHost()));
68+
public void TestSanNoSpiffeCert() throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
69+
SslAuthenticationContext context = mockedSslContext("san-no-spiffe-cert.pem");
70+
KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);
12571

126-
// Identity type should be "User"
127-
assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
128-
129-
// Identity should be a string
130-
assertNotNull(principal.getName());
131-
132-
System.out.println("Principal: " + principal.toString());
133-
134-
} catch (java.io.IOException | java.security.cert.CertificateException e) {
135-
System.out.println("Problem with reading the certificate file. " + e.toString());
136-
}
72+
assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
73+
assertEquals(principal.getName(), "CN=srv3,OU=architects,O=okro.io,L=Tel-Aviv,ST=Tel-Aviv,C=IL");
13774
}
13875

76+
/**
77+
* Non-SSL context.
78+
* Should be unauthenticated.
79+
*/
13980
@Test
14081
public void TestNoSSLContext() throws java.net.UnknownHostException {
141-
// Tests non-SSL context behavior
142-
143-
SpiffePrincipalBuilder builder = new SpiffePrincipalBuilder();
144-
145-
KafkaPrincipal principal = builder.build(
146-
new PlaintextAuthenticationContext(InetAddress.getLocalHost()));
82+
PlaintextAuthenticationContext context = new PlaintextAuthenticationContext(InetAddress.getLocalHost());
83+
KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);
14784

148-
// Identity type should be KafkaPrincipal.ANONYMOUS
14985
assertEquals(KafkaPrincipal.ANONYMOUS, principal);
150-
151-
System.out.println("Principal: " + principal.toString());
152-
}
153-
154-
@Test
155-
public void TestAwareness() throws InterruptedException {
156-
// Tests a reviewer's awareness
157-
TimeUnit.SECONDS.sleep(1);
158-
159-
// Identity type should be KafkaPrincipal.ANONYMOUS
160-
assertEquals(42, 42);
16186
}
16287
}

0 commit comments

Comments
 (0)