Skip to content

Commit d2293fd

Browse files
authored
AN-522 Add temporary logging around token distribution (#7741)
1 parent 126c374 commit d2293fd

File tree

2 files changed

+23
-10
lines changed

2 files changed

+23
-10
lines changed

engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/job/EngineJobExecutionActor.scala

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,7 @@ class EngineJobExecutionActor(replyTo: ActorRef,
515515
}
516516

517517
onTransition { case fromState -> toState =>
518-
jobLogger.info("Transitioning from {}({}) to {}({})", fromState, stateData, toState, nextStateData)
518+
jobLogger.debug("Transitioning from {}({}) to {}({})", fromState, stateData, toState, nextStateData)
519519

520520
EngineJobExecutionActorState.transitionEventString(fromState, toState) foreach {
521521
eventList :+= ExecutionEvent(_)
@@ -548,11 +548,8 @@ class EngineJobExecutionActor(replyTo: ActorRef,
548548
// due to timeouts). That's ok, we just ignore this message in any other situation:
549549
stay()
550550
case Event(msg, _) =>
551-
log.error("Bad message from {} to EngineJobExecutionActor in state {}(with data {}): {}",
552-
sender(),
553-
stateName,
554-
stateData,
555-
msg
551+
jobLogger.error(
552+
s"Bad message from ${sender()} to EngineJobExecutionActor in state ${stateName}(with data ${stateData}): ${msg}"
556553
)
557554
stay()
558555
}
@@ -623,15 +620,23 @@ class EngineJobExecutionActor(replyTo: ActorRef,
623620
runJob(updatedData)
624621
}
625622

626-
private def requestRestartCheckToken(): Unit =
623+
private def requestRestartCheckToken(): Unit = {
624+
jobLogger.info(
625+
s"EJEA ${self.toString().split('#').lastOption.getOrElse("ERR").stripSuffix("]")} is requesting restart token of type ${backendLifecycleActorFactory.jobExecutionTokenType} for ${workflowDescriptor.backendDescriptor.hogGroup}"
626+
)
627627
jobRestartCheckTokenDispenserActor ! JobTokenRequest(workflowDescriptor.backendDescriptor.hogGroup,
628628
backendLifecycleActorFactory.jobRestartCheckTokenType
629629
)
630+
}
630631

631-
private def requestExecutionToken(): Unit =
632+
private def requestExecutionToken(): Unit = {
633+
jobLogger.info(
634+
s"EJEA ${self.toString().split('#').lastOption.getOrElse("ERR").stripSuffix("]")} is requesting execution token of type ${backendLifecycleActorFactory.jobExecutionTokenType} for ${workflowDescriptor.backendDescriptor.hogGroup}"
635+
)
632636
jobExecutionTokenDispenserActor ! JobTokenRequest(workflowDescriptor.backendDescriptor.hogGroup,
633637
backendLifecycleActorFactory.jobExecutionTokenType
634638
)
639+
}
635640

636641
// Return any currently held job restart check or execution token.
637642
private def returnCurrentToken(): Unit = if (

engine/src/main/scala/cromwell/engine/workflow/tokens/JobTokenDispenserActor.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,13 @@ class JobTokenDispenserActor(override val serviceRegistryActor: ActorRef,
127127
count(tokensReturnedMetricPath, 0L, ServicesPrefix)
128128
}
129129

130-
private def enqueue(sndr: ActorRef, hogGroup: String, tokenType: JobTokenType): Unit =
130+
private def enqueue(sndr: ActorRef, hogGroup: String, tokenType: JobTokenType): Unit = {
131+
val senderId = sndr.toString().split('#').lastOption.getOrElse("ERR").stripSuffix("]")
132+
log.info(
133+
s"Enqueing request from ${senderId} for $dispenserType token of type $tokenType for $hogGroup"
134+
)
131135
if (tokenAssignments.contains(sndr)) {
136+
log.info(s"$senderId already assigned a token")
132137
sndr ! JobTokenDispensed
133138
} else {
134139
val queue = tokenQueues.getOrElse(tokenType, TokenQueue(tokenType, tokenEventLogger))
@@ -137,6 +142,7 @@ class JobTokenDispenserActor(override val serviceRegistryActor: ActorRef,
137142
increment(requestsEnqueuedMetricPath, ServicesPrefix)
138143
()
139144
}
145+
}
140146

141147
private def checkAndDispenseTokens(n: Int): Unit =
142148
if (tokenQueues.nonEmpty) {
@@ -179,7 +185,9 @@ class JobTokenDispenserActor(override val serviceRegistryActor: ActorRef,
179185

180186
if (nextTokens.nonEmpty) {
181187
val hogGroupCounts =
182-
nextTokens.groupBy(t => t.queuePlaceholder.hogGroup).map { case (hogGroup, list) => s"$hogGroup: ${list.size}" }
188+
nextTokens.groupBy(t => t.queuePlaceholder.hogGroup).map { case (hogGroup, list) =>
189+
s"$hogGroup: ${list.size} (${list.map(_.actor.toString().split('#').lastOption.getOrElse("ERR").stripSuffix("]")).mkString(", ")})"
190+
}
183191
log.info(s"Assigned new job $dispenserType tokens to the following groups: ${hogGroupCounts.mkString(", ")}")
184192
}
185193

0 commit comments

Comments
 (0)