Skip to content

Commit 0966811

Browse files
authored
Merge pull request #477 from liyinan926/master
Fixed mounting of Prometheus configuration ConfigMap
2 parents e7b9d59 + bd89732 commit 0966811

File tree

8 files changed

+135
-48
lines changed

8 files changed

+135
-48
lines changed

pkg/apis/sparkoperator.k8s.io/v1beta1/types.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,3 +495,25 @@ type PrometheusSpec struct {
495495
// Configuration has no effect if ConfigFile is set.
496496
Configuration *string `json:"configuration,omitempty"`
497497
}
498+
499+
// PrometheusMonitoringEnabled returns if Prometheus monitoring is enabled or not.
500+
func (s *SparkApplication) PrometheusMonitoringEnabled() bool {
501+
return s.Spec.Monitoring != nil && s.Spec.Monitoring.Prometheus != nil
502+
}
503+
504+
// HasPrometheusConfigFile returns if Prometheus monitoring uses a configruation file in the container.
505+
func (s *SparkApplication) HasPrometheusConfigFile() bool {
506+
return s.PrometheusMonitoringEnabled() &&
507+
s.Spec.Monitoring.Prometheus.ConfigFile != nil &&
508+
*s.Spec.Monitoring.Prometheus.ConfigFile != ""
509+
}
510+
511+
// ExposeDriverMetrics returns if driver metrics should be exposed.
512+
func (s *SparkApplication) ExposeDriverMetrics() bool {
513+
return s.Spec.Monitoring != nil && s.Spec.Monitoring.ExposeDriverMetrics
514+
}
515+
516+
// ExposeExecutorMetrics returns if executor metrics should be exposed.
517+
func (s *SparkApplication) ExposeExecutorMetrics() bool {
518+
return s.Spec.Monitoring != nil && s.Spec.Monitoring.ExposeExecutorMetrics
519+
}

pkg/config/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,8 @@ func GetExecutorEnvVarConfOptions(app *v1beta1.SparkApplication) []string {
5151
}
5252
return envVarConfOptions
5353
}
54+
55+
// GetPrometheusConfigMapName returns the name of the ConfigMap for Prometheus configuration.
56+
func GetPrometheusConfigMapName(app *v1beta1.SparkApplication) string {
57+
return fmt.Sprintf("%s-%s", app.Name, PrometheusConfigMapNameSuffix)
58+
}

pkg/config/constants.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,11 +174,20 @@ const (
174174
HadoopDelegationTokenFileName = "hadoop.token"
175175
)
176176

177+
const (
178+
// PrometheusConfigMapNameSuffix is the name suffix of the Prometheus ConfigMap.
179+
PrometheusConfigMapNameSuffix = "prom-conf"
180+
// PrometheusConfigMapMountPath is the mount path of the Prometheus ConfigMap.
181+
PrometheusConfigMapMountPath = "/etc/metrics/conf"
182+
)
183+
184+
// DefaultMetricsProperties is the default content of metrics.properties.
177185
const DefaultMetricsProperties = `
178186
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
179187
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
180188
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource`
181189

190+
// DefaultPrometheusConfiguration is the default content of prometheus.yaml.
182191
const DefaultPrometheusConfiguration = `
183192
lowercaseOutputName: true
184193
attrNameSnakeCase: true
@@ -268,4 +277,6 @@ rules:
268277
app_id: "$2"
269278
executor_id: "$3"
270279
`
280+
281+
// DefaultPrometheusJavaAgentPort is the default port used by the Prometheus JMX exporter.
271282
const DefaultPrometheusJavaAgentPort int32 = 8090

pkg/controller/sparkapplication/controller.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -545,16 +545,14 @@ func hasRetryIntervalPassed(retryInterval *int64, attemptsDone int32, lastEventT
545545

546546
// submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit.
547547
func (c *Controller) submitSparkApplication(app *v1beta1.SparkApplication) *v1beta1.SparkApplication {
548-
// Make a copy since configPrometheusMonitoring may update app.Spec which causes an onUpdate callback.
549-
appToSubmit := app.DeepCopy()
550-
if appToSubmit.Spec.Monitoring != nil && appToSubmit.Spec.Monitoring.Prometheus != nil {
551-
if err := configPrometheusMonitoring(appToSubmit, c.kubeClient); err != nil {
548+
if app.PrometheusMonitoringEnabled() {
549+
if err := configPrometheusMonitoring(app, c.kubeClient); err != nil {
552550
glog.Error(err)
553551
}
554552
}
555553

556554
submissionID := uuid.New().String()
557-
submissionCmdArgs, err := buildSubmissionCommandArgs(appToSubmit, submissionID)
555+
submissionCmdArgs, err := buildSubmissionCommandArgs(app, submissionID)
558556
if err != nil {
559557
app.Status = v1beta1.SparkApplicationStatus{
560558
AppState: v1beta1.ApplicationState{
@@ -568,7 +566,7 @@ func (c *Controller) submitSparkApplication(app *v1beta1.SparkApplication) *v1be
568566
}
569567

570568
// Try submitting the application by running spark-submit.
571-
submitted, err := runSparkSubmit(newSubmission(submissionCmdArgs, appToSubmit))
569+
submitted, err := runSparkSubmit(newSubmission(submissionCmdArgs, app))
572570
if err != nil {
573571
app.Status = v1beta1.SparkApplicationStatus{
574572
AppState: v1beta1.ApplicationState{

pkg/controller/sparkapplication/monitoring_config.go

Lines changed: 19 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package sparkapplication
1818

1919
import (
2020
"fmt"
21+
2122
"github.com/golang/glog"
2223
corev1 "k8s.io/api/core/v1"
2324
apiErrors "k8s.io/apimachinery/pkg/api/errors"
@@ -30,13 +31,11 @@ import (
3031
)
3132

3233
const (
33-
metricsPropertiesKey = "metrics.properties"
34-
prometheusConfigKey = "prometheus.yaml"
35-
prometheusConfigMapNameSuffix = "prom-conf"
36-
prometheusConfigMapMountPath = "/etc/metrics/conf"
37-
prometheusScrapeAnnotation = "prometheus.io/scrape"
38-
prometheusPortAnnotation = "prometheus.io/port"
39-
prometheusPathAnnotation = "prometheus.io/path"
34+
metricsPropertiesKey = "metrics.properties"
35+
prometheusConfigKey = "prometheus.yaml"
36+
prometheusScrapeAnnotation = "prometheus.io/scrape"
37+
prometheusPortAnnotation = "prometheus.io/port"
38+
prometheusPathAnnotation = "prometheus.io/path"
4039
)
4140

4241
func configPrometheusMonitoring(app *v1beta1.SparkApplication, kubeClient clientset.Interface) error {
@@ -45,22 +44,18 @@ func configPrometheusMonitoring(app *v1beta1.SparkApplication, kubeClient client
4544
port = *app.Spec.Monitoring.Prometheus.Port
4645
}
4746

48-
var configFile string
49-
if app.Spec.Monitoring.Prometheus.ConfigFile != nil {
50-
configFile = *app.Spec.Monitoring.Prometheus.ConfigFile
51-
}
5247
var javaOption string
53-
if configFile != "" {
48+
if app.HasPrometheusConfigFile() {
49+
configFile := *app.Spec.Monitoring.Prometheus.ConfigFile
5450
glog.V(2).Infof("Overriding the default Prometheus configuration with config file %s in the Spark image.", configFile)
5551
javaOption = fmt.Sprintf("-javaagent:%s=%d:%s", app.Spec.Monitoring.Prometheus.JmxExporterJar,
5652
port, configFile)
5753
} else {
5854
glog.V(2).Infof("Using the default Prometheus configuration.")
59-
prometheusConfigMapName := fmt.Sprintf("%s-%s", app.Name, prometheusConfigMapNameSuffix)
60-
configMap := buildPrometheusConfigMap(app, prometheusConfigMapName)
55+
configMapName := config.GetPrometheusConfigMapName(app)
56+
configMap := buildPrometheusConfigMap(app, configMapName)
6157
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
62-
cm, err := kubeClient.CoreV1().ConfigMaps(app.Namespace).Get(prometheusConfigMapName, metav1.GetOptions{})
63-
58+
cm, err := kubeClient.CoreV1().ConfigMaps(app.Namespace).Get(configMapName, metav1.GetOptions{})
6459
if apiErrors.IsNotFound(err) {
6560
_, createErr := kubeClient.CoreV1().ConfigMaps(app.Namespace).Create(configMap)
6661
return createErr
@@ -75,29 +70,20 @@ func configPrometheusMonitoring(app *v1beta1.SparkApplication, kubeClient client
7570
})
7671

7772
if retryErr != nil {
78-
return fmt.Errorf("failed to apply %s in namespace %s: %v", prometheusConfigMapName, app.Namespace, retryErr)
73+
return fmt.Errorf("failed to apply %s in namespace %s: %v", configMapName, app.Namespace, retryErr)
7974
}
8075

81-
javaOption = fmt.Sprintf("-javaagent:%s=%d:%s/%s", app.Spec.Monitoring.Prometheus.JmxExporterJar,
82-
port, prometheusConfigMapMountPath, prometheusConfigKey)
83-
84-
if app.Spec.Monitoring.ExposeDriverMetrics {
85-
app.Spec.Driver.ConfigMaps = append(app.Spec.Driver.ConfigMaps, v1beta1.NamePath{
86-
Name: prometheusConfigMapName,
87-
Path: prometheusConfigMapMountPath,
88-
})
89-
}
90-
if app.Spec.Monitoring.ExposeExecutorMetrics {
91-
app.Spec.Executor.ConfigMaps = append(app.Spec.Executor.ConfigMaps, v1beta1.NamePath{
92-
Name: prometheusConfigMapName,
93-
Path: prometheusConfigMapMountPath,
94-
})
95-
}
76+
javaOption = fmt.Sprintf(
77+
"-javaagent:%s=%d:%s/%s",
78+
app.Spec.Monitoring.Prometheus.JmxExporterJar,
79+
port,
80+
config.PrometheusConfigMapMountPath,
81+
prometheusConfigKey)
9682
}
9783

9884
/* work around for push gateway issue: https://github.com/prometheus/pushgateway/issues/97 */
9985
metricNamespace := fmt.Sprintf("%s.%s", app.Namespace, app.Name)
100-
metricConf := fmt.Sprintf("%s/%s", prometheusConfigMapMountPath, metricsPropertiesKey)
86+
metricConf := fmt.Sprintf("%s/%s", config.PrometheusConfigMapMountPath, metricsPropertiesKey)
10187
if app.Spec.SparkConf == nil {
10288
app.Spec.SparkConf = make(map[string]string)
10389
}

pkg/controller/sparkapplication/monitoring_config_test.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func TestConfigPrometheusMonitoring(t *testing.T) {
4444
t.Errorf("failed to configure Prometheus monitoring: %v", err)
4545
}
4646

47-
configMapName := fmt.Sprintf("%s-%s", test.app.Name, prometheusConfigMapNameSuffix)
47+
configMapName := config.GetPrometheusConfigMapName(test.app)
4848
configMap, err := fakeClient.CoreV1().ConfigMaps(test.app.Namespace).Get(configMapName, metav1.GetOptions{})
4949
if err != nil {
5050
t.Errorf("failed to get ConfigMap %s: %v", configMapName, err)
@@ -67,10 +67,6 @@ func TestConfigPrometheusMonitoring(t *testing.T) {
6767
}
6868

6969
if test.app.Spec.Monitoring.ExposeDriverMetrics {
70-
if len(test.app.Spec.Driver.ConfigMaps) != 1 {
71-
t.Errorf("expected %d driver ConfigMaps got %d", 1, len(test.app.Spec.Driver.ConfigMaps))
72-
}
73-
7470
if len(test.app.Spec.Driver.Annotations) != 3 {
7571
t.Errorf("expected %d driver annotations got %d", 3, len(test.app.Spec.Driver.Annotations))
7672
}
@@ -84,10 +80,6 @@ func TestConfigPrometheusMonitoring(t *testing.T) {
8480
}
8581

8682
if test.app.Spec.Monitoring.ExposeExecutorMetrics {
87-
if len(test.app.Spec.Executor.ConfigMaps) != 1 {
88-
t.Errorf("expected %d driver ConfigMaps got %d", 1, len(test.app.Spec.Executor.ConfigMaps))
89-
}
90-
9183
if len(test.app.Spec.Executor.Annotations) != 3 {
9284
t.Errorf("expected %d driver annotations got %d", 3, len(test.app.Spec.Executor.Annotations))
9385
}

pkg/webhook/patch.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ func patchSparkPod(pod *corev1.Pod, app *v1beta1.SparkApplication) []patchOperat
5252
patchOps = append(patchOps, addGeneralConfigMaps(pod, app)...)
5353
patchOps = append(patchOps, addSparkConfigMap(pod, app)...)
5454
patchOps = append(patchOps, addHadoopConfigMap(pod, app)...)
55+
patchOps = append(patchOps, addPrometheusConfigMap(pod, app)...)
5556
patchOps = append(patchOps, addTolerations(pod, app)...)
5657

5758
if pod.Spec.SchedulerName == "" {
@@ -67,6 +68,7 @@ func patchSparkPod(pod *corev1.Pod, app *v1beta1.SparkApplication) []patchOperat
6768
patchOps = append(patchOps, *op)
6869
}
6970
}
71+
7072
if pod.Spec.SecurityContext == nil {
7173
op := addSecurityContext(pod, app)
7274
if op != nil {
@@ -219,6 +221,29 @@ func addGeneralConfigMaps(pod *corev1.Pod, app *v1beta1.SparkApplication) []patc
219221
return patchOps
220222
}
221223

224+
func addPrometheusConfigMap(pod *corev1.Pod, app *v1beta1.SparkApplication) []patchOperation {
225+
// Skip if Prometheus Monitoring is not enabled or an in-container ConfigFile is used,
226+
// in which cases a Prometheus ConfigMap won't be created.
227+
if !app.PrometheusMonitoringEnabled() || app.HasPrometheusConfigFile() {
228+
return nil
229+
}
230+
231+
if util.IsDriverPod(pod) && !app.ExposeDriverMetrics() {
232+
return nil
233+
}
234+
if util.IsExecutorPod(pod) && !app.ExposeExecutorMetrics() {
235+
return nil
236+
}
237+
238+
var patchOps []patchOperation
239+
name := config.GetPrometheusConfigMapName(app)
240+
volumeName := name + "-vol"
241+
mountPath := config.PrometheusConfigMapMountPath
242+
patchOps = append(patchOps, addConfigMapVolume(pod, name, volumeName))
243+
patchOps = append(patchOps, addConfigMapVolumeMount(pod, volumeName, mountPath))
244+
return patchOps
245+
}
246+
222247
func addConfigMapVolume(pod *corev1.Pod, configMapName string, configMapVolumeName string) patchOperation {
223248
volume := corev1.Volume{
224249
Name: configMapVolumeName,

pkg/webhook/patch_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,54 @@ func TestPatchSparkPod_HadoopConfigMap(t *testing.T) {
339339
assert.Equal(t, config.DefaultHadoopConfDir, modifiedPod.Spec.Containers[0].Env[0].Value)
340340
}
341341

342+
func TestPatchSparkPod_PrometheusConfigMaps(t *testing.T) {
343+
app := &v1beta1.SparkApplication{
344+
ObjectMeta: metav1.ObjectMeta{
345+
Name: "spark-test",
346+
UID: "spark-test-1",
347+
},
348+
Spec: v1beta1.SparkApplicationSpec{
349+
Monitoring: &v1beta1.MonitoringSpec{
350+
Prometheus: &v1beta1.PrometheusSpec{},
351+
ExposeDriverMetrics: true,
352+
},
353+
},
354+
}
355+
356+
pod := &corev1.Pod{
357+
ObjectMeta: metav1.ObjectMeta{
358+
Name: "spark-driver",
359+
Labels: map[string]string{
360+
config.SparkRoleLabel: config.SparkDriverRole,
361+
config.LaunchedBySparkOperatorLabel: "true",
362+
},
363+
},
364+
Spec: corev1.PodSpec{
365+
Containers: []corev1.Container{
366+
{
367+
Name: sparkDriverContainerName,
368+
Image: "spark-driver:latest",
369+
},
370+
},
371+
},
372+
}
373+
374+
modifiedPod, err := getModifiedPod(pod, app)
375+
if err != nil {
376+
t.Fatal(err)
377+
}
378+
379+
expectedConfigMapName := config.GetPrometheusConfigMapName(app)
380+
expectedVolumeName := expectedConfigMapName + "-vol"
381+
assert.Equal(t, 1, len(modifiedPod.Spec.Volumes))
382+
assert.Equal(t, expectedVolumeName, modifiedPod.Spec.Volumes[0].Name)
383+
assert.True(t, modifiedPod.Spec.Volumes[0].ConfigMap != nil)
384+
assert.Equal(t, expectedConfigMapName, modifiedPod.Spec.Volumes[0].ConfigMap.Name)
385+
assert.Equal(t, 1, len(modifiedPod.Spec.Containers[0].VolumeMounts))
386+
assert.Equal(t, expectedVolumeName, modifiedPod.Spec.Containers[0].VolumeMounts[0].Name)
387+
assert.Equal(t, config.PrometheusConfigMapMountPath, modifiedPod.Spec.Containers[0].VolumeMounts[0].MountPath)
388+
}
389+
342390
func TestPatchSparkPod_Tolerations(t *testing.T) {
343391
app := &v1beta1.SparkApplication{
344392
ObjectMeta: metav1.ObjectMeta{

0 commit comments

Comments
 (0)