Skip to content

Commit 93050be

Browse files
authored
AN-522 Temporarily log messages received by PAPI job execution actors (#7732)
1 parent 4552d10 commit 93050be

File tree

5 files changed

+49
-23
lines changed

5 files changed

+49
-23
lines changed

backend/src/main/scala/cromwell/backend/async/AsyncBackendJobExecutionActor.scala

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
package cromwell.backend.async
22

3-
import java.util.concurrent.ExecutionException
4-
5-
import akka.actor.{Actor, ActorLogging, ActorRef}
6-
import cromwell.backend.{BackendJobDescriptor, SlowJobWarning}
3+
import akka.actor.{Actor, ActorRef}
74
import cromwell.backend.BackendJobExecutionActor._
85
import cromwell.backend.async.AsyncBackendJobExecutionActor._
6+
import cromwell.backend.{BackendJobDescriptor, SlowJobWarning}
97
import cromwell.core.CromwellFatalExceptionMarker
8+
import cromwell.core.logging.JobLogging
109
import cromwell.core.retry.{Retry, SimpleExponentialBackoff}
1110
import cromwell.services.metadata.MetadataService.MetadataServiceResponse
1211

12+
import java.util.concurrent.ExecutionException
1313
import scala.concurrent.duration._
1414
import scala.concurrent.{ExecutionContext, Future, Promise}
1515
import scala.util.{Failure, Success}
@@ -41,7 +41,7 @@ object AsyncBackendJobExecutionActor {
4141
}
4242
}
4343

44-
trait AsyncBackendJobExecutionActor { this: Actor with ActorLogging with SlowJobWarning =>
44+
trait AsyncBackendJobExecutionActor { this: Actor with JobLogging with SlowJobWarning =>
4545

4646
def dockerImageUsed: Option[String]
4747

@@ -84,14 +84,20 @@ trait AsyncBackendJobExecutionActor { this: Actor with ActorLogging with SlowJob
8484

8585
override def receive: Receive = slowJobWarningReceive orElse {
8686
case mode: ExecutionMode => robustExecuteOrRecover(mode)
87-
case IssuePollRequest(handle) => robustPoll(handle)
88-
case PollResponseReceived(handle) if handle.isDone => self ! Finish(handle)
87+
case IssuePollRequest(handle) =>
88+
jobLogger.info("AN-522 Received IssuePollRequest")
89+
robustPoll(handle)
90+
case PollResponseReceived(handle) if handle.isDone =>
91+
jobLogger.info("AN-522 Received PollResponseReceived with isDone=true")
92+
self ! Finish(handle)
8993
case PollResponseReceived(handle) =>
9094
// This should stash the Cancellable someplace so it can be cancelled once polling is complete.
9195
// -Ywarn-value-discard
96+
jobLogger.info("AN-522 Received PollResponseReceived with isDone=false")
9297
context.system.scheduler.scheduleOnce(pollBackOff.backoffMillis.millis, self, IssuePollRequest(handle))
9398
()
9499
case Finish(SuccessfulExecutionHandle(outputs, returnCode, jobDetritusFiles, executionEvents, _)) =>
100+
jobLogger.info("AN-522 Received Finish with SuccessfulExecutionHandle")
95101
completionPromise.success(
96102
JobSucceededResponse(jobDescriptor.key,
97103
Some(returnCode),
@@ -104,16 +110,23 @@ trait AsyncBackendJobExecutionActor { this: Actor with ActorLogging with SlowJob
104110
)
105111
context.stop(self)
106112
case Finish(FailedNonRetryableExecutionHandle(throwable, returnCode, _)) =>
113+
jobLogger.info("AN-522 Received Finish with FailedNonRetryableExecutionHandle")
107114
completionPromise.success(JobFailedNonRetryableResponse(jobDescriptor.key, throwable, returnCode))
108115
context.stop(self)
109116
case Finish(FailedRetryableExecutionHandle(throwable, returnCode, _)) =>
117+
jobLogger.info("AN-522 Received Finish with FailedRetryableExecutionHandle")
110118
completionPromise.success(JobFailedRetryableResponse(jobDescriptor.key, throwable, returnCode))
111119
context.stop(self)
112120
case Finish(cromwell.backend.async.AbortedExecutionHandle) =>
121+
jobLogger.info("AN-522 Received Finish with AbortedExecutionHandle")
113122
completionPromise.success(JobAbortedResponse(jobDescriptor.key))
114123
context.stop(self)
115-
case FailAndStop(t) => failAndStop(t)
116-
case response: MetadataServiceResponse => handleMetadataServiceResponse(sender(), response)
124+
case FailAndStop(t) =>
125+
jobLogger.info("AN-522 Received FailAndStop")
126+
failAndStop(t)
127+
case response: MetadataServiceResponse =>
128+
jobLogger.info("AN-522 Received MetadataServiceResponse")
129+
handleMetadataServiceResponse(sender(), response)
117130
case badMessage => log.error(s"Unexpected message $badMessage.")
118131
}
119132

services/src/main/scala/cromwell/services/keyvalue/KvClient.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package cromwell.services.keyvalue
22

3-
import akka.actor.{Actor, ActorLogging, ActorRef}
3+
import akka.actor.{Actor, ActorRef}
4+
import cromwell.core.logging.JobLogging
45
import cromwell.services.keyvalue.KeyValueServiceActor._
56

67
import scala.concurrent.{ExecutionContext, Future, Promise}
78

8-
trait KvClient { this: Actor with ActorLogging =>
9+
trait KvClient { this: Actor with JobLogging =>
910

1011
def serviceRegistryActor: ActorRef
1112
private[keyvalue] var currentKvClientRequests: Map[ScopedKey, Promise[KvResponse]] = Map.empty
@@ -21,6 +22,7 @@ trait KvClient { this: Actor with ActorLogging =>
2122
}
2223

2324
final def kvClientReceive: Actor.Receive = { case response: KvResponse =>
25+
jobLogger.info("AN-522 KvResponse")
2426
fulfillOrLog(response)
2527
}
2628

services/src/test/scala/cromwell/services/keyvalue/KvClientSpec.scala

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
package cromwell.services.keyvalue
22

3-
import java.io.IOException
4-
5-
import akka.actor.{Actor, ActorLogging, ActorRef}
3+
import akka.actor.{Actor, ActorRef}
64
import akka.testkit.{TestActorRef, TestProbe}
7-
import cromwell.core.TestKitSuite
5+
import cromwell.core.logging.JobLogging
6+
import cromwell.core.{PossiblyNotRootWorkflowId, RootWorkflowId, TestKitSuite}
87
import cromwell.services.keyvalue.KeyValueServiceActor._
98
import org.scalatest.flatspec.AnyFlatSpecLike
109
import org.scalatest.matchers.should.Matchers
1110

12-
import scala.concurrent.{Await, ExecutionContextExecutor}
11+
import java.io.IOException
12+
import java.util.UUID
1313
import scala.concurrent.duration._
14+
import scala.concurrent.{Await, ExecutionContextExecutor}
1415
import scala.language.postfixOps
1516

1617
class KvClientSpec extends TestKitSuite with AnyFlatSpecLike with Matchers {
@@ -55,6 +56,10 @@ class KvClientSpec extends TestKitSuite with AnyFlatSpecLike with Matchers {
5556
}
5657
}
5758

58-
class KvTestClientActor(val serviceRegistryActor: ActorRef) extends Actor with ActorLogging with KvClient {
59+
class KvTestClientActor(val serviceRegistryActor: ActorRef) extends Actor with JobLogging with KvClient {
60+
def workflowIdForLogging: PossiblyNotRootWorkflowId = PossiblyNotRootWorkflowId(UUID.randomUUID())
61+
def rootWorkflowIdForLogging: RootWorkflowId = RootWorkflowId(UUID.randomUUID())
62+
def jobTag: String = "foobar"
63+
5964
override def receive: Receive = kvClientReceive orElse Actor.ignoringBehavior
6065
}

supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/api/clients/PipelinesApiRunCreationClient.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package cromwell.backend.google.pipelines.common.api.clients
22

3-
import akka.actor.{Actor, ActorLogging, ActorRef}
3+
import akka.actor.{Actor, ActorRef}
44
import cromwell.backend.google.pipelines.common.PapiInstrumentation
55
import cromwell.backend.google.pipelines.common.api.PipelinesApiRequestFactory.CreatePipelineParameters
66
import cromwell.backend.google.pipelines.common.api.PipelinesApiRequestManager.{
@@ -10,7 +10,7 @@ import cromwell.backend.google.pipelines.common.api.PipelinesApiRequestManager.{
1010
import cromwell.backend.google.pipelines.common.api.{PipelinesApiRequestFactory, PipelinesApiRequestManager}
1111
import cromwell.backend.standard.StandardAsyncJob
1212
import cromwell.core.WorkflowId
13-
import cromwell.core.logging.JobLogger
13+
import cromwell.core.logging.{JobLogger, JobLogging}
1414

1515
import scala.concurrent.{Future, Promise}
1616
import scala.util.{Failure, Success, Try}
@@ -33,17 +33,20 @@ object PipelinesApiRunCreationClient {
3333
* Be sure to make the main class's receive look like:
3434
* override def receive = runCreationClientReceive orElse { ... }
3535
*/
36-
trait PipelinesApiRunCreationClient { this: Actor with ActorLogging with PapiInstrumentation =>
36+
trait PipelinesApiRunCreationClient { this: Actor with JobLogging with PapiInstrumentation =>
3737
private var runCreationClientPromise: Option[Promise[StandardAsyncJob]] = None
3838

3939
val papiApiActor: ActorRef
4040
val requestFactory: PipelinesApiRequestFactory
4141

4242
def runCreationClientReceive: Actor.Receive = {
4343
case job: StandardAsyncJob =>
44+
jobLogger.info("AN-522 RunCreation success, received StandardAsyncJob")
4445
runSuccess()
4546
completePromise(Success(job))
46-
case PipelinesApiRunCreationQueryFailed(_, e) => completePromise(Failure(e))
47+
case PipelinesApiRunCreationQueryFailed(_, e) =>
48+
jobLogger.info("AN-522 PipelinesApiRunCreationQueryFailed")
49+
completePromise(Failure(e))
4750
}
4851

4952
private def completePromise(job: Try[StandardAsyncJob]) = {

supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/api/clients/PipelinesApiStatusRequestClient.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package cromwell.backend.google.pipelines.common.api.clients
22

3-
import akka.actor.{Actor, ActorLogging, ActorRef}
3+
import akka.actor.{Actor, ActorRef}
44
import cromwell.backend.google.pipelines.common.PapiInstrumentation
55
import cromwell.backend.google.pipelines.common.api.PipelinesApiRequestManager.PipelinesApiStatusQueryFailed
66
import cromwell.backend.google.pipelines.common.api.{PipelinesApiRequestFactory, PipelinesApiRequestManager, RunStatus}
77
import cromwell.backend.standard.StandardAsyncJob
88
import cromwell.core.WorkflowId
9+
import cromwell.core.logging.JobLogging
910

1011
import scala.concurrent.{Future, Promise}
1112
import scala.util.{Failure, Success, Try}
@@ -16,7 +17,7 @@ import scala.util.{Failure, Success, Try}
1617
* Be sure to make the main class's receive look like:
1718
* override def receive = pollingActorClientReceive orElse { ... }
1819
*/
19-
trait PipelinesApiStatusRequestClient { this: Actor with ActorLogging with PapiInstrumentation =>
20+
trait PipelinesApiStatusRequestClient { this: Actor with JobLogging with PapiInstrumentation =>
2021

2122
private var pollingActorClientPromise: Option[Promise[RunStatus]] = None
2223

@@ -25,10 +26,12 @@ trait PipelinesApiStatusRequestClient { this: Actor with ActorLogging with PapiI
2526

2627
def pollingActorClientReceive: Actor.Receive = {
2728
case r: RunStatus =>
29+
jobLogger.info("AN-522 Polled RunStatus received")
2830
log.debug(s"Polled status received: $r")
2931
pollSuccess()
3032
completePromise(Success(r))
3133
case PipelinesApiStatusQueryFailed(_, e) =>
34+
jobLogger.info("AN-522 Poll failed!")
3235
log.debug("JES poll failed!")
3336
completePromise(Failure(e))
3437
}

0 commit comments

Comments
 (0)