-
Notifications
You must be signed in to change notification settings - Fork 9
Implement Content Updater JobScheduler #387
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 6.0.0
Are you sure you want to change the base?
Changes from 41 commits
db04746
0905525
9b69a07
e0b66d9
742ac9f
e0291fb
b4f847f
9f9f13e
b5e50b2
8e2e56b
ee67948
9c4779c
5c2a358
3870f65
cf6a10e
8e5acdd
f56f321
412ebe6
0fd09ab
d1924fe
60affcd
b56b85b
af28834
6aa8b38
8cbcd82
8615b85
71efeca
41e9d91
00d5ebc
9508475
49d873f
311ea8a
a5e04a7
9a60f7a
9617a84
b9076e8
3d04131
e9b490a
3c9b656
63794f9
a60869c
b3a96f5
7068691
81930bb
9b4f925
1447817
095e57f
3fba343
01e6f0c
5d03a2d
6259c76
f4c723d
7cbfce9
3b3dc0f
a5f2134
ed59528
a283c64
39adfe6
2fa23d8
c73b9a3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,39 +18,64 @@ | |
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.action.index.IndexRequest; | ||
import org.opensearch.action.index.IndexResponse; | ||
import org.opensearch.action.support.WriteRequest; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.cluster.metadata.IndexNameExpressionResolver; | ||
import org.opensearch.cluster.node.DiscoveryNode; | ||
import org.opensearch.cluster.service.ClusterService; | ||
import org.opensearch.common.settings.*; | ||
import org.opensearch.common.xcontent.json.JsonXContent; | ||
import org.opensearch.core.common.io.stream.NamedWriteableRegistry; | ||
import org.opensearch.core.xcontent.NamedXContentRegistry; | ||
import org.opensearch.core.xcontent.XContentParser; | ||
import org.opensearch.core.xcontent.XContentParserUtils; | ||
import org.opensearch.env.Environment; | ||
import org.opensearch.env.NodeEnvironment; | ||
import org.opensearch.jobscheduler.spi.JobSchedulerExtension; | ||
import org.opensearch.jobscheduler.spi.ScheduledJobParser; | ||
import org.opensearch.jobscheduler.spi.ScheduledJobRunner; | ||
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; | ||
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser; | ||
import org.opensearch.plugins.ActionPlugin; | ||
import org.opensearch.plugins.ClusterPlugin; | ||
import org.opensearch.plugins.Plugin; | ||
import org.opensearch.repositories.RepositoriesService; | ||
import org.opensearch.script.ScriptService; | ||
import org.opensearch.threadpool.ThreadPool; | ||
import org.opensearch.watcher.ResourceWatcherService; | ||
|
||
import java.io.IOException; | ||
import java.time.Instant; | ||
import java.time.temporal.ChronoUnit; | ||
import java.util.*; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.function.Supplier; | ||
|
||
import com.wazuh.contentmanager.client.CTIClient; | ||
import com.wazuh.contentmanager.index.ContentIndex; | ||
import com.wazuh.contentmanager.index.ContextIndex; | ||
import com.wazuh.contentmanager.jobscheduler.ContentUpdaterJobParameter; | ||
import com.wazuh.contentmanager.jobscheduler.ContentUpdaterJobRunner; | ||
import com.wazuh.contentmanager.settings.PluginSettings; | ||
import com.wazuh.contentmanager.utils.Privileged; | ||
import com.wazuh.contentmanager.utils.SnapshotManager; | ||
|
||
/** Main class of the Content Manager Plugin */ | ||
public class ContentManagerPlugin extends Plugin implements ClusterPlugin { | ||
public class ContentManagerPlugin extends Plugin | ||
implements ClusterPlugin, ActionPlugin, JobSchedulerExtension { | ||
f-galland marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
private static final Logger log = LogManager.getLogger(ContentManagerPlugin.class); | ||
|
||
/** Scheduled jobs index name. */ | ||
public static final String JOB_INDEX = ".content_updater_jobs"; | ||
|
||
/** Scheduled job ID. */ | ||
public static final String JOB_ID = "content_updater_job"; | ||
QU3B1M marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
private Client client; | ||
private ContextIndex contextIndex; | ||
private ContentIndex contentIndex; | ||
private SnapshotManager snapshotManager; | ||
private ThreadPool threadPool; | ||
private ClusterService clusterService; | ||
|
||
@Override | ||
public Collection<Object> createComponents( | ||
|
@@ -65,13 +90,21 @@ public Collection<Object> createComponents( | |
NamedWriteableRegistry namedWriteableRegistry, | ||
IndexNameExpressionResolver indexNameExpressionResolver, | ||
Supplier<RepositoriesService> repositoriesServiceSupplier) { | ||
|
||
PluginSettings.getInstance(environment.settings(), clusterService); | ||
this.threadPool = threadPool; | ||
this.clusterService = clusterService; | ||
|
||
this.client = client; | ||
this.contextIndex = new ContextIndex(client); | ||
this.contentIndex = new ContentIndex(client); | ||
this.snapshotManager = | ||
new SnapshotManager(environment, this.contextIndex, this.contentIndex, new Privileged()); | ||
Privileged privileged = new Privileged(); | ||
ContentUpdaterJobRunner.getInstance( | ||
privileged.doPrivilegedRequest(CTIClient::getInstance), | ||
threadPool, | ||
environment, | ||
this.contextIndex, | ||
contentIndex, | ||
privileged); | ||
|
||
return Collections.emptyList(); | ||
} | ||
|
||
|
@@ -84,45 +117,37 @@ public Collection<Object> createComponents( | |
*/ | ||
@Override | ||
public void onNodeStarted(DiscoveryNode localNode) { | ||
// Only cluster managers are responsible for the initialization. | ||
if (localNode.isClusterManagerNode()) { | ||
if (this.clusterService.state().routingTable().hasIndex(ContentIndex.INDEX_NAME)) { | ||
this.start(); | ||
} | ||
|
||
// To be removed once we include the Job Scheduler. | ||
this.clusterService.addListener( | ||
event -> { | ||
if (event.indicesCreated().contains(ContentIndex.INDEX_NAME)) { | ||
this.start(); | ||
} | ||
}); | ||
// @TODO do we want an IS_DEV variable? | ||
if (System.getenv("IS_DEV").equals("true") && localNode.isClusterManagerNode()) { | ||
this.contextIndex.createIndex(); | ||
this.contentIndex.createIndex(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The content index There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I used the |
||
} | ||
try { | ||
log.info( | ||
"Scheduled content update job with status: [{}]", scheduleContentUpdateJob().getResult()); | ||
} catch (IOException e) { | ||
log.error("Failed scheduling content update job: {}", e.getMessage()); | ||
} | ||
} | ||
|
||
private IndexResponse scheduleContentUpdateJob() throws IOException { | ||
IntervalSchedule schedule = new IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES); | ||
ContentUpdaterJobParameter jobParameter = | ||
new ContentUpdaterJobParameter("update_content", schedule); | ||
IndexRequest indexRequest = | ||
new IndexRequest() | ||
.index(JOB_INDEX) | ||
.id(JOB_ID) | ||
.source(jobParameter.toXContent(JsonXContent.contentBuilder(), null)) | ||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); | ||
return this.client.index(indexRequest).actionGet(5, TimeUnit.SECONDS); | ||
} | ||
|
||
/** | ||
* Initialize. The initialization consists of: | ||
* Returns the settings for the plugin. | ||
* | ||
* <pre> | ||
* 1. fetching the latest consumer's information from the CTI API. | ||
* 2. initialize from a snapshot if the local consumer does not exist, or its offset is 0. | ||
* </pre> | ||
* @return the settings for the plugin | ||
*/ | ||
private void start() { | ||
try { | ||
this.threadPool | ||
.generic() | ||
.execute( | ||
() -> { | ||
this.contextIndex.createIndex(); | ||
this.snapshotManager.initialize(); | ||
}); | ||
} catch (Exception e) { | ||
// Log or handle exception | ||
log.error("Error initializing snapshot helper: {}", e.getMessage(), e); | ||
} | ||
} | ||
|
||
@Override | ||
public List<Setting<?>> getSettings() { | ||
return Arrays.asList( | ||
|
@@ -140,4 +165,64 @@ public List<Setting<?>> getSettings() { | |
PluginSettings.MAX_CONCURRENT_BULKS, | ||
PluginSettings.MAX_ITEMS_PER_BULK); | ||
} | ||
|
||
@Override | ||
public String getJobType() { | ||
return "content_updater"; | ||
} | ||
|
||
@Override | ||
public String getJobIndex() { | ||
return ".content_updater_jobs"; | ||
} | ||
|
||
@Override | ||
public ScheduledJobRunner getJobRunner() { | ||
return ContentUpdaterJobRunner.getInstance(); | ||
} | ||
|
||
@Override | ||
public ScheduledJobParser getJobParser() { | ||
return (parser, id, jobDocVersion) -> { | ||
ContentUpdaterJobParameter jobParameter = new ContentUpdaterJobParameter(); | ||
XContentParserUtils.ensureExpectedToken( | ||
XContentParser.Token.START_OBJECT, parser.nextToken(), parser); | ||
|
||
while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) { | ||
String fieldName = parser.currentName(); | ||
parser.nextToken(); | ||
switch (fieldName) { | ||
case ContentUpdaterJobParameter.NAME_FIELD: | ||
jobParameter.setJobName(parser.text()); | ||
break; | ||
case ContentUpdaterJobParameter.ENABLED_FIELD: | ||
jobParameter.setEnabled(parser.booleanValue()); | ||
break; | ||
case ContentUpdaterJobParameter.ENABLED_TIME_FIELD: | ||
jobParameter.setEnabledTime(parseInstantValue(parser)); | ||
break; | ||
case ContentUpdaterJobParameter.LAST_UPDATE_TIME_FIELD: | ||
jobParameter.setLastUpdateTime(parseInstantValue(parser)); | ||
break; | ||
case ContentUpdaterJobParameter.SCHEDULE_FIELD: | ||
jobParameter.setSchedule(ScheduleParser.parse(parser)); | ||
break; | ||
default: | ||
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); | ||
} | ||
} | ||
return jobParameter; | ||
}; | ||
} | ||
|
||
private Instant parseInstantValue(XContentParser parser) throws IOException { | ||
if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) { | ||
return null; | ||
} | ||
if (parser.currentToken().isValue()) { | ||
return Instant.ofEpochMilli(parser.longValue()); | ||
} | ||
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); | ||
return null; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the required indices are managed by the
setup
plugin I needed a way to enable creation of the indices only on development environments.