Skip to content

Commit be03843

Browse files
authored
Add maximum pending kubernetes pod count field (#11)
* Add pod state as part of kubernetes instance * Do not allow creating kubernetes instance if the maximum pending pod limit has reached * Add maximum pod count field in plugin settings metadata * Add maximum pod count field in plugin settings view * Use maximumPendingKubernetesPodCount value from the plugin settings * Add test to verify the maximum pending pod count scenario
1 parent 6181bd7 commit be03843

12 files changed

+207
-18
lines changed

src/main/java/cd/go/contrib/elasticagent/KubernetesAgentInstances.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Date;
3030
import java.util.Map;
3131
import java.util.concurrent.ConcurrentHashMap;
32+
import java.util.concurrent.Semaphore;
3233

3334
import static cd.go.contrib.elasticagent.KubernetesPlugin.LOG;
3435
import static cd.go.contrib.elasticagent.executors.GetProfileMetadataExecutor.SPECIFIED_USING_POD_CONFIGURATION;
@@ -38,6 +39,8 @@ public class KubernetesAgentInstances implements AgentInstances<KubernetesInstan
3839
private final ConcurrentHashMap<String, KubernetesInstance> instances = new ConcurrentHashMap<>();
3940
public Clock clock = Clock.DEFAULT;
4041
private boolean refreshed;
42+
final Semaphore semaphore = new Semaphore(0, true);
43+
4144
private KubernetesClientFactory factory;
4245
private KubernetesInstanceFactory kubernetesInstanceFactory;
4346

@@ -56,6 +59,26 @@ public KubernetesAgentInstances(KubernetesClientFactory factory, KubernetesInsta
5659

5760
@Override
5861
public KubernetesInstance create(CreateAgentRequest request, PluginSettings settings, PluginRequest pluginRequest) throws Exception {
62+
final Integer maxAllowedContainers = settings.getMaximumPendingAgentsCount();
63+
synchronized (instances) {
64+
doWithLockOnSemaphore(new SetupSemaphore(maxAllowedContainers, instances, semaphore));
65+
66+
if (semaphore.tryAcquire()) {
67+
return createKubernetesInstance(request, settings, pluginRequest);
68+
} else {
69+
LOG.warn(String.format("The number of pending kubernetes pods is currently at the maximum permissible limit (%d). Total kubernetes pods (%d). Not creating any more containers.", maxAllowedContainers, instances.size()));
70+
return null;
71+
}
72+
}
73+
}
74+
75+
private void doWithLockOnSemaphore(Runnable runnable) {
76+
synchronized (semaphore) {
77+
runnable.run();
78+
}
79+
}
80+
81+
private KubernetesInstance createKubernetesInstance(CreateAgentRequest request, PluginSettings settings, PluginRequest pluginRequest) throws Exception {
5982
JobIdentifier jobIdentifier = request.jobIdentifier();
6083
if (isAgentCreatedForJob(jobIdentifier.getJobId())) {
6184
LOG.warn("[Create Agent Request] Request for creating an agent for Job Identifier [" + jobIdentifier + "] has already been scheduled. Skipping current request.");

src/main/java/cd/go/contrib/elasticagent/KubernetesInstance.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,15 @@ public class KubernetesInstance {
2828
private final String name;
2929
private final Map<String, String> properties;
3030
private final Long jobId;
31+
private final PodState state;
3132

32-
public KubernetesInstance(DateTime createdAt, String environment, String name, Map<String, String> properties, Long jobId) {
33+
public KubernetesInstance(DateTime createdAt, String environment, String name, Map<String, String> properties, Long jobId, PodState state) {
3334
this.createdAt = createdAt.withZone(DateTimeZone.UTC);
3435
this.environment = environment;
3536
this.name = name;
3637
this.properties = properties;
3738
this.jobId = jobId;
39+
this.state = state;
3840
}
3941

4042
public void terminate(KubernetesClient client) {
@@ -60,4 +62,8 @@ public Map<String, String> getInstanceProperties() {
6062
public Long jobId() {
6163
return jobId;
6264
}
65+
66+
public boolean isPending() {
67+
return this.state.equals(PodState.Pending);
68+
}
6369
}

src/main/java/cd/go/contrib/elasticagent/KubernetesInstanceFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public KubernetesInstance fromKubernetesPod(Pod elasticAgentPod) {
120120
}
121121
String environment = metadata.getLabels().get(Constants.ENVIRONMENT_LABEL_KEY);
122122
Long jobId = Long.valueOf(metadata.getLabels().get(Constants.JOB_ID_LABEL_KEY));
123-
kubernetesInstance = new KubernetesInstance(createdAt, environment, metadata.getName(), metadata.getAnnotations(), jobId);
123+
kubernetesInstance = new KubernetesInstance(createdAt, environment, metadata.getName(), metadata.getAnnotations(), jobId, PodState.fromPod(elasticAgentPod));
124124
} catch (ParseException e) {
125125
throw new RuntimeException(e);
126126
}

src/main/java/cd/go/contrib/elasticagent/PluginSettings.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ public class PluginSettings {
3131
@SerializedName("auto_register_timeout")
3232
private String autoRegisterTimeout;
3333

34+
@Expose
35+
@SerializedName("pending_pods_count")
36+
private String pendingPodsCount;
37+
3438
@Expose
3539
@SerializedName("kubernetes_cluster_url")
3640
private String kubernetesClusterUrl;
@@ -49,7 +53,6 @@ public class PluginSettings {
4953

5054
private Period autoRegisterPeriod;
5155

52-
5356
public static PluginSettings fromJSON(String json) {
5457
return GSON.fromJson(json, PluginSettings.class);
5558
}
@@ -58,12 +61,13 @@ public static PluginSettings fromJSON(String json) {
5861
public boolean equals(Object o) {
5962
if (this == o) return true;
6063
if (!(o instanceof PluginSettings)) return false;
61-
6264
PluginSettings that = (PluginSettings) o;
6365

6466
if (goServerUrl != null ? !goServerUrl.equals(that.goServerUrl) : that.goServerUrl != null) return false;
6567
if (autoRegisterTimeout != null ? !autoRegisterTimeout.equals(that.autoRegisterTimeout) : that.autoRegisterTimeout != null)
6668
return false;
69+
if (pendingPodsCount != null ? !pendingPodsCount.equals(that.pendingPodsCount) : that.pendingPodsCount != null)
70+
return false;
6771
if (kubernetesClusterUrl != null ? !kubernetesClusterUrl.equals(that.kubernetesClusterUrl) : that.kubernetesClusterUrl != null)
6872
return false;
6973
return autoRegisterPeriod != null ? autoRegisterPeriod.equals(that.autoRegisterPeriod) : that.autoRegisterPeriod == null;
@@ -73,6 +77,7 @@ public boolean equals(Object o) {
7377
public int hashCode() {
7478
int result = goServerUrl != null ? goServerUrl.hashCode() : 0;
7579
result = 31 * result + (autoRegisterTimeout != null ? autoRegisterTimeout.hashCode() : 0);
80+
result = 31 * result + (pendingPodsCount != null ? pendingPodsCount.hashCode() : 0);
7681
result = 31 * result + (kubernetesClusterUrl != null ? kubernetesClusterUrl.hashCode() : 0);
7782
result = 31 * result + (autoRegisterPeriod != null ? autoRegisterPeriod.hashCode() : 0);
7883
return result;
@@ -92,6 +97,14 @@ String getAutoRegisterTimeout() {
9297
return autoRegisterTimeout;
9398
}
9499

100+
public Integer getMaximumPendingAgentsCount() {
101+
if (pendingPodsCount == null) {
102+
pendingPodsCount = "10";
103+
}
104+
105+
return Integer.valueOf(pendingPodsCount);
106+
}
107+
95108
public String getGoServerUrl() {
96109
return goServerUrl;
97110
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2017 ThoughtWorks, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package cd.go.contrib.elasticagent;
18+
19+
import io.fabric8.kubernetes.api.model.Pod;
20+
import io.fabric8.kubernetes.api.model.PodStatus;
21+
22+
public enum PodState {
23+
Running,
24+
Pending;
25+
26+
public static PodState fromPod(Pod pod) {
27+
PodStatus status = pod.getStatus();
28+
if (status == null) {
29+
return Pending;
30+
}
31+
32+
return (status.getPhase() != null && status.getPhase().equals("Running")) ? Running : Pending;
33+
}
34+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2017 ThoughtWorks, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package cd.go.contrib.elasticagent;
18+
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.concurrent.Semaphore;
23+
24+
class SetupSemaphore implements Runnable {
25+
private final Integer maxAllowedPendingPods;
26+
private final Map<String, KubernetesInstance> instances;
27+
private final Semaphore semaphore;
28+
29+
public SetupSemaphore(Integer maxAllowedPendingPods, Map<String, KubernetesInstance> instances, Semaphore semaphore) {
30+
this.maxAllowedPendingPods = maxAllowedPendingPods;
31+
this.instances = instances;
32+
this.semaphore = semaphore;
33+
}
34+
35+
@Override
36+
public void run() {
37+
List<KubernetesInstance> pendingInstances = getPendingInstances(instances);
38+
int totalPendingPods = pendingInstances.size();
39+
int availablePermits = maxAllowedPendingPods - totalPendingPods;
40+
41+
if (availablePermits <= 0) {
42+
// no more capacity available.
43+
semaphore.drainPermits();
44+
} else {
45+
int semaphoreValueDifference = availablePermits - semaphore.availablePermits();
46+
if (semaphoreValueDifference > 0) {
47+
semaphore.release(semaphoreValueDifference);
48+
} else if (semaphoreValueDifference < 0) {
49+
semaphore.tryAcquire(Math.abs(semaphoreValueDifference));
50+
}
51+
}
52+
}
53+
54+
private List<KubernetesInstance> getPendingInstances(Map<String, KubernetesInstance> instances) {
55+
ArrayList<KubernetesInstance> pendingInstances = new ArrayList<>();
56+
for (KubernetesInstance kubernetesInstance : instances.values()) {
57+
if (kubernetesInstance.isPending()) {
58+
pendingInstances.add(kubernetesInstance);
59+
}
60+
}
61+
62+
return pendingInstances;
63+
}
64+
}

src/main/java/cd/go/contrib/elasticagent/executors/GetPluginConfigurationExecutor.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,17 @@
3232
public class GetPluginConfigurationExecutor implements RequestExecutor {
3333
public static final Field GO_SERVER_URL = new GoServerUrlField("go_server_url", "Go Server URL", false, "0");
3434
public static final Field AUTOREGISTER_TIMEOUT = new PositiveNumberField("auto_register_timeout", "Agent auto-register Timeout (in minutes)", "10", true, false, "1");
35-
public static final Field KUBERNETES_CLUSTER_URL = new SecureURLField("kubernetes_cluster_url", "Kubernetes Cluster URL", true, "2");
36-
public static final Field KUBERNETES_CLUSTER_USERNAME = new Field("kubernetes_cluster_username", "Kubernetes Cluster Username", null, false, false, "3");
37-
public static final Field KUBERNETES_CLUSTER_PASSWORD = new Field("kubernetes_cluster_password", "Kubernetes Cluster Password", null, false, true, "4");
38-
public static final Field KUBERNETES_CLUSTER_CA_CERT = new Field("kubernetes_cluster_ca_cert", "Kubernetes Cluster CA Certificate", null, false, true, "5");
35+
public static final Field MAXIMUM_PENDING_PODS_COUNT = new PositiveNumberField("pending_pods_count", "Maximum Pending Kuberneted Pods Count", "10", true, false, "2");
36+
public static final Field KUBERNETES_CLUSTER_URL = new SecureURLField("kubernetes_cluster_url", "Kubernetes Cluster URL", true, "3");
37+
public static final Field KUBERNETES_CLUSTER_USERNAME = new Field("kubernetes_cluster_username", "Kubernetes Cluster Username", null, false, false, "4");
38+
public static final Field KUBERNETES_CLUSTER_PASSWORD = new Field("kubernetes_cluster_password", "Kubernetes Cluster Password", null, false, true, "5");
39+
public static final Field KUBERNETES_CLUSTER_CA_CERT = new Field("kubernetes_cluster_ca_cert", "Kubernetes Cluster CA Certificate", null, false, true, "6");
3940
public static final Map<String, Field> FIELDS = new LinkedHashMap<>();
4041

4142
static {
4243
FIELDS.put(GO_SERVER_URL.key(), GO_SERVER_URL);
4344
FIELDS.put(AUTOREGISTER_TIMEOUT.key(), AUTOREGISTER_TIMEOUT);
45+
FIELDS.put(MAXIMUM_PENDING_PODS_COUNT.key(), MAXIMUM_PENDING_PODS_COUNT);
4446

4547
FIELDS.put(KUBERNETES_CLUSTER_URL.key(), KUBERNETES_CLUSTER_URL);
4648

src/main/resources/plugin-settings.template.html

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@
6868
<span class="form_error" ng-show="GOINPUTNAME[auto_register_timeout].$error.server">{{GOINPUTNAME[auto_register_timeout].$error.server}}</span>
6969
</div>
7070

71+
<div class="form_item_block">
72+
<label>Maximum Pending Kubernetes Pods Count<span class='asterix'>*</span></label>
73+
<input type="text" ng-model="pending_pods_count" ng-required="true"/>
74+
<span class="form_error" ng-show="GOINPUTNAME[pending_pods_count].$error.server">{{GOINPUTNAME[pending_pods_count].$error.server}}</span>
75+
</div>
76+
7177
<div class="form_item_block">
7278
<label>Kubernetes Cluster URL:<span class='asterix'>*</span></label>
7379
<input type="text" ng-model="kubernetes_cluster_url" ng-required="true"/>
@@ -99,4 +105,4 @@
99105
Do not provide <code> -----BEGIN * </code> and <code> -----END * </code> in your certificate data.
100106
</label>
101107
</div>
102-
</div>
108+
</div>

src/test/java/cd/go/contrib/elasticagent/KubernetesAgentInstancesTest.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import cd.go.contrib.elasticagent.model.JobIdentifier;
2020
import cd.go.contrib.elasticagent.requests.CreateAgentRequest;
2121
import io.fabric8.kubernetes.client.KubernetesClient;
22-
import io.fabric8.kubernetes.client.dsl.internal.PodOperationsImpl;
2322
import org.joda.time.DateTime;
2423
import org.junit.Before;
2524
import org.junit.Test;
@@ -58,14 +57,15 @@ public void setUp() throws Exception {
5857
initMocks(this);
5958
testProperties = new HashMap<>();
6059
when(mockCreateAgentRequest.properties()).thenReturn(testProperties);
60+
when(mockPluginSettings.getMaximumPendingAgentsCount()).thenReturn(10);
6161
when(factory.kubernetes(mockPluginSettings)).thenReturn(mockKubernetesClient);
6262
JobIdentifier jobId = new JobIdentifier("test", 1L, "Test pipeline", "test name", "1", "test job", 100L);
6363
when(mockCreateAgentRequest.jobIdentifier()).thenReturn(jobId);
6464
}
6565

6666
@Test
6767
public void shouldCreateKubernetesPodUsingPodYamlAndCacheCreatedInstance() throws Exception {
68-
KubernetesInstance kubernetesInstance = new KubernetesInstance(new DateTime(), "test", "test-agent", new HashMap<>(), 100L);
68+
KubernetesInstance kubernetesInstance = new KubernetesInstance(new DateTime(), "test", "test-agent", new HashMap<>(), 100L, PodState.Running);
6969
when(mockKubernetesInstanceFactory.create(mockCreateAgentRequest, mockPluginSettings, mockKubernetesClient, mockPluginRequest, true)).
7070
thenReturn(kubernetesInstance);
7171

@@ -78,7 +78,7 @@ public void shouldCreateKubernetesPodUsingPodYamlAndCacheCreatedInstance() throw
7878

7979
@Test
8080
public void shouldCreateKubernetesPodAndCacheCreatedInstance() throws Exception {
81-
KubernetesInstance kubernetesInstance = new KubernetesInstance(new DateTime(), "test", "test-agent", new HashMap<>(), 100L);
81+
KubernetesInstance kubernetesInstance = new KubernetesInstance(new DateTime(), "test", "test-agent", new HashMap<>(), 100L, PodState.Running);
8282
when(mockKubernetesInstanceFactory.create(mockCreateAgentRequest, mockPluginSettings, mockKubernetesClient, mockPluginRequest, false)).
8383
thenReturn(kubernetesInstance);
8484
testProperties.put("SpecifiedUsingPodConfiguration", "false");
@@ -89,7 +89,7 @@ public void shouldCreateKubernetesPodAndCacheCreatedInstance() throws Exception
8989

9090
@Test
9191
public void shouldNotCreatePodWhenOutstandingRequestsExistForJobs() throws Exception {
92-
KubernetesInstance kubernetesInstance = new KubernetesInstance(new DateTime(), "test", "test-agent", new HashMap<>(), 100L);
92+
KubernetesInstance kubernetesInstance = new KubernetesInstance(new DateTime(), "test", "test-agent", new HashMap<>(), 100L, PodState.Running);
9393
when(mockKubernetesInstanceFactory.create(mockCreateAgentRequest, mockPluginSettings, mockKubernetesClient, mockPluginRequest, false)).
9494
thenReturn(kubernetesInstance);
9595
testProperties.put("SpecifiedUsingPodConfiguration", "false");
@@ -104,4 +104,28 @@ public void shouldNotCreatePodWhenOutstandingRequestsExistForJobs() throws Excep
104104
agentInstances.create(mockCreateAgentRequest, mockPluginSettings, mockPluginRequest);
105105
verify(mockKubernetesInstanceFactory, times(0)).create(any(), any(), any(), any(), any());
106106
}
107-
}
107+
108+
@Test
109+
public void shouldNotCreatePodsWhenOutstandingLimitOfPendingKubernetesPodsHasReached() throws Exception {
110+
//set maximum pending pod count to 1
111+
when(mockPluginSettings.getMaximumPendingAgentsCount()).thenReturn(1);
112+
113+
//pending kubernetes pod
114+
KubernetesInstance kubernetesInstance = new KubernetesInstance(new DateTime(), "test", "test-agent", new HashMap<>(), 100L, PodState.Pending);
115+
when(mockKubernetesInstanceFactory.create(mockCreateAgentRequest, mockPluginSettings, mockKubernetesClient, mockPluginRequest, false)).
116+
thenReturn(kubernetesInstance);
117+
testProperties.put("SpecifiedUsingPodConfiguration", "false");
118+
119+
//first create agent request
120+
KubernetesAgentInstances agentInstances = new KubernetesAgentInstances(factory, mockKubernetesInstanceFactory);
121+
JobIdentifier jobId = new JobIdentifier("test", 1L, "Test pipeline", "test name", "1", "test job", 100L);
122+
when(mockCreateAgentRequest.jobIdentifier()).thenReturn(jobId);
123+
agentInstances.create(mockCreateAgentRequest, mockPluginSettings, mockPluginRequest);
124+
verify(mockKubernetesInstanceFactory, times(1)).create(any(), any(), any(), any(), any());
125+
reset(mockKubernetesInstanceFactory);
126+
127+
//second create agent request
128+
agentInstances.create(mockCreateAgentRequest, mockPluginSettings, mockPluginRequest);
129+
verify(mockKubernetesInstanceFactory, times(0)).create(any(), any(), any(), any(), any());
130+
}
131+
}

src/test/java/cd/go/contrib/elasticagent/PluginSettingsTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@ public void shouldDeserializeFromJSON() throws Exception {
2727
PluginSettings pluginSettings = PluginSettings.fromJSON("{" +
2828
"\"go_server_url\": \"https://foo.go.cd/go\", " +
2929
"\"auto_register_timeout\": \"10\", " +
30+
"\"pending_pods_count\": \"10\", " +
3031
"\"kubernetes_cluster_url\": \"https://cloud.example.com\" " +
3132
"}");
3233

3334
assertThat(pluginSettings.getGoServerUrl(), is("https://foo.go.cd/go"));
3435
assertThat(pluginSettings.getAutoRegisterTimeout(), is("10"));
36+
assertThat(pluginSettings.getMaximumPendingAgentsCount(), is(10));
3537
assertThat(pluginSettings.getKubernetesClusterUrl(), is("https://cloud.example.com"));
3638
}
3739
}

0 commit comments

Comments
 (0)