Skip to content

Commit ac7fae8

Browse files
authored
Speeding up compact adapter creation by replacing findAll with getting the specific adapter (apache#3791)
* minor fix for speeding up the creation of compact adapters * added logs * clean up * mavn runs
1 parent a3e6873 commit ac7fae8

File tree

1 file changed

+16
-25
lines changed

1 file changed

+16
-25
lines changed

streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
*/
1818

1919
package org.apache.streampipes.connect.management.management;
20-
2120
import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
2221
import org.apache.streampipes.commons.exceptions.SepaParseException;
2322
import org.apache.streampipes.commons.exceptions.connect.AdapterException;
@@ -41,7 +40,8 @@
4140
import java.util.NoSuchElementException;
4241

4342
/**
44-
* This class is responsible for managing all the adapter instances which are executed on worker nodes
43+
* This class is responsible for managing all the adapter instances which are
44+
* executed on worker nodes
4545
*/
4646
public class AdapterMasterManagement {
4747

@@ -57,8 +57,7 @@ public AdapterMasterManagement(
5757
IAdapterStorage adapterInstanceStorage,
5858
AdapterResourceManager adapterResourceManager,
5959
DataStreamResourceManager dataStreamResourceManager,
60-
AdapterMetrics adapterMetrics
61-
) {
60+
AdapterMetrics adapterMetrics) {
6261
this.adapterInstanceStorage = adapterInstanceStorage;
6362
this.adapterMetrics = adapterMetrics;
6463
this.adapterResourceManager = adapterResourceManager;
@@ -68,8 +67,7 @@ public AdapterMasterManagement(
6867
public void addAdapter(
6968
AdapterDescription adapterDescription,
7069
String adapterId,
71-
String principalSid
72-
)
70+
String principalSid)
7371
throws AdapterException {
7472

7573
// Create elementId for datastream
@@ -92,8 +90,7 @@ private void createDataStreamForAdapter(
9290
AdapterDescription adapterDescription,
9391
String adapterId,
9492
String streamId,
95-
String principalSid
96-
) throws AdapterException {
93+
String principalSid) throws AdapterException {
9794
var storedDescription = new SourcesManagement()
9895
.createAdapterDataStream(adapterDescription, streamId);
9996
storedDescription.setCorrespondingAdapterId(adapterId);
@@ -102,21 +99,16 @@ private void createDataStreamForAdapter(
10299
}
103100

104101
public AdapterDescription getAdapter(String elementId) throws AdapterException {
105-
List<AdapterDescription> allAdapters = adapterInstanceStorage.findAll();
106-
107-
if (allAdapters != null && elementId != null) {
108-
for (AdapterDescription ad : allAdapters) {
109-
if (elementId.equals(ad.getElementId())) {
110-
return ad;
111-
}
112-
}
102+
AdapterDescription adapter = adapterInstanceStorage.getElementById(elementId);
103+
if (adapter == null) {
104+
throw new AdapterException("Adapter with ID " + elementId + " not found");
113105
}
114-
115-
throw new AdapterException("Could not find adapter with id: " + elementId);
106+
return adapter;
116107
}
117108

118109
/**
119-
* First the adapter is stopped removed, then the corresponding data source is deleted
110+
* First the adapter is stopped removed, then the corresponding data source is
111+
* deleted
120112
*
121113
* @param elementId The elementId of the adapter instance
122114
* @throws AdapterException when adapter can not be stopped
@@ -146,7 +138,7 @@ public List<AdapterDescription> getAllAdapterInstances() {
146138
}
147139

148140
public void stopStreamAdapter(String elementId,
149-
boolean forceStop) throws AdapterException {
141+
boolean forceStop) throws AdapterException {
150142
AdapterDescription ad = adapterInstanceStorage.getElementById(elementId);
151143

152144
try {
@@ -181,8 +173,7 @@ public void startStreamAdapter(String elementId) throws AdapterException {
181173
ad.getAppId(),
182174
SpServiceUrlProvider.ADAPTER,
183175
ad.getDeploymentConfiguration()
184-
.getDesiredServiceTags()
185-
);
176+
.getDesiredServiceTags());
186177

187178
// Update selected endpoint URL of adapter
188179
ad.setSelectedEndpointUrl(baseUrl);
@@ -191,7 +182,8 @@ public void startStreamAdapter(String elementId) throws AdapterException {
191182
// Invoke adapter instance
192183
WorkerRestClient.invokeStreamAdapter(baseUrl, elementId);
193184

194-
// register the adapter at the metrics manager so that the AdapterHealthCheck can send metrics
185+
// register the adapter at the metrics manager so that the AdapterHealthCheck
186+
// can send metrics
195187
adapterMetrics.register(ad.getElementId(), ad.getName());
196188

197189
LOG.info("Started adapter " + elementId + " on: " + baseUrl);
@@ -202,8 +194,7 @@ public void startStreamAdapter(String elementId) throws AdapterException {
202194

203195
private void installDataSource(
204196
SpDataStream stream,
205-
String principalSid
206-
) throws AdapterException {
197+
String principalSid) throws AdapterException {
207198
try {
208199
new DataStreamVerifier(stream).verifyAndAdd(principalSid, false);
209200
} catch (SepaParseException e) {

0 commit comments

Comments
 (0)