Skip to content

Commit 1886642

Browse files
authored
Include additional keys in workflow query response (#2971)
* Add additionalKeys param to query workflows Add an additionalKeys parameter that allows users to specify additional keys that should be included in the query result json for each workflow. * Include additionalKeys in query response h/t to @kshakir and @ruchim for restructuring the code to handle futures. * Query for labels in additionalKeys * Test changes to query endpoint Add tests to check that the key specified in the additionalKeys parameter for the query endpoint is returned in the workflow query metadata. * Fix function ordering * Validate additionalKeys value Throw a validation error if an unexpected key is included in the additionalKeys parameter for the query endpoint. Currently only "labels" and "parentWorkflowId" are supported. * Rename additionalKeys to additionalQueryResultFields * Code refactoring/clean-up * Fix renaming additionalKeys parameter
1 parent 86c7ce3 commit 1886642

File tree

7 files changed

+156
-16
lines changed

7 files changed

+156
-16
lines changed

engine/src/main/resources/swagger/cromwell.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,15 @@ paths:
356356
returns workflows with all of the specified label keys. Specify the label key
357357
and label value pair as separated with
358358
"label-key:label-value"
359+
- name: additionalQueryResultFields
360+
required: false
361+
in: query
362+
type: array
363+
items:
364+
type: string
365+
collectionFormat: multi
366+
description: >
367+
Includes the specified keys in the metadata for the returned workflows.
359368
tags:
360369
- Workflows
361370
responses:

engine/src/main/scala/cromwell/webservice/WorkflowJsonSupport.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,6 @@ object WorkflowJsonSupport extends DefaultJsonProtocol {
5050
}
5151
}
5252

53-
implicit val workflowQueryResult = jsonFormat5(WorkflowQueryResult)
53+
implicit val workflowQueryResult = jsonFormat7(WorkflowQueryResult)
5454
implicit val workflowQueryResponse = jsonFormat1(WorkflowQueryResponse)
5555
}

engine/src/test/scala/cromwell/webservice/CromwellApiServiceSpec.scala

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import cromwell.services.healthmonitor.HealthMonitorServiceActor.{GetCurrentStat
2020
import cromwell.services.metadata.MetadataService._
2121
import cromwell.services.metadata._
2222
import cromwell.util.SampleWdl.HelloWorld
23+
import mouse.boolean._
2324
import org.scalatest.{AsyncFlatSpec, Matchers}
2425
import spray.json.DefaultJsonProtocol._
2526
import spray.json._
@@ -488,6 +489,33 @@ class CromwellApiServiceSpec extends AsyncFlatSpec with ScalatestRouteTest with
488489
}
489490
}
490491

492+
it should "return labels if specified in additionalQueryResultFields param" in {
493+
Get(s"/workflows/$version/query?additionalQueryResultFields=labels&id=${CromwellApiServiceSpec.ExistingWorkflowId}") ~>
494+
akkaHttpService.workflowRoutes ~>
495+
check {
496+
status should be(StatusCodes.OK)
497+
contentType should be(ContentTypes.`application/json`)
498+
val results = responseAs[JsObject].fields("results").convertTo[Seq[JsObject]]
499+
val fields = results.head.fields
500+
fields("id") should be(JsString(CromwellApiServiceSpec.ExistingWorkflowId.toString))
501+
fields(WorkflowMetadataKeys.Labels).asJsObject.fields("key1") should be(JsString("label1"))
502+
fields(WorkflowMetadataKeys.Labels).asJsObject.fields("key2") should be(JsString("label2"))
503+
}
504+
}
505+
506+
it should "return parentWorkflowId if specified in additionalQueryResultFields param" in {
507+
Get(s"/workflows/$version/query?additionalQueryResultFields=parentWorkflowId&id=${CromwellApiServiceSpec.ExistingWorkflowId}") ~>
508+
akkaHttpService.workflowRoutes ~>
509+
check {
510+
status should be(StatusCodes.OK)
511+
contentType should be(ContentTypes.`application/json`)
512+
val results = responseAs[JsObject].fields("results").convertTo[Seq[JsObject]]
513+
val fields = results.head.fields
514+
fields("id") should be(JsString(CromwellApiServiceSpec.ExistingWorkflowId.toString))
515+
fields(WorkflowMetadataKeys.ParentWorkflowId) should be(JsString("pid"))
516+
}
517+
}
518+
491519
behavior of "REST API /query POST endpoint"
492520
it should "return good results for a good query map body" in {
493521
Post(s"/workflows/$version/query", HttpEntity(ContentTypes.`application/json`, """[{"status":"Succeeded"}]""")) ~>
@@ -502,6 +530,33 @@ class CromwellApiServiceSpec extends AsyncFlatSpec with ScalatestRouteTest with
502530
}
503531
}
504532

533+
it should "return labels if specified in additionalQueryResultFields param" in {
534+
Post(s"/workflows/$version/query", HttpEntity(ContentTypes.`application/json`, """[{"additionalQueryResultFields":"labels"}]""")) ~>
535+
akkaHttpService.workflowRoutes ~>
536+
check {
537+
assertResult(StatusCodes.OK) {
538+
status
539+
}
540+
val results = responseAs[JsObject].fields("results").convertTo[Seq[JsObject]]
541+
val fields = results.head.fields
542+
fields(WorkflowMetadataKeys.Labels).asJsObject.fields("key1") should be(JsString("label1"))
543+
fields(WorkflowMetadataKeys.Labels).asJsObject.fields("key2") should be(JsString("label2"))
544+
}
545+
}
546+
547+
it should "return parentWorkflowId if specified in additionalQueryResultFields param" in {
548+
Post(s"/workflows/$version/query", HttpEntity(ContentTypes.`application/json`, """[{"additionalQueryResultFields":"parentWorkflowId"}]""")) ~>
549+
akkaHttpService.workflowRoutes ~>
550+
check {
551+
assertResult(StatusCodes.OK) {
552+
status
553+
}
554+
assertResult(true) {
555+
entityAs[String].contains("\"parentWorkflowId\":\"pid\"")
556+
}
557+
}
558+
}
559+
505560
behavior of "REST API /labels GET endpoint"
506561
it should "return labels for a workflow ID" in {
507562
Get(s"/workflows/$version/${CromwellApiServiceSpec.ExistingWorkflowId}/labels") ~>
@@ -588,9 +643,16 @@ object CromwellApiServiceSpec {
588643
class MockServiceRegistryActor extends Actor {
589644
import MockServiceRegistryActor._
590645
override def receive = {
591-
case WorkflowQuery(_) =>
646+
case WorkflowQuery(parameters) =>
647+
val labels: Option[Map[String, String]] = {
648+
parameters.contains(("additionalQueryResultFields", "labels")).option(
649+
Map("key1" -> "label1", "key2" -> "label2"))
650+
}
651+
val parentWorkflowId: Option[String] = {
652+
parameters.contains(("additionalQueryResultFields", "parentWorkflowId")).option("pid")
653+
}
592654
val response = WorkflowQuerySuccess(WorkflowQueryResponse(List(WorkflowQueryResult(ExistingWorkflowId.toString,
593-
None, Some(WorkflowSucceeded.toString), None, None))), None)
655+
None, Some(WorkflowSucceeded.toString), None, None, labels, parentWorkflowId))), None)
594656
sender ! response
595657
case ValidateWorkflowId(id) =>
596658
if (RecognizedWorkflowIds.contains(id)) sender ! MetadataService.RecognizedWorkflowId

services/src/main/scala/cromwell/services/metadata/MetadataService.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ object MetadataService {
1717

1818
final val MetadataServiceName = "MetadataService"
1919

20-
final case class WorkflowQueryResult(id: String, name: Option[String], status: Option[String], start: Option[OffsetDateTime], end: Option[OffsetDateTime])
20+
final case class WorkflowQueryResult(id: String, name: Option[String], status: Option[String], start: Option[OffsetDateTime], end: Option[OffsetDateTime], labels: Option[Map[String, String]], parentWorkflowId: Option[String])
2121

2222
final case class WorkflowQueryResponse(results: Seq[WorkflowQueryResult])
2323

services/src/main/scala/cromwell/services/metadata/WorkflowQueryKey.scala

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,16 @@ import cats.data
66
import cats.syntax.traverse._
77
import cats.syntax.validated._
88
import cromwell.core.labels.Label
9-
import cromwell.core.{WorkflowId, WorkflowState}
9+
import cromwell.core.{WorkflowId, WorkflowMetadataKeys, WorkflowState}
1010
import common.validation.ErrorOr._
1111
import cats.data.Validated._
1212
import cats.instances.list._
13+
import mouse.boolean._
1314

1415
import scala.util.{Success, Try}
1516

1617
object WorkflowQueryKey {
17-
val ValidKeys = Set(StartDate, EndDate, Name, Id, Status, LabelKeyValue, Page, PageSize) map { _.name }
18+
val ValidKeys = Set(StartDate, EndDate, Name, Id, Status, LabelKeyValue, Page, PageSize, AdditionalQueryResultFields) map { _.name }
1819

1920
case object StartDate extends DateTimeWorkflowQueryKey {
2021
override val name = "Start"
@@ -89,6 +90,19 @@ object WorkflowQueryKey {
8990
sequenceListOfValidatedNels("Unrecognized status values", nels)
9091
}
9192
}
93+
94+
case object AdditionalQueryResultFields extends SeqWorkflowQueryKey[String] {
95+
override val name = "Additionalqueryresultfields"
96+
97+
override def validate(grouped: Map[String, Seq[(String, String)]]): ErrorOr[List[String]] = {
98+
val values = valuesFromMap(grouped).toList
99+
val allowedValues = Seq(WorkflowMetadataKeys.Labels, WorkflowMetadataKeys.ParentWorkflowId)
100+
val nels: List[ErrorOr[String]] = values map { v => {
101+
allowedValues.contains(v).fold(v.validNel[String], v.invalidNel[String])
102+
}}
103+
sequenceListOfValidatedNels("Unrecognized values", nels)
104+
}
105+
}
92106
}
93107

94108
sealed trait WorkflowQueryKey[T] {

services/src/main/scala/cromwell/services/metadata/WorkflowQueryParameters.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ case class WorkflowQueryParameters private(statuses: Set[String],
1717
startDate: Option[OffsetDateTime],
1818
endDate: Option[OffsetDateTime],
1919
page: Option[Int],
20-
pageSize: Option[Int])
20+
pageSize: Option[Int],
21+
additionalQueryResultFields: Set[String])
2122

2223
object WorkflowQueryParameters {
2324

@@ -68,6 +69,7 @@ object WorkflowQueryParameters {
6869
val labelsValidation: ErrorOr[Set[Label]] = WorkflowQueryKey.LabelKeyValue.validate(valuesByCanonicalCapitalization).map(_.toSet)
6970
val pageValidation = Page.validate(valuesByCanonicalCapitalization)
7071
val pageSizeValidation = PageSize.validate(valuesByCanonicalCapitalization)
72+
val additionalQueryResultFieldsValidation: ErrorOr[Set[String]] = AdditionalQueryResultFields.validate(valuesByCanonicalCapitalization).map(_.toSet)
7173

7274
// Only validate start before end if both of the individual date parsing validations have already succeeded.
7375
val startBeforeEndValidation: ErrorOr[Unit] = (startDateValidation, endDateValidation) match {
@@ -84,7 +86,8 @@ object WorkflowQueryParameters {
8486
startDateValidation,
8587
endDateValidation,
8688
pageValidation,
87-
pageSizeValidation) mapN { (_, _, statuses, names, ids, labels, startDate, endDate, page, pageSize) => WorkflowQueryParameters(statuses, names, ids, labels, startDate, endDate, page, pageSize) }
89+
pageSizeValidation,
90+
additionalQueryResultFieldsValidation) mapN { (_, _, statuses, names, ids, labels, startDate, endDate, page, pageSize, additionalQueryResultFields) => WorkflowQueryParameters(statuses, names, ids, labels, startDate, endDate, page, pageSize, additionalQueryResultFields) }
8891
}
8992

9093
def apply(rawParameters: Seq[(String, String)]): WorkflowQueryParameters = {

services/src/main/scala/cromwell/services/metadata/impl/MetadataDatabaseAccess.scala

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,20 @@ package cromwell.services.metadata.impl
22

33
import cats.Semigroup
44
import cats.data.NonEmptyList
5+
import cats.instances.future._
6+
import cats.instances.list._
7+
import cats.instances.option._
8+
import cats.syntax.functor._
59
import cats.syntax.semigroup._
10+
import cats.syntax.traverse._
611
import cromwell.core.{WorkflowId, WorkflowMetadataKeys, WorkflowState}
712
import cromwell.database.sql.SqlConverters._
813
import cromwell.database.sql.joins.{CallOrWorkflowQuery, CallQuery, WorkflowQuery}
914
import cromwell.database.sql.tables.{MetadataEntry, WorkflowMetadataSummaryEntry}
1015
import cromwell.services.MetadataServicesStore
1116
import cromwell.services.metadata.MetadataService.{QueryMetadata, WorkflowQueryResponse}
1217
import cromwell.services.metadata._
18+
import mouse.boolean._
1319

1420
import scala.concurrent.{ExecutionContext, Future}
1521

@@ -35,7 +41,7 @@ object MetadataDatabaseAccess {
3541
}
3642
}
3743

38-
def baseSummary(workflowUuid: String) = WorkflowMetadataSummaryEntry(workflowUuid, None, None, None, None, None)
44+
def baseSummary(workflowUuid: String) = WorkflowMetadataSummaryEntry(workflowUuid, None, None, None, None)
3945

4046
// If visibility is made `private`, there's a bogus warning about this being unused.
4147
implicit class MetadatumEnhancer(val metadatum: MetadataEntry) extends AnyVal {
@@ -178,14 +184,60 @@ trait MetadataDatabaseAccess {
178184
queryParameters.statuses, queryParameters.names, queryParameters.ids.map(_.toString), queryParameters.labels.map(label => (label.key, label.value)),
179185
queryParameters.startDate.map(_.toSystemTimestamp), queryParameters.endDate.map(_.toSystemTimestamp))
180186

181-
workflowSummaryCount flatMap { count =>
182-
workflowSummaries map { workflows =>
183-
(WorkflowQueryResponse(workflows.toSeq map { workflow =>
184-
MetadataService.WorkflowQueryResult(id = workflow.workflowExecutionUuid, name = workflow.workflowName, status = workflow.workflowStatus, start = workflow.startTimestamp map { _.toSystemOffsetDateTime }, end = workflow.endTimestamp map { _.toSystemOffsetDateTime })
185-
}),
186-
//only return metadata if page is defined
187-
queryParameters.page map { _ => QueryMetadata(queryParameters.page, queryParameters.pageSize, Option(count)) })
187+
def queryMetadata(count: Int): Option[QueryMetadata] = {
188+
queryParameters.page.as(QueryMetadata(queryParameters.page, queryParameters.pageSize, Option(count)))
189+
}
190+
191+
def summariesToQueryResults(workflows: Traversable[WorkflowMetadataSummaryEntry]): Future[List[MetadataService.WorkflowQueryResult]] = {
192+
workflows.toList.traverse(summaryToQueryResult)
193+
}
194+
195+
def summaryToQueryResult(workflow: WorkflowMetadataSummaryEntry): Future[MetadataService.WorkflowQueryResult] = {
196+
197+
def queryResult(labels: Map[String, String], parentId: Option[String]): MetadataService.WorkflowQueryResult = {
198+
MetadataService.WorkflowQueryResult(
199+
id = workflow.workflowExecutionUuid,
200+
name = workflow.workflowName,
201+
status = workflow.workflowStatus,
202+
start = workflow.startTimestamp map { _.toSystemOffsetDateTime },
203+
end = workflow.endTimestamp map { _.toSystemOffsetDateTime },
204+
labels = labels.nonEmpty.option(labels),
205+
parentWorkflowId = parentId
206+
)
188207
}
208+
209+
def metadataEntriesToValue(entries: Seq[MetadataEntry]): Option[String] = {
210+
entries.headOption.flatMap(_.metadataValue.toRawStringOption)
211+
}
212+
213+
def keyToMetadataValue(key: String): Future[Option[String]] = {
214+
val metadataEntries: Future[Seq[MetadataEntry]] = metadataDatabaseInterface.queryMetadataEntries(workflow.workflowExecutionUuid, key)
215+
metadataEntries map metadataEntriesToValue
216+
}
217+
218+
def getWorkflowLabels: Future[Map[String, String]] = {
219+
queryParameters.additionalQueryResultFields.contains(WorkflowMetadataKeys.Labels).fold(
220+
metadataDatabaseInterface.getWorkflowLabels(workflow.workflowExecutionUuid), Future.successful(Map.empty))
221+
}
222+
223+
val getParentWorkflowId: Future[Option[String]] = {
224+
queryParameters.additionalQueryResultFields.contains(WorkflowMetadataKeys.ParentWorkflowId).fold(
225+
keyToMetadataValue(WorkflowMetadataKeys.ParentWorkflowId), Future.successful(None))
226+
}
227+
228+
for {
229+
labels <- getWorkflowLabels
230+
parentWorkflowId <- getParentWorkflowId
231+
} yield queryResult(labels, parentWorkflowId)
232+
189233
}
234+
235+
for {
236+
count <- workflowSummaryCount
237+
workflows <- workflowSummaries
238+
queryResults <- summariesToQueryResults(workflows)
239+
} yield (WorkflowQueryResponse(queryResults.toSeq), queryMetadata(count))
240+
190241
}
191242
}
243+

0 commit comments

Comments
 (0)