Skip to content

Commit 38881c8

Browse files
feat(apache#3822): Support OPC-UA certificate authentication (apache#3823)
1 parent 828b1b1 commit 38881c8

File tree

13 files changed

+316
-26
lines changed

13 files changed

+316
-26
lines changed

streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV3;
3030
import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV4;
3131
import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV5;
32+
import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV6;
3233
import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaSinkMigrationV1;
34+
import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaSinkMigrationV2;
3335
import org.apache.streampipes.extensions.connectors.opcua.sink.OpcUaSink;
3436

3537
import java.util.List;
@@ -64,7 +66,9 @@ public List<IStreamPipesPipelineElement<?>> pipelineElements() {
6466
new OpcUaAdapterMigrationV3(),
6567
new OpcUaAdapterMigrationV4(),
6668
new OpcUaAdapterMigrationV5(),
67-
new OpcUaSinkMigrationV1()
69+
new OpcUaAdapterMigrationV6(),
70+
new OpcUaSinkMigrationV1(),
71+
new OpcUaSinkMigrationV2()
6872
);
6973
}
7074
}

streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ public StaticProperty resolveConfiguration(String staticPropertyInternalName,
237237

238238
@Override
239239
public IAdapterConfiguration declareConfig() {
240-
var builder = AdapterConfigurationBuilder.create(ID, 5, () -> new OpcUaAdapter(clientProvider))
240+
var builder = AdapterConfigurationBuilder.create(ID, 6, () -> new OpcUaAdapter(clientProvider))
241241
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
242242
.withLocales(Locales.EN)
243243
.withCategory(AdapterType.Generic, AdapterType.Manufacturing)

streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ public class SharedUserConfiguration {
5656
public static final String SECURITY_POLICY = "securityPolicy";
5757
public static final String USER_AUTHENTICATION = "userAuthentication";
5858
public static final String USER_AUTHENTICATION_ANONYMOUS = "anonymous";
59+
public static final String X509_GROUP = "x509Group";
60+
public static final String X509_PRIVATE_KEY_PEM = "x509PrivateKeyPem";
61+
public static final String X509_PUBLIC_KEY_PEM = "x509PublicKeyPem";
5962

6063
public static OneOfStaticProperty makeNamingStrategyOption() {
6164
return StaticProperties.singleValueSelection(
@@ -72,6 +75,13 @@ public static void appendSharedOpcUaConfig(AbstractConfigurablePipelineElementBu
7275

7376
var dependsOn = getDependsOn(adapterConfig);
7477

78+
var x509Group = StaticProperties.group(
79+
Labels.withId(X509_GROUP),
80+
StaticProperties.secretValue(Labels.withId(X509_PRIVATE_KEY_PEM)),
81+
StaticProperties.stringFreeTextProperty(Labels.withId(X509_PUBLIC_KEY_PEM), true, false));
82+
83+
x509Group.setHorizontalRendering(false);
84+
7585
builder
7686
.requiredSingleValueSelection(
7787
Labels.withId(SECURITY_MODE),
@@ -89,7 +99,8 @@ public static void appendSharedOpcUaConfig(AbstractConfigurablePipelineElementBu
8999
StaticProperties.stringFreeTextProperty(
90100
Labels.withId(USERNAME)),
91101
StaticProperties.secretValue(Labels.withId(PASSWORD))
92-
))
102+
)),
103+
Alternatives.from(Labels.withId(X509_GROUP), x509Group)
93104
)
94105
.requiredAlternatives(Labels.withId(OPC_HOST_OR_URL),
95106
Alternatives.from(

streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
2424
import org.apache.streampipes.extensions.connectors.opcua.config.identity.AnonymousIdentityConfig;
2525
import org.apache.streampipes.extensions.connectors.opcua.config.identity.UsernamePasswordIdentityConfig;
26+
import org.apache.streampipes.extensions.connectors.opcua.config.identity.X509IdentityConfig;
2627
import org.apache.streampipes.extensions.connectors.opcua.config.security.SecurityConfig;
2728
import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels;
2829
import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaNamingStrategy;
@@ -44,6 +45,7 @@
4445
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULLING_INTERVAL;
4546
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULL_MODE;
4647
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME;
48+
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME_GROUP;
4749

4850
public class SpOpcUaConfigExtractor {
4951

@@ -128,15 +130,16 @@ public static <T extends OpcUaConfig> T extractSharedConfig(IParameterExtractor
128130
config.setOpcServerURL(serverAddress + ":" + port);
129131
}
130132

131-
boolean unauthenticated = selectedAlternativeAuthentication.equals(
132-
SharedUserConfiguration.USER_AUTHENTICATION_ANONYMOUS
133-
);
134-
if (unauthenticated) {
133+
if (selectedAlternativeAuthentication.equals(SharedUserConfiguration.USER_AUTHENTICATION_ANONYMOUS)) {
135134
config.setIdentityConfig(new AnonymousIdentityConfig());
136-
} else {
135+
} else if (selectedAlternativeAuthentication.equals(USERNAME_GROUP.name())) {
137136
String username = extractor.singleValueParameter(USERNAME.name(), String.class);
138137
String password = extractor.secretValue(PASSWORD.name());
139138
config.setIdentityConfig(new UsernamePasswordIdentityConfig(username, password));
139+
} else {
140+
String privateKeyPem = extractor.secretValue(SharedUserConfiguration.X509_PRIVATE_KEY_PEM);
141+
String publicKeyPem = extractor.textParameter(SharedUserConfiguration.X509_PUBLIC_KEY_PEM);
142+
config.setIdentityConfig(new X509IdentityConfig(publicKeyPem, privateKeyPem));
140143
}
141144

142145
return config;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.streampipes.extensions.connectors.opcua.config.identity;
20+
21+
import org.apache.commons.codec.digest.DigestUtils;
22+
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
23+
import org.eclipse.milo.opcua.sdk.client.api.identity.X509IdentityProvider;
24+
25+
import java.io.ByteArrayInputStream;
26+
import java.nio.charset.StandardCharsets;
27+
import java.security.KeyFactory;
28+
import java.security.PrivateKey;
29+
import java.security.cert.CertificateFactory;
30+
import java.security.cert.X509Certificate;
31+
import java.security.spec.PKCS8EncodedKeySpec;
32+
import java.util.Base64;
33+
import java.util.Objects;
34+
35+
public class X509IdentityConfig implements IdentityConfig {
36+
37+
private final X509Certificate certificate;
38+
private final PrivateKey privateKey;
39+
40+
/**
41+
* @param certificatePem String containing one X.509 certificate in PEM
42+
* (-----BEGIN CERTIFICATE----- ... -----END CERTIFICATE-----)
43+
* @param privateKeyPem String containing a PKCS#8 private key in PEM
44+
* (-----BEGIN PRIVATE KEY----- ... -----END PRIVATE KEY-----)
45+
*/
46+
public X509IdentityConfig(String certificatePem, String privateKeyPem) {
47+
this.certificate = parseCertificatePem(certificatePem);
48+
this.privateKey = parsePrivateKeyPem(privateKeyPem);
49+
}
50+
51+
@Override
52+
public void configureIdentity(OpcUaClientConfigBuilder builder) {
53+
builder.setIdentityProvider(new X509IdentityProvider(certificate, privateKey));
54+
}
55+
56+
@Override
57+
public String toString() {
58+
try {
59+
String subject = certificate.getSubjectX500Principal().getName();
60+
String thumb = DigestUtils.sha256Hex(certificate.getEncoded());
61+
return String.format("%s [%s]", subject, thumb);
62+
} catch (Exception e) {
63+
return "X509PemIdentity";
64+
}
65+
}
66+
67+
private X509Certificate parseCertificatePem(String pem) {
68+
Objects.requireNonNull(pem, "certificatePem");
69+
byte[] der = extractPemBlock(pem, "CERTIFICATE");
70+
try {
71+
CertificateFactory cf = CertificateFactory.getInstance("X.509");
72+
return (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(der));
73+
} catch (Exception e) {
74+
throw new IllegalArgumentException("Failed to parse X.509 certificate PEM", e);
75+
}
76+
}
77+
78+
private PrivateKey parsePrivateKeyPem(String pem) {
79+
Objects.requireNonNull(pem, "privateKeyPem");
80+
byte[] der = extractPemBlock(pem, "PRIVATE KEY"); // PKCS#8
81+
PKCS8EncodedKeySpec spec = new PKCS8EncodedKeySpec(der);
82+
83+
// Try common algorithms in order.
84+
String[] algos = new String[] { "RSA", "EC", "Ed25519", "Ed448" };
85+
for (String algo : algos) {
86+
try {
87+
KeyFactory kf = KeyFactory.getInstance(algo);
88+
return kf.generatePrivate(spec);
89+
} catch (Exception ignore) {
90+
// try next
91+
}
92+
}
93+
throw new IllegalArgumentException(
94+
"Unsupported or invalid PKCS#8 private key. " +
95+
"Make sure it is an unencrypted PKCS#8 key (BEGIN PRIVATE KEY).");
96+
}
97+
98+
private static byte[] extractPemBlock(String pem, String type) {
99+
String begin = "-----BEGIN " + type + "-----";
100+
String end = "-----END " + type + "-----";
101+
102+
String normalized = pem.replace("\r", "");
103+
int start = normalized.indexOf(begin);
104+
int stop = normalized.indexOf(end);
105+
if (start < 0 || stop < 0) {
106+
throw new IllegalArgumentException("Missing PEM markers for " + type);
107+
}
108+
String base64 = normalized.substring(start + begin.length(), stop)
109+
.replace("\n", "")
110+
.replace("\t", "")
111+
.replace(" ", "");
112+
return Base64.getMimeDecoder().decode(base64.getBytes(StandardCharsets.US_ASCII));
113+
}
114+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.streampipes.extensions.connectors.opcua.migration;
20+
21+
import org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
22+
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
23+
import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
24+
import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
25+
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
26+
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
27+
import org.apache.streampipes.model.migration.MigrationResult;
28+
import org.apache.streampipes.model.migration.ModelMigratorConfig;
29+
import org.apache.streampipes.model.staticproperty.StaticProperty;
30+
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
31+
import org.apache.streampipes.sdk.StaticProperties;
32+
import org.apache.streampipes.sdk.helpers.Alternatives;
33+
import org.apache.streampipes.sdk.helpers.Labels;
34+
35+
import java.util.List;
36+
37+
import static org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration.X509_GROUP;
38+
import static org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration.X509_PRIVATE_KEY_PEM;
39+
import static org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration.X509_PUBLIC_KEY_PEM;
40+
41+
public class OpcUaAdapterMigrationV6 implements IAdapterMigrator {
42+
@Override
43+
public ModelMigratorConfig config() {
44+
return new ModelMigratorConfig(
45+
OpcUaAdapter.ID,
46+
SpServiceTagPrefix.ADAPTER,
47+
5,
48+
6
49+
);
50+
}
51+
52+
@Override
53+
public MigrationResult<AdapterDescription> migrate(AdapterDescription element,
54+
IStaticPropertyExtractor extractor) throws RuntimeException {
55+
var config = element.getConfig();
56+
element.setConfig(migrate(config, 4));
57+
58+
return MigrationResult.success(element);
59+
}
60+
61+
public List<StaticProperty> migrate(List<StaticProperty> staticProperties,
62+
int authenticationConfigIndex) {
63+
var authentication = staticProperties.get(authenticationConfigIndex);
64+
65+
if (authentication instanceof StaticPropertyAlternatives) {
66+
var group = StaticProperties.group(
67+
Labels.withId(X509_GROUP),
68+
StaticProperties.secretValue(Labels.withId(X509_PRIVATE_KEY_PEM)),
69+
StaticProperties.stringFreeTextProperty(Labels.withId(X509_PUBLIC_KEY_PEM), true, false));
70+
group.setHorizontalRendering(false);
71+
var x509Alternative = Alternatives.from(Labels.withId(X509_GROUP), group);
72+
((StaticPropertyAlternatives) authentication).getAlternatives().add(x509Alternative);
73+
}
74+
return staticProperties;
75+
}
76+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.streampipes.extensions.connectors.opcua.migration;
20+
21+
import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
22+
import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator;
23+
import org.apache.streampipes.extensions.connectors.opcua.sink.OpcUaSink;
24+
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
25+
import org.apache.streampipes.model.graph.DataSinkInvocation;
26+
import org.apache.streampipes.model.migration.MigrationResult;
27+
import org.apache.streampipes.model.migration.ModelMigratorConfig;
28+
29+
public class OpcUaSinkMigrationV2 implements IDataSinkMigrator {
30+
@Override
31+
public ModelMigratorConfig config() {
32+
return new ModelMigratorConfig(
33+
OpcUaSink.ID,
34+
SpServiceTagPrefix.DATA_SINK,
35+
1,
36+
2
37+
);
38+
}
39+
40+
@Override
41+
public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation element,
42+
IDataSinkParameterExtractor extractor) throws RuntimeException {
43+
var config = element.getStaticProperties();
44+
var migratedConfigs = new OpcUaAdapterMigrationV6().migrate(config, 3);
45+
element.setStaticProperties(migratedConfigs);
46+
return MigrationResult.success(element);
47+
}
48+
}

streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public OpcUaSink(OpcUaClientProvider clientProvider) {
5757

5858
@Override
5959
public IDataSinkConfiguration declareConfig() {
60-
var builder = DataSinkBuilder.create(ID, 0)
60+
var builder = DataSinkBuilder.create(ID, 2)
6161
.withLocales(Locales.EN)
6262
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
6363
.category(DataSinkType.FORWARD)

streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,3 +85,12 @@ incomplete-event-handling.description=Select how events with missing values (e.g
8585

8686
NAMING_STRATEGY.title=Naming Strategy
8787
NAMING_STRATEGY.description=Select how the runtime name of the OPC-UA nodes are generated.
88+
89+
x509Group.title=X.509 Certificate
90+
x509Group.description=
91+
92+
x509PrivateKeyPem.title=Private Key PEM
93+
x509PrivateKeyPem.description=Private key in PEM format
94+
95+
x509PublicKeyPem.title=Certificate PEM
96+
x509PublicKeyPem.description=Public key in PEM format

streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/strings.en

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,12 @@ userAuthentication.description=Choose an authentication method for the user
6868

6969
anonymous.title=Anonymous
7070
anonymous.description=
71+
72+
x509Group.title=X.509 Certificate
73+
x509Group.description=
74+
75+
x509PrivateKeyPem.title=Private Key PEM
76+
x509PrivateKeyPem.description=Private key in PEM format
77+
78+
x509PublicKeyPem.title=Certificate PEM
79+
x509PublicKeyPem.description=Public key in PEM format

0 commit comments

Comments
 (0)