From da0a52b2e775a1790cf9b20b1236e3aa25f0a102 Mon Sep 17 00:00:00 2001 From: jorgee Date: Tue, 13 May 2025 11:54:44 +0200 Subject: [PATCH] include workflow metadata in lineage Signed-off-by: jorgee --- .../main/nextflow/lineage/LinObserver.groovy | 37 ++++++++++++++++++- .../lineage/model/v1beta1/Workflow.groovy | 12 ++++++ .../lineage/model/v1beta1/WorkflowRun.groovy | 5 +++ .../nextflow/lineage/LinObserverTest.groovy | 34 +++++++++++++---- 4 files changed, 78 insertions(+), 10 deletions(-) diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy index d4aa43722a..92a3018e3f 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy @@ -16,7 +16,11 @@ package nextflow.lineage +import nextflow.NextflowMeta +import nextflow.extension.FilesEx import nextflow.lineage.exception.OutputRelativePathException +import nextflow.script.FusionMetadata +import nextflow.script.WaveMetadata import static nextflow.lineage.fs.LinPath.* @@ -71,6 +75,11 @@ import nextflow.util.TestOnly @Slf4j @CompileStatic class LinObserver implements TraceObserverV2 { + private static List workflowMetadataPropertiesToRemove = [ + "sessionId", "name", //Already in workflowRun + "scriptFile", "scriptName", "scriptId", "repository", "commitId", "revision", "projectName", "manifest", //Already in workflow + "stats", "success" // End + ] private static Map, String> taskParamToValue = [ (StdOutParam) : "stdout", (StdInParam) : "stdin", @@ -155,7 +164,10 @@ class LinObserver implements TraceObserverV2 { final workflow = new Workflow( collectScriptDataPaths(normalizer), session.workflowMetadata.repository, - session.workflowMetadata.commitId + session.workflowMetadata.commitId, + session.workflowMetadata.revision, + session.workflowMetadata.projectName, + session.workflowMetadata.manifest?.toMap() ) // create the workflow run main object final value = new WorkflowRun( @@ -163,7 +175,8 @@ class LinObserver implements TraceObserverV2 { session.uniqueId.toString(), session.runName, getNormalizedParams(session.params, normalizer), - SecretHelper.hideSecrets(session.config.deepClone()) as Map + SecretHelper.hideSecrets(session.config.deepClone()) as Map, + addOtherMetadata() ) final executionHash = CacheHelper.hasher(value).hash().toString() store.save(executionHash, value) @@ -473,4 +486,24 @@ class LinObserver implements TraceObserverV2 { } return paths } + + private Map addOtherMetadata() { + try { + def metadata = session.workflowMetadata.toMap() + .collectEntries { it.value instanceof Path ? [it.key, FilesEx.toUriString(it.value as Path) ] : [it.key, it.value] } + metadata.removeAll {it.key.toString() in workflowMetadataPropertiesToRemove } + if( metadata.containsKey("nextflow") ) + metadata["nextflow"] = (metadata["nextflow"] as NextflowMeta).toJsonMap() + if( metadata.containsKey("configFiles") ) + metadata["configFiles"] = (metadata["configFiles"] as List).collect {FilesEx.toUriString(it)} + if( metadata.containsKey( "wave") ) + metadata["wave"] = (metadata["wave"] as WaveMetadata).enabled + if( metadata.containsKey( "fusion") ) + metadata["fusion"] = (metadata["fusion"] as FusionMetadata).enabled + return metadata + }catch( Throwable e){ + log.debug("Error creating metadata", e) + return [:] + } + } } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/model/v1beta1/Workflow.groovy b/modules/nf-lineage/src/main/nextflow/lineage/model/v1beta1/Workflow.groovy index b64a59a03d..0266fe4892 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/model/v1beta1/Workflow.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/model/v1beta1/Workflow.groovy @@ -41,4 +41,16 @@ class Workflow implements LinSerializable { * Workflow commit identifier */ String commitId + /** + * Workflow revision + */ + String revision + /** + * Project name + */ + String projectName + /** + * Workflow Manifest + */ + Map manifest } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/model/v1beta1/WorkflowRun.groovy b/modules/nf-lineage/src/main/nextflow/lineage/model/v1beta1/WorkflowRun.groovy index 806a479ec1..31db5e7d07 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/model/v1beta1/WorkflowRun.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/model/v1beta1/WorkflowRun.groovy @@ -48,4 +48,9 @@ class WorkflowRun implements LinSerializable { * Resolved Configuration */ Map config + /** + * Raw metadata + */ + Map metadata + } diff --git a/modules/nf-lineage/src/test/nextflow/lineage/LinObserverTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/LinObserverTest.groovy index 2eb8d52785..e31164d6d7 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/LinObserverTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/LinObserverTest.groovy @@ -17,6 +17,7 @@ package nextflow.lineage +import nextflow.extension.FilesEx import nextflow.lineage.exception.OutputRelativePathException import java.nio.file.Files @@ -165,14 +166,31 @@ class LinObserverTest extends Specification { def store = new DefaultLinStore(); def uniqueId = UUID.randomUUID() def scriptFile = folder.resolve("main.nf") + def map = [ + repository: "https://nextflow.io/nf-test/", + commitId: "123456", + scriptId: "78910", + scriptFile: scriptFile, + projectDir: folder.resolve("projectDir"), + revision: "main", + projectName: "nextflow.io/nf-test", + workDir: folder.resolve("workDir") + ] def metadata = Mock(WorkflowMetadata){ - getRepository() >> "https://nextflow.io/nf-test/" - getCommitId() >> "123456" - getScriptId() >> "78910" - getScriptFile() >> scriptFile - getProjectDir() >> folder.resolve("projectDir") - getWorkDir() >> folder.resolve("workDir") + getRepository() >> map.repository + getCommitId() >> map.commitId + getScriptId() >> map.scriptId + getScriptFile() >> map.scriptFile + getProjectDir() >> map.projectDir + getRevision() >> map.revision + getProjectName() >> map.projectName + getWorkDir() >> map.workDir + toMap() >> map } + def expectedMap = [ + projectDir: FilesEx.toUriString(map.projectDir), + workDir: FilesEx.toUriString(map.workDir) + ] def session = Mock(Session) { getConfig() >> config getUniqueId() >> uniqueId @@ -183,8 +201,8 @@ class LinObserverTest extends Specification { store.open(LineageConfig.create(session)) def observer = new LinObserver(session, store) def mainScript = new DataPath("file://${scriptFile.toString()}", new Checksum("78910", "nextflow", "standard")) - def workflow = new Workflow([mainScript],"https://nextflow.io/nf-test/", "123456" ) - def workflowRun = new WorkflowRun(workflow, uniqueId.toString(), "test_run", [], config) + def workflow = new Workflow([mainScript], map.repository, map.commitId, map.revision, map.projectName ) + def workflowRun = new WorkflowRun(workflow, uniqueId.toString(), "test_run", [], config, expectedMap) when: observer.onFlowCreate(session) observer.onFlowBegin()