Skip to content

[AN-557] Include VM initialization time in cost calculation for Batch jobs #7744

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 20, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### GCP Batch
* Cromwell now supports automatic use of the [GAR Dockerhub mirror](https://cloud.google.com/artifact-registry/docs/pull-cached-dockerhub-images), see [ReadTheDocs](https://cromwell.readthedocs.io/en/develop/backends/GCPBatch/) for details.
* VM initialization time in now included in estimated cost calculation for jobs.

## 89 Release Notes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,15 @@ object BatchRequestExecutor {
}

private def getEventList(events: List[StatusEvent]): List[ExecutionEvent] = {
val startedRegex = ".*SCHEDULED to RUNNING.*".r
val endedRegex = ".*RUNNING to.*".r // can be SUCCEEDED or FAILED
// on Batch, when job transitions to SCHEDULED state it indicates that the VM is being initialized. Users are billed for this
// startup time. Hence, the 'vmStartTime' corresponds to when the job enters the SCHEDULED state.
val startedRegex = ".*QUEUED to SCHEDULED.*".r
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any chance we might be transitioning to scheduled from a state other than QUEUED?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding from conversation with Batch team was that SCHEDULED only happens after QUEUED.

It seems the only other state that can exist before QUEUED is STATE_UNSPECIFIED but I am not sure if job can go from STATE_UNSPECIFIED -> SCHEDULED 🤔
https://cloud.google.com/batch/docs/reference/rest/v1/projects.locations.jobs#State

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want the start regex to be ".*to SCHEDULED.*" to be safe?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That does seem a little safer, but I'm good either way based on your previous comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will update the regex just to be safe. The important part here is reaching the SCHEDULED state which would be achieved by ".*to SCHEDULED.*" as well.


// job terminal events can occur in 2 ways:
// - job transitions from a RUNNING state to either SUCCEEDED or FAILED state
// - job never enters the RUNNING state and instead transitions from SCHEDULED -> SCHEDULED_PENDING_FAILED -> FAILED
val endedRegex = ".*RUNNING to.*|.*SCHEDULED_PENDING_FAILED to FAILED.*".r

events.flatMap { e =>
val time = java.time.Instant
.ofEpochSecond(e.getEventTime.getSeconds, e.getEventTime.getNanos.toLong)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,32 @@ class BatchRequestExecutorSpec
with MockSugar
with PrivateMethodTester {

val startStatusEvent = StatusEvent
val schedulingStatusEvent = StatusEvent
.newBuilder()
.setType("STATUS_CHANGED")
.setEventTime(Timestamp.newBuilder().setSeconds(1).build())
.setDescription("Job state is set from SCHEDULED to RUNNING for job...")
.setDescription("Job state is set from QUEUED to SCHEDULED for job...")
.build()

val endStatusEvent = StatusEvent
val runningStatusEvent = StatusEvent
.newBuilder()
.setType("STATUS_CHANGED")
.setEventTime(Timestamp.newBuilder().setSeconds(2).build())
.setDescription("Job state is set from RUNNING to SOME_OTHER_STATUS for job...")
.setDescription("Job state is set from SCHEDULED to RUNNING for job...")
.build()

val schedulingStatusEvent = StatusEvent
val terminalStatusEvent = StatusEvent
.newBuilder()
.setType("STATUS_CHANGED")
.setEventTime(Timestamp.newBuilder().setSeconds(3).build())
.setDescription("Job state is set from QUEUED to SCHEDULED for job...")
.setDescription("Job state is set from RUNNING to SOME_OTHER_STATUS for job...")
.build()

val scheduleFailedStatusEvent = StatusEvent
.newBuilder()
.setType("STATUS_CHANGED")
.setEventTime(Timestamp.newBuilder().setSeconds(5).build())
.setDescription("Job state is set from SCHEDULED_PENDING_FAILED to FAILED for job...")
.build()

val preemptionError = "Job state is set from SCHEDULED to FAILED for job projects/....Job failed due to task " +
Expand Down Expand Up @@ -94,7 +101,7 @@ class BatchRequestExecutorSpec

behavior of "BatchRequestExecutor"

it should "create a schedule status event correctly" in {
it should "create a schedule status and vmStartTime event correctly" in {

val mockClient = setupBatchClient(jobState = JobStatus.State.QUEUED, events = List(schedulingStatusEvent))
val batchRequestExecutor = new BatchRequestExecutor.CloudImpl(BatchServiceSettings.newBuilder().build())
Expand All @@ -106,9 +113,9 @@ class BatchRequestExecutorSpec
// Verify the event
result.status match {
case RunStatus.Initializing(events, _) =>
events.length shouldBe 1
events.map(_.name).head shouldBe "Job state is set from QUEUED to SCHEDULED for job..."
events.map(_.offsetDateTime.toString).head shouldBe "1970-01-01T00:00:03Z"
events.length shouldBe 2
events.map(_.name) should contain allOf ("Job state is set from QUEUED to SCHEDULED for job...", "vmStartTime")
events.map(_.offsetDateTime.toString) shouldBe List("1970-01-01T00:00:01Z", "1970-01-01T00:00:01Z")
case _ => fail("Expected RunStatus.Initializing with events")
}
}
Expand Down Expand Up @@ -137,7 +144,7 @@ class BatchRequestExecutorSpec
it should "create instantiatedVmInfo correctly" in {

val mockClient =
setupBatchClient(jobState = JobStatus.State.RUNNING, events = List(startStatusEvent, endStatusEvent))
setupBatchClient(jobState = JobStatus.State.RUNNING, events = List(runningStatusEvent, terminalStatusEvent))
// Create the BatchRequestExecutor
val batchRequestExecutor = new BatchRequestExecutor.CloudImpl(BatchServiceSettings.newBuilder().build())

Expand All @@ -159,7 +166,7 @@ class BatchRequestExecutorSpec

val mockClient = setupBatchClient(location = "zones/us-central1-a",
jobState = JobStatus.State.RUNNING,
events = List(startStatusEvent, endStatusEvent)
events = List(runningStatusEvent, terminalStatusEvent)
)

// Create the BatchRequestExecutor
Expand All @@ -182,7 +189,7 @@ class BatchRequestExecutorSpec
it should "create instantiatedVmInfo correctly with missing location info" in {

val mockClient =
setupBatchClient(jobState = JobStatus.State.RUNNING, events = List(startStatusEvent, endStatusEvent))
setupBatchClient(jobState = JobStatus.State.RUNNING, events = List(runningStatusEvent, terminalStatusEvent))

// Create the BatchRequestExecutor
val batchRequestExecutor = new BatchRequestExecutor.CloudImpl(BatchServiceSettings.newBuilder().build())
Expand All @@ -203,7 +210,7 @@ class BatchRequestExecutorSpec

it should "send vmStartTime and vmEndTime metadata info when a workflow succeeds" in {

val mockClient = setupBatchClient(events = List(startStatusEvent, endStatusEvent))
val mockClient = setupBatchClient(events = List(schedulingStatusEvent, runningStatusEvent, terminalStatusEvent))

// Create the BatchRequestExecutor
val batchRequestExecutor = new BatchRequestExecutor.CloudImpl(BatchServiceSettings.newBuilder().build())
Expand All @@ -216,16 +223,22 @@ class BatchRequestExecutorSpec
result.status match {
case RunStatus.Success(events, _) =>
val eventNames = events.map(_.name)
val eventTimes = events.map(_.offsetDateTime.toString)
eventNames should contain allOf ("vmStartTime", "vmEndTime")
eventTimes should contain allOf ("1970-01-01T00:00:01Z", "1970-01-01T00:00:02Z")

val vmStartTime = events.find(e => e.name == "vmStartTime").get
val vmEndTime = events.find(e => e.name == "vmEndTime").get

vmStartTime.offsetDateTime.toString shouldBe "1970-01-01T00:00:01Z"
vmEndTime.offsetDateTime.toString shouldBe "1970-01-01T00:00:03Z"
case _ => fail("Expected RunStatus.Success with events")
}
}

it should "send vmStartTime and vmEndTime metadata info along with other events when a workflow fails" in {
val mockClient =
setupBatchClient(jobState = JobStatus.State.FAILED, events = List(startStatusEvent, endStatusEvent))
setupBatchClient(jobState = JobStatus.State.FAILED,
events = List(schedulingStatusEvent, runningStatusEvent, terminalStatusEvent)
)

// Create the BatchRequestExecutor
val batchRequestExecutor = new BatchRequestExecutor.CloudImpl(BatchServiceSettings.newBuilder().build())
Expand All @@ -238,13 +251,44 @@ class BatchRequestExecutorSpec
result.status match {
case RunStatus.Failed(_, events, _) =>
val eventNames = events.map(_.name)
val eventTimes = events.map(_.offsetDateTime.toString)
println(eventNames)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing this is a debug line that should be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Maybe this is a leftover from previous PR. The test do succeed without the println.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yes, right, I got confused and though it was being added here 🤦‍♀️

eventNames should contain allOf ("vmStartTime", "vmEndTime")

val vmStartTime = events.find(e => e.name == "vmStartTime").get
val vmEndTime = events.find(e => e.name == "vmEndTime").get

eventNames should contain allOf ("Job state is set from RUNNING to SOME_OTHER_STATUS for job...", "Job state is set from SCHEDULED to RUNNING for job...")
eventTimes should contain allOf ("1970-01-01T00:00:01Z", "1970-01-01T00:00:02Z")
case _ => fail("Expected RunStatus.Success with events")
vmStartTime.offsetDateTime.toString shouldBe "1970-01-01T00:00:01Z"
vmEndTime.offsetDateTime.toString shouldBe "1970-01-01T00:00:03Z"
case _ => fail("Expected RunStatus.Failed with events")
}
}

it should "send vmStartTime and vmEndTime metadata info along with other events when a job fails to run" in {
val mockClient =
setupBatchClient(jobState = JobStatus.State.FAILED,
events = List(schedulingStatusEvent, scheduleFailedStatusEvent)
)

// Create the BatchRequestExecutor
val batchRequestExecutor = new BatchRequestExecutor.CloudImpl(BatchServiceSettings.newBuilder().build())

// testing a private method see https://www.scalatest.org/user_guide/using_PrivateMethodTester
val internalGetHandler = PrivateMethod[BatchApiResponse.StatusQueried](Symbol("internalGetHandler"))
val result = batchRequestExecutor invokePrivate internalGetHandler(mockClient, GetJobRequest.newBuilder().build())

// Verify the events
result.status match {
case RunStatus.Failed(_, events, _) =>
val eventNames = events.map(_.name)
eventNames should contain allOf ("vmStartTime", "vmEndTime")

val vmStartTime = events.find(e => e.name == "vmStartTime").get
val vmEndTime = events.find(e => e.name == "vmEndTime").get

eventNames should contain allOf ("Job state is set from SCHEDULED_PENDING_FAILED to FAILED for job...", "Job state is set from QUEUED to SCHEDULED for job...")
vmStartTime.offsetDateTime.toString shouldBe "1970-01-01T00:00:01Z"
vmEndTime.offsetDateTime.toString shouldBe "1970-01-01T00:00:05Z"
case _ => fail("Expected RunStatus.Failed with events")
}
}
}
Loading