Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
Expand Down Expand Up @@ -61,20 +62,26 @@ public abstract class AbstractDiscoveryService implements DiscoveryService {
protected final ScheduledExecutorService scheduler = ThreadPoolManager.getScheduledPool(DISCOVERY_THREADPOOL_NAME);

private final Set<DiscoveryListener> discoveryListeners = new CopyOnWriteArraySet<>();
protected @Nullable ScanListener scanListener = null;

private boolean backgroundDiscoveryEnabled;
// All access must be guarded by "this"
protected @Nullable ScanListener scanListener;

private @Nullable String scanInputLabel;
private @Nullable String scanInputDescription;
private volatile boolean backgroundDiscoveryEnabled;

private final @Nullable String scanInputLabel;
private final @Nullable String scanInputDescription;

// All access must be guarded by "cachedResults"
private final Map<ThingUID, DiscoveryResult> cachedResults = new HashMap<>();

// This set is immutable and can safely be shared between threads
private final Set<ThingTypeUID> supportedThingTypes;
private final int timeout;

// All access must be guarded by "this"
private Instant timestampOfLastScan = Instant.MIN;

// All access must be guarded by "this"
private @Nullable ScheduledFuture<?> scheduledStop;

protected @NonNullByDefault({}) TranslationProvider i18nProvider;
Expand Down Expand Up @@ -198,12 +205,14 @@ public void addDiscoveryListener(@Nullable DiscoveryListener listener) {
if (listener == null) {
return;
}
discoveryListeners.add(listener);
Map<ThingUID, DiscoveryResult> existingResults;
synchronized (cachedResults) {
for (DiscoveryResult cachedResult : cachedResults.values()) {
listener.thingDiscovered(this, cachedResult);
}
existingResults = Map.copyOf(cachedResults);
}
for (DiscoveryResult existingResult : existingResults.values()) {
listener.thingDiscovered(this, existingResult);
}
discoveryListeners.add(listener);
}

@Override
Expand All @@ -221,11 +230,10 @@ public void startScan(String input, @Nullable ScanListener listener) {
startScanInternal(input, listener);
}

private synchronized void startScanInternal(@Nullable String input, @Nullable ScanListener listener) {
private void startScanInternal(@Nullable String input, @Nullable ScanListener listener) {
// we first stop any currently running scan and its scheduled stop call
stopScan();
synchronized (this) {
// we first stop any currently running scan and its scheduled stop
// call
stopScan();
ScheduledFuture<?> scheduledStop = this.scheduledStop;
if (scheduledStop != null) {
scheduledStop.cancel(false);
Expand All @@ -245,39 +253,41 @@ private synchronized void startScanInternal(@Nullable String input, @Nullable Sc
}, getScanTimeout(), TimeUnit.SECONDS);
}
timestampOfLastScan = Instant.now();

try {
if (isScanInputSupported() && input != null) {
startScan(input);
} else {
startScan();
}
} catch (Exception ex) {
scheduledStop = this.scheduledStop;
}
try {
if (isScanInputSupported() && input != null) {
startScan(input);
} else {
startScan();
}
} catch (Exception ex) {
synchronized (this) {
ScheduledFuture<?> scheduledStop = this.scheduledStop;
if (scheduledStop != null) {
scheduledStop.cancel(false);
this.scheduledStop = null;
}
scanListener = null;
throw ex;
}
throw ex;
}
}

@Override
public synchronized void abortScan() {
public void abortScan() {
ScanListener scanListener = null;
synchronized (this) {
ScheduledFuture<?> scheduledStop = this.scheduledStop;
if (scheduledStop != null) {
scheduledStop.cancel(false);
scheduledStop.cancel(true);
this.scheduledStop = null;
}
final ScanListener scanListener = this.scanListener;
if (scanListener != null) {
Exception e = new CancellationException("Scan has been aborted.");
scanListener.onErrorOccurred(e);
this.scanListener = null;
}
scanListener = this.scanListener;
this.scanListener = null;
}
if (scanListener != null) {
Exception e = new CancellationException("Scan has been aborted.");
scanListener.onErrorOccurred(e);
}
}

Expand All @@ -297,11 +307,14 @@ protected void startScan(String input) {
/**
* This method cleans up after a scan, i.e. it removes listeners and other required operations.
*/
protected synchronized void stopScan() {
ScanListener scanListener = this.scanListener;
protected void stopScan() {
ScanListener scanListener = null;
synchronized (this) {
scanListener = this.scanListener;
this.scanListener = null;
}
if (scanListener != null) {
scanListener.onFinished();
this.scanListener = null;
}
}

Expand All @@ -314,12 +327,14 @@ protected void thingDiscovered(final DiscoveryResult discoveryResult) {
final DiscoveryResult discoveryResultNew = getLocalizedDiscoveryResult(discoveryResult,
FrameworkUtil.getBundle(this.getClass()));
for (DiscoveryListener discoveryListener : discoveryListeners) {
try {
discoveryListener.thingDiscovered(this, discoveryResultNew);
} catch (Exception e) {
logger.error("An error occurred while calling the discovery listener {}.",
discoveryListener.getClass().getName(), e);
}
scheduler.execute(() -> {
try {
discoveryListener.thingDiscovered(this, discoveryResultNew);
} catch (Exception e) {
logger.error("An error occurred while calling the discovery listener {}.",
discoveryListener.getClass().getName(), e);
}
});
}
synchronized (cachedResults) {
cachedResults.put(discoveryResultNew.getThingUID(), discoveryResultNew);
Expand Down Expand Up @@ -384,18 +399,22 @@ protected void removeOlderResults(Instant timestamp, @Nullable ThingUID bridgeUI
*/
protected void removeOlderResults(Instant timestamp, @Nullable Collection<ThingTypeUID> thingTypeUIDs,
@Nullable ThingUID bridgeUID) {
Collection<ThingUID> removedThings = null;
Set<ThingUID> removedThings = new HashSet<>();

Collection<ThingUID> removed;
Collection<ThingTypeUID> toBeRemoved = thingTypeUIDs != null ? thingTypeUIDs : getSupportedThingTypes();
for (DiscoveryListener discoveryListener : discoveryListeners) {
try {
removedThings = discoveryListener.removeOlderResults(this, timestamp, toBeRemoved, bridgeUID);
removed = discoveryListener.removeOlderResults(this, timestamp, toBeRemoved, bridgeUID);
if (removed != null) {
removedThings.addAll(removed);
}
} catch (Exception e) {
logger.error("An error occurred while calling the discovery listener {}.",
discoveryListener.getClass().getName(), e);
}
}
if (removedThings != null) {
if (!removedThings.isEmpty()) {
synchronized (cachedResults) {
for (ThingUID uid : removedThings) {
cachedResults.remove(uid);
Expand Down Expand Up @@ -486,7 +505,7 @@ protected void stopBackgroundDiscovery() {
*
* @return timestamp as {@link Instant}
*/
protected Instant getTimestampOfLastScan() {
protected synchronized Instant getTimestampOfLastScan() {
return timestampOfLastScan;
}

Expand All @@ -496,6 +515,8 @@ private String inferKey(DiscoveryResult discoveryResult, String lastSegment) {

protected DiscoveryResult getLocalizedDiscoveryResult(final DiscoveryResult discoveryResult,
@Nullable Bundle bundle) {
TranslationProvider i18nProvider = this.i18nProvider;
LocaleProvider localeProvider = this.localeProvider;
if (i18nProvider != null && localeProvider != null) {
String currentLabel = discoveryResult.getLabel();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,20 @@
@NonNullByDefault
public final class DiscoveryServiceRegistryImpl implements DiscoveryServiceRegistry, DiscoveryListener {

// All access must be guarded by "cachedResults"
private final Map<DiscoveryService, Set<DiscoveryResult>> cachedResults = new HashMap<>();

private final class AggregatingScanListener implements ScanListener {

private final @Nullable ScanListener listener;

// All access must be guarded by "this"
private int finishedDiscoveryServices = 0;

// All access must be guarded by "this"
private boolean errorOccurred = false;

// All access must be guarded by "this"
private int numberOfDiscoveryServices;

private AggregatingScanListener(int numberOfDiscoveryServices, @Nullable ScanListener listener) {
Expand All @@ -81,28 +88,27 @@ private AggregatingScanListener(int numberOfDiscoveryServices, @Nullable ScanLis
}

@Override
public synchronized void onFinished() {
public void onFinished() {
ScanListener listener = null;
synchronized (this) {
finishedDiscoveryServices++;
logger.debug("Finished {} of {} discovery services.", finishedDiscoveryServices,
numberOfDiscoveryServices);
if (!errorOccurred && finishedDiscoveryServices == numberOfDiscoveryServices) {
ScanListener listener = this.listener;
if (listener != null) {
listener.onFinished();
}
listener = this.listener;
}
}
if (listener != null) {
listener.onFinished();
}
}

@Override
public void onErrorOccurred(@Nullable Exception exception) {
ScanListener listener = null;
synchronized (this) {
if (!errorOccurred) {
ScanListener listener = this.listener;
if (listener != null) {
listener.onErrorOccurred(exception);
}
listener = this.listener;
errorOccurred = true;
} else {
// Skip error logging for aborted scans
Expand All @@ -114,23 +120,27 @@ public void onErrorOccurred(@Nullable Exception exception) {
}
}
}
if (listener != null) {
listener.onErrorOccurred(exception);
}
}

public void reduceNumberOfDiscoveryServices() {
ScanListener listener = null;
synchronized (this) {
numberOfDiscoveryServices--;
if (!errorOccurred && finishedDiscoveryServices == numberOfDiscoveryServices) {
ScanListener listener = this.listener;
if (listener != null) {
listener.onFinished();
}
listener = this.listener;
}
}
if (listener != null) {
listener.onFinished();
}
}
}

private final Set<DiscoveryService> discoveryServices = new CopyOnWriteArraySet<>();
private final Set<DiscoveryService> discoveryServicesAll = new HashSet<>();
private final Set<DiscoveryService> discoveryServicesAll = new CopyOnWriteArraySet<>();

private final Set<DiscoveryListener> listeners = new CopyOnWriteArraySet<>();

Expand All @@ -153,7 +163,9 @@ protected void deactivate() {
removeDiscoveryServiceActivated(discoveryService);
}
listeners.clear();
cachedResults.clear();
synchronized (cachedResults) {
cachedResults.clear();
}
}

@Override
Expand Down Expand Up @@ -182,12 +194,14 @@ public boolean abortScan(String bindingId) throws IllegalStateException {

@Override
public void addDiscoveryListener(DiscoveryListener listener) throws IllegalStateException {
listeners.add(listener);
Map<DiscoveryService, Set<DiscoveryResult>> existingResults;
synchronized (cachedResults) {
cachedResults.forEach((service, results) -> {
results.forEach(result -> listener.thingDiscovered(service, result));
});
existingResults = Map.copyOf(cachedResults);
}
listeners.add(listener);
existingResults.forEach((service, results) -> {
results.forEach(result -> listener.thingDiscovered(service, result));
});
}

@Override
Expand Down Expand Up @@ -248,12 +262,12 @@ public List<String> getSupportedBindings() {
}

@Override
public synchronized void removeDiscoveryListener(DiscoveryListener listener) throws IllegalStateException {
public void removeDiscoveryListener(DiscoveryListener listener) throws IllegalStateException {
listeners.remove(listener);
}

@Override
public synchronized void thingDiscovered(final DiscoveryService source, final DiscoveryResult result) {
public void thingDiscovered(final DiscoveryService source, final DiscoveryResult result) {
synchronized (cachedResults) {
Objects.requireNonNull(cachedResults.computeIfAbsent(source, unused -> new HashSet<>())).add(result);
}
Expand All @@ -268,7 +282,7 @@ public synchronized void thingDiscovered(final DiscoveryService source, final Di
}

@Override
public synchronized void thingRemoved(final DiscoveryService source, final ThingUID thingUID) {
public void thingRemoved(final DiscoveryService source, final ThingUID thingUID) {
synchronized (cachedResults) {
Iterator<DiscoveryResult> it = cachedResults.getOrDefault(source, Set.of()).iterator();
while (it.hasNext()) {
Expand Down Expand Up @@ -376,8 +390,7 @@ private boolean startScan(DiscoveryService discoveryService, @Nullable String in
}
}

private synchronized Set<DiscoveryService> getDiscoveryServices(ThingTypeUID thingTypeUID)
throws IllegalStateException {
private Set<DiscoveryService> getDiscoveryServices(ThingTypeUID thingTypeUID) throws IllegalStateException {
Set<DiscoveryService> discoveryServices = new HashSet<>();

for (DiscoveryService discoveryService : this.discoveryServices) {
Expand All @@ -391,7 +404,7 @@ private synchronized Set<DiscoveryService> getDiscoveryServices(ThingTypeUID thi
}

@Override
public synchronized Set<DiscoveryService> getDiscoveryServices(String bindingId) throws IllegalStateException {
public Set<DiscoveryService> getDiscoveryServices(String bindingId) throws IllegalStateException {
Set<DiscoveryService> discoveryServices = new HashSet<>();

for (DiscoveryService discoveryService : this.discoveryServices) {
Expand Down