Skip to content

Commit b2cc2f7

Browse files
committed
Move memory error detection from head to tail
1 parent 7342c07 commit b2cc2f7

File tree

12 files changed

+285
-6
lines changed

12 files changed

+285
-6
lines changed

backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1421,6 +1421,12 @@ trait StandardAsyncExecutionActor
14211421
def readFile(path: Path, maxBytes: Option[Int]): Future[String] =
14221422
asyncIo.contentAsStringAsync(path, maxBytes, failOnOverflow = false)
14231423

1424+
/** Read the tail of a file, or fall back to reading the head. */
1425+
def readPartial(path: Path, maxBytes: Int): Future[String] =
1426+
asyncIo.tailAsStringAsync(path, maxBytes) recoverWith { case _ =>
1427+
asyncIo.contentAsStringAsync(path, Option(maxBytes), failOnOverflow = false)
1428+
}
1429+
14241430
def checkMemoryRetryRC(): Future[Boolean] =
14251431
readFile(jobPaths.memoryRetryRC, None) map { codeAsString =>
14261432
Try(codeAsString.trim.toInt) match {
@@ -1439,7 +1445,7 @@ trait StandardAsyncExecutionActor
14391445
}
14401446

14411447
def checkMemoryRetryStderr(errorKeys: List[String], maxBytes: Int): Future[Boolean] =
1442-
readFile(jobPaths.memoryRetryError, Option(maxBytes)) map { errorContent =>
1448+
readPartial(jobPaths.memoryRetryError, maxBytes) map { errorContent =>
14431449
errorKeys.exists(errorContent.contains)
14441450
}
14451451

core/src/main/scala/cromwell/core/io/AsyncIo.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ class AsyncIo(ioEndpoint: ActorRef, ioCommandBuilder: IoCommandBuilder) {
4242
def contentAsStringAsync(path: Path, maxBytes: Option[Int], failOnOverflow: Boolean): Future[String] =
4343
asyncCommand(ioCommandBuilder.contentAsStringCommand(path, maxBytes, failOnOverflow))
4444

45+
def tailAsStringAsync(path: Path, maxBytes: Int): Future[String] =
46+
asyncCommand(ioCommandBuilder.tailAsString(path, maxBytes))
47+
4548
def writeAsync(path: Path, content: String, options: OpenOptions, compressPayload: Boolean = false): Future[Unit] =
4649
asyncCommand(ioCommandBuilder.writeCommand(path, content, options, compressPayload))
4750

core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package cromwell.core.io
33
import better.files.File.OpenOptions
44
import cromwell.core.callcaching.FileHashStrategy
55
import cromwell.core.io.IoContentAsStringCommand.IoReadOptions
6+
import cromwell.core.io.IoTailAsStringCommand.IoTailOptions
67
import cromwell.core.path.Path
78

89
object DefaultIoCommand {
@@ -16,6 +17,11 @@ object DefaultIoCommand {
1617
override def commandDescription: String = s"DefaultIoContentAsStringCommand file '$file' options '$options'"
1718
}
1819

20+
case class DefaultIoTailAsStringCommand(override val file: Path, override val options: IoTailOptions)
21+
extends IoTailAsStringCommand(file, options) {
22+
override def commandDescription: String = s"DefaultIoTailAsStringCommand file '$file' options '$options'"
23+
}
24+
1925
case class DefaultIoSizeCommand(override val file: Path) extends IoSizeCommand(file) {
2026
override def commandDescription: String = s"DefaultIoSizeCommand file '$file'"
2127
}

core/src/main/scala/cromwell/core/io/IoCommand.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import common.util.Backoff
99
import common.util.StringUtil.EnhancedToStringable
1010
import cromwell.core.callcaching.FileHashStrategy
1111
import cromwell.core.io.IoContentAsStringCommand.IoReadOptions
12+
import cromwell.core.io.IoTailAsStringCommand.IoTailOptions
1213
import cromwell.core.path.Path
1314
import cromwell.core.retry.SimpleExponentialBackoff
1415
import org.slf4j.LoggerFactory
@@ -133,6 +134,23 @@ abstract class IoContentAsStringCommand(val file: Path,
133134
override lazy val name = "read"
134135
}
135136

137+
object IoTailAsStringCommand {
138+
139+
/**
140+
* Options to customize reading the tail of a file.
141+
* @param maxBytes Only reads up to maxBytes Bytes from the file
142+
*/
143+
case class IoTailOptions(maxBytes: Int)
144+
}
145+
146+
/**
147+
* Read the tail of a file as a string (load the entire content in memory)
148+
*/
149+
abstract class IoTailAsStringCommand(val file: Path, val options: IoTailOptions) extends SingleFileIoCommand[String] {
150+
override def toString = s"read tail of ${file.pathAsString}"
151+
override lazy val name = "tail"
152+
}
153+
136154
/**
137155
* Return the size of file
138156
*/

core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import cromwell.core.callcaching.FileHashStrategy
44
import cromwell.core.io.DefaultIoCommand._
55
import cromwell.core.io.IoCommand.{noopMetricsCallback, IOMetricsCallback}
66
import cromwell.core.io.IoContentAsStringCommand.IoReadOptions
7+
import cromwell.core.io.IoTailAsStringCommand.IoTailOptions
78
import cromwell.core.path.BetterFileMethods.OpenOptions
89
import cromwell.core.path.Path
910

@@ -16,6 +17,7 @@ import scala.util.Try
1617
abstract class PartialIoCommandBuilder {
1718
def contentAsStringCommand: PartialFunction[(Path, Option[Int], Boolean), Try[IoContentAsStringCommand]] =
1819
PartialFunction.empty
20+
def tailAsStringCommand: PartialFunction[(Path, Int), Try[IoTailAsStringCommand]] = PartialFunction.empty
1921
def writeCommand: PartialFunction[(Path, String, OpenOptions, Boolean), Try[IoWriteCommand]] = PartialFunction.empty
2022
def sizeCommand: PartialFunction[Path, Try[IoSizeCommand]] = PartialFunction.empty
2123
def deleteCommand: PartialFunction[(Path, Boolean), Try[IoDeleteCommand]] = PartialFunction.empty
@@ -63,6 +65,9 @@ class IoCommandBuilder(partialBuilders: List[PartialIoCommandBuilder] = List.emp
6365
DefaultIoContentAsStringCommand(path, IoReadOptions(maxBytes, failOnOverflow))
6466
)
6567

68+
def tailAsString(path: Path, maxBytes: Int): Try[IoTailAsStringCommand] =
69+
buildOrDefault(_.tailAsStringCommand, (path, maxBytes), DefaultIoTailAsStringCommand(path, IoTailOptions(maxBytes)))
70+
6671
def writeCommand(path: Path,
6772
content: String,
6873
options: OpenOptions,

core/src/main/scala/cromwell/core/path/EvenBetterPathMethods.scala

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ import java.io.{BufferedInputStream, BufferedReader, ByteArrayOutputStream, Inpu
44
import java.nio.file.{FileAlreadyExistsException, Files}
55
import java.nio.file.attribute.{PosixFilePermission, PosixFilePermissions}
66
import java.util.zip.GZIPOutputStream
7-
87
import better.files.File.OpenOptions
98
import cromwell.util.TryWithResource.tryWithResource
109

10+
import java.nio.channels.Channels
1111
import scala.jdk.CollectionConverters._
1212
import scala.concurrent.ExecutionContext
1313
import scala.io.Codec
@@ -86,12 +86,34 @@ trait EvenBetterPathMethods {
8686

8787
final def tailed(tailedSize: Int) = TailedWriter(this, tailedSize)
8888

89+
/**
90+
* Similar to newInputStream, but is overridable in subclasses to provide a different InputStream.
91+
*
92+
* Naming comes from the original method used from Google Cloud Storage, executeMediaAsInputStream()
93+
*/
8994
def mediaInputStream(implicit ec: ExecutionContext): InputStream = {
95+
// Use locally(ec) to avoid "parameter value ec in method mediaInputStream is never used"
96+
// It is used in the overridden methods, but not here.
9097
// See https://github.com/scala/bug/issues/10347 and https://github.com/scala/bug/issues/10790
9198
locally(ec)
9299
newInputStream
93100
}
94101

102+
/**
103+
* A tail version of mediaInputStream that reads the last numBytes of the file.
104+
*/
105+
def mediaInputStreamTail(numBytes: Long)(implicit ec: ExecutionContext): InputStream = {
106+
// Use locally(ec) to avoid "parameter value ec in method mediaInputStreamTail is never used"
107+
// It is used in the overridden methods, but not here.
108+
// See https://github.com/scala/bug/issues/10347 and https://github.com/scala/bug/issues/10790
109+
locally(ec)
110+
val channel = newFileChannel
111+
val size = channel.size()
112+
val startPosition = Math.max(0, size - numBytes)
113+
channel.position(startPosition)
114+
Channels.newInputStream(channel)
115+
}
116+
95117
protected def gzipByteArray(byteArray: Array[Byte]): Array[Byte] = {
96118
val byteStream = new ByteArrayOutputStream
97119
tryWithResource(() => new GZIPOutputStream(byteStream)) {
@@ -132,6 +154,14 @@ trait EvenBetterPathMethods {
132154
def withBufferedStream[A](f: BufferedInputStream => A)(implicit ec: ExecutionContext): A =
133155
tryWithResource(() => new BufferedInputStream(this.mediaInputStream))(f).recoverWith(fileIoErrorPf).get
134156

157+
/**
158+
* Similar to withBufferedStream, but reads from the tail of the stream.
159+
*/
160+
def withBufferedStreamTail[A](numBytes: Long)(f: BufferedInputStream => A)(implicit ec: ExecutionContext): A =
161+
tryWithResource(() => new BufferedInputStream(this.mediaInputStreamTail(numBytes)))(f)
162+
.recoverWith(fileIoErrorPf)
163+
.get
164+
135165
/**
136166
* Returns an Array[Byte] from a Path. Limit the array size to "limit" byte if defined.
137167
* @throws IOException if failOnOverflow is true and the file is larger than limit
@@ -151,9 +181,35 @@ trait EvenBetterPathMethods {
151181
}
152182

153183
/**
154-
* Reads the first limitBytes of a file and makes a String. Prepend with an annotation at the start (to say that this is the
155-
* first n bytes).
184+
* Reads the last bytes of a file and returns a String. If the file is larger than limit, it will
185+
* drop the first line and return the rest of the file as a String.
186+
*/
187+
def tailLines(limitBytes: Int)(implicit ec: ExecutionContext): String =
188+
// Add one because we'll be dropping until the end of the first line if the file is larger than limitBytes
189+
withBufferedStreamTail(limitBytes + 1L) { bufferedStream =>
190+
val bytes = bufferedStream.readAllBytes()
191+
if (bytes.length <= limitBytes) {
192+
new String(bytes, Codec.UTF8.charSet)
193+
} else {
194+
val newlineIndex = bytes.indexOf('\n')
195+
if (newlineIndex < 0) {
196+
""
197+
} else {
198+
new String(bytes.drop(newlineIndex + 1), Codec.UTF8.charSet)
199+
}
200+
}
201+
}
202+
203+
/**
204+
* Reads the limitBytes of a file and makes a String. Prepend with an annotation at the start (to say that this is a
205+
* limited number of bytes).
156206
*/
157207
def annotatedContentAsStringWithLimit(limitBytes: Int)(implicit ec: ExecutionContext): String =
158-
s"[First $limitBytes bytes]:" + new String(limitFileContent(Option(limitBytes), failOnOverflow = false))
208+
try
209+
s"[Last $limitBytes bytes]:" + tailLines(limitBytes)
210+
catch {
211+
// If anything goes wrong, such as we cannot seek to the end of the file, we will just return the first n bytes.
212+
case _: Exception =>
213+
s"[First $limitBytes bytes]:" + new String(limitFileContent(Option(limitBytes), failOnOverflow = false))
214+
}
159215
}

core/src/test/scala/cromwell/core/MockIoActor.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class MockIoActor(returnCode: String, stderrSize: Long) extends Actor {
1717
case command: IoDeleteCommand => sender() ! IoSuccess(command, ())
1818
case command: IoSizeCommand => sender() ! IoSuccess(command, 0L)
1919
case command: IoContentAsStringCommand => sender() ! IoSuccess(command, "0")
20+
case command: IoTailAsStringCommand => sender() ! IoSuccess(command, "")
2021
case command: IoExistsCommand => sender() ! IoSuccess(command, false)
2122

2223
// With context
@@ -26,6 +27,8 @@ class MockIoActor(returnCode: String, stderrSize: Long) extends Actor {
2627
case (requestContext: Any, command: IoSizeCommand) => sender() ! (requestContext -> IoSuccess(command, stderrSize))
2728
case (requestContext: Any, command: IoContentAsStringCommand) =>
2829
sender() ! (requestContext -> IoSuccess(command, returnCode))
30+
case (requestContext: Any, command: IoTailAsStringCommand) =>
31+
sender() ! (requestContext -> IoSuccess(command, returnCode))
2932
case (requestContext: Any, command: IoExistsCommand) => sender() ! (requestContext -> IoSuccess(command, false))
3033

3134
case withPromise: IoCommandWithPromise[_] => self ! ((withPromise.promise, withPromise.ioCommand))

core/src/test/scala/cromwell/core/SimpleIoActor.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ class SimpleIoActor extends Actor {
4646
case Failure(failure) => sender() ! IoFailure(command, failure)
4747
}
4848

49+
case command: IoTailAsStringCommand =>
50+
Try(command.file.tailLines(command.options.maxBytes)(context.dispatcher)) match {
51+
case Success(content) => sender() ! IoSuccess(command, content)
52+
case Failure(failure) => sender() ! IoFailure(command, failure)
53+
}
54+
4955
case command: IoHashCommand =>
5056
Try(command.file.md5) match {
5157
case Success(hash) => sender() ! IoSuccess(command, hash)
@@ -89,6 +95,12 @@ class SimpleIoActor extends Actor {
8995
case Failure(failure) => sender() ! (requestContext -> IoFailure(command, failure))
9096
}
9197

98+
case (requestContext: Any, command: IoTailAsStringCommand) =>
99+
Try(command.file.tailLines(command.options.maxBytes)(context.dispatcher)) match {
100+
case Success(content) => sender() ! (requestContext -> IoSuccess(command, content))
101+
case Failure(failure) => sender() ! (requestContext -> IoFailure(command, failure))
102+
}
103+
92104
case (requestContext: Any, command: IoHashCommand) =>
93105
Try(command.file.md5) match {
94106
case Success(hash) => sender() ! (requestContext -> IoSuccess(command, hash))

core/src/test/scala/cromwell/core/io/AsyncIoSpec.scala

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,50 @@ class AsyncIoSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers {
4040
}
4141
}
4242

43+
it should "tail asynchronously" in {
44+
val testActor = TestActorRef(new AsyncIoTestActor(simpleIoActor))
45+
46+
val testPath = DefaultPathBuilder.createTempFile()
47+
testPath.write("hello")
48+
49+
testActor.underlyingActor.asyncIo.tailAsStringAsync(testPath, 2) map { result =>
50+
assert(result == "")
51+
}
52+
}
53+
54+
it should "tail a multi-line unix file asynchronously" in {
55+
val testActor = TestActorRef(new AsyncIoTestActor(simpleIoActor))
56+
57+
val testPath = DefaultPathBuilder.createTempFile()
58+
testPath.write("hello\nworld")
59+
60+
testActor.underlyingActor.asyncIo.tailAsStringAsync(testPath, 8) map { result =>
61+
assert(result == "world")
62+
}
63+
}
64+
65+
it should "tail a multi-line windows file asynchronously" in {
66+
val testActor = TestActorRef(new AsyncIoTestActor(simpleIoActor))
67+
68+
val testPath = DefaultPathBuilder.createTempFile()
69+
testPath.write("hello\r\nworld")
70+
71+
testActor.underlyingActor.asyncIo.tailAsStringAsync(testPath, 9) map { result =>
72+
assert(result == "world")
73+
}
74+
}
75+
76+
it should "tail the file if it's under the byte limit asynchronously" in {
77+
val testActor = TestActorRef(new AsyncIoTestActor(simpleIoActor))
78+
79+
val testPath = DefaultPathBuilder.createTempFile()
80+
testPath.write("hello")
81+
82+
testActor.underlyingActor.asyncIo.tailAsStringAsync(testPath, 6) map { result =>
83+
assert(result == "hello")
84+
}
85+
}
86+
4387
it should "get size asynchronously" in {
4488
val testActor = TestActorRef(new AsyncIoTestActor(simpleIoActor))
4589

engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ class NioFlow(parallelism: Int,
7171
case sizeCommand: IoSizeCommand => size(sizeCommand) map sizeCommand.success
7272
case readAsStringCommand: IoContentAsStringCommand =>
7373
readAsString(readAsStringCommand) map readAsStringCommand.success
74+
case tailAsStringCommand: IoTailAsStringCommand =>
75+
tailAsString(tailAsStringCommand) map tailAsStringCommand.success
7476
case hashCommand: IoHashCommand => hash(hashCommand) map hashCommand.success
7577
case touchCommand: IoTouchCommand => touch(touchCommand) map touchCommand.success
7678
case existsCommand: IoExistsCommand => exists(existsCommand) map existsCommand.success
@@ -111,6 +113,10 @@ class NioFlow(parallelism: Int,
111113
).replaceAll("\\r\\n", "\\\n")
112114
}
113115

116+
private def tailAsString(command: IoTailAsStringCommand): IO[String] = IO {
117+
command.file.tailLines(command.options.maxBytes).replaceAll("\\r\\n", "\\\n")
118+
}
119+
114120
private def size(size: IoSizeCommand) =
115121
size.file match {
116122
case httpPath: HttpPath => IO.fromFuture(IO(httpPath.fetchSize))

0 commit comments

Comments
 (0)