Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -168,7 +169,7 @@ public class RuleEngineImpl implements RuleManager, RegistryChangeListener<Modul
/**
* flag to check whether we have reached a start level where we want to start rule execution
*/
private boolean started = false;
private volatile boolean started = false;

protected final Logger logger = LoggerFactory.getLogger(RuleEngineImpl.class);

Expand Down Expand Up @@ -677,11 +678,12 @@ private void unregister(@Nullable WrappedRule r, RuleStatusDetail detail, @Nulla
*/
private void unregister(WrappedRule r) {
String rUID = r.getUID();
TriggerHandlerCallbackImpl callback;
synchronized (this) {
TriggerHandlerCallbackImpl callback = thCallbacks.remove(rUID);
if (callback != null) {
callback.dispose();
}
callback = thCallbacks.remove(rUID);
}
if (callback != null) {
callback.dispose();
}
removeModuleHandlers(r.getModules(), rUID);
}
Expand Down Expand Up @@ -1050,15 +1052,16 @@ private void removeMissingModuleTypes(Collection<String> moduleTypes) {
* @param td {@link TriggerData} object containing new values for {@link Trigger}'s {@link Output}s
*/
protected void runRule(String ruleUID, TriggerHandlerCallbackImpl.TriggerData td) {
if (thCallbacks.get(ruleUID) == null) {
// the rule was unregistered
return;
}
if (!started) {
logger.debug("Rule engine not yet started - not executing rule '{}'", ruleUID);
return;
}
synchronized (this) {
if (thCallbacks.get(ruleUID) == null) {
// the rule was unregistered
return;
}
if (!started) {
logger.debug("Rule engine not yet started - not executing rule '{}'", ruleUID);
return;
}

final RuleStatus ruleStatus = getRuleStatus(ruleUID);
if (ruleStatus != null && ruleStatus != RuleStatus.IDLE) {
logger.error("Failed to execute rule ‘{}' with status '{}'", ruleUID, ruleStatus.name());
Expand Down Expand Up @@ -1096,41 +1099,67 @@ protected void runRule(String ruleUID, TriggerHandlerCallbackImpl.TriggerData td
@Override
public Map<String, @Nullable Object> runNow(String ruleUID, boolean considerConditions,
@Nullable Map<String, Object> context) {
Map<String, @Nullable Object> returnContext = new HashMap<>();
final WrappedRule rule = getManagedRule(ruleUID);
if (rule == null) {
logger.warn("Failed to execute rule '{}': Invalid Rule UID", ruleUID);
return returnContext;
return Map.of();
}

TriggerHandlerCallbackImpl thCallback;
synchronized (this) {
final RuleStatus ruleStatus = getRuleStatus(ruleUID);
if (ruleStatus != null && ruleStatus != RuleStatus.IDLE) {
logger.error("Failed to execute rule ‘{}' with status '{}'", ruleUID, ruleStatus.name());
return returnContext;
thCallback = thCallbacks.get(ruleUID);
}
if (thCallback == null) {
RuleStatus ruleStatus;
synchronized (this) {
ruleStatus = getRuleStatus(ruleUID);
}
// change state to RUNNING
setStatus(ruleUID, new RuleStatusInfo(RuleStatus.RUNNING));
logger.error("Failed to execute rule '{}' with status '{}': could not acquire rule thread", ruleUID,
ruleStatus == null ? "UNKNOWN" : ruleStatus.name());
return Map.of();
}
try {
clearContext(ruleUID);
if (context != null && !context.isEmpty()) {
getContext(ruleUID, null).putAll(context);

Future<Map<String, @Nullable Object>> future = thCallback.getScheduler().submit(() -> {
Map<String, @Nullable Object> returnContext = new HashMap<>();
synchronized (this) {
final RuleStatus ruleStatus = getRuleStatus(ruleUID);
if (ruleStatus != null && ruleStatus != RuleStatus.IDLE) {
logger.error("Failed to execute rule '{}' with status '{}'", ruleUID, ruleStatus.name());
return Map.of();
}
// change state to RUNNING
setStatus(ruleUID, new RuleStatusInfo(RuleStatus.RUNNING));
}
if (!considerConditions || calculateConditions(rule)) {
executeActions(rule, false);
try {
clearContext(ruleUID);
if (context != null && !context.isEmpty()) {
getContext(ruleUID, null).putAll(context);
}
if (!considerConditions || calculateConditions(rule)) {
executeActions(rule, false);
}
logger.debug("The rule '{}' is executed.", ruleUID);
returnContext.putAll(getContext(ruleUID, null));
} catch (Throwable t) {
logger.error("Failed to execute rule '{}': ", ruleUID, t);
}
logger.debug("The rule '{}' is executed.", ruleUID);
returnContext.putAll(getContext(ruleUID, null));
} catch (Throwable t) {
logger.error("Failed to execute rule '{}': ", ruleUID, t);
}
// change state to IDLE only if the rule has not been DISABLED.
synchronized (this) {
if (getRuleStatus(ruleUID) == RuleStatus.RUNNING) {
setStatus(ruleUID, new RuleStatusInfo(RuleStatus.IDLE));
// change state to IDLE only if the rule has not been DISABLED.
synchronized (this) {
if (getRuleStatus(ruleUID) == RuleStatus.RUNNING) {
setStatus(ruleUID, new RuleStatusInfo(RuleStatus.IDLE));
}
}
}
return returnContext;
return returnContext;
});
try {
return future.get();
} catch (InterruptedException e) {
logger.warn("Interrupted while waiting for rule '{}' to execute, attempting to cancel execution", ruleUID);
future.cancel(true);
} catch (ExecutionException e) {
logger.error("Failed to execute rule '{}': ", ruleUID, e);
}
return Map.of();
}

@Override
Expand Down