Skip to content

Add version, kind, and spec to lineage schema #6075

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jul 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,18 @@ public RuntimeTypeAdapterFactory<T> registerSubtype(Class<? extends T> type) {
return registerSubtype(type, type.getSimpleName());
}

protected Class<?> getSubTypeFromLabel(String label){
return labelToSubtype.get(label);
}

protected String getLabelFromSubtype(Class<?> subType){
return subtypeToLabel.get(subType);
}

protected String getTypeFieldName(){
return typeFieldName;
}

@Override
public <R> TypeAdapter<R> create(Gson gson, TypeToken<R> type) {
if (type == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@
package nextflow.lineage.serde

import com.google.gson.Gson
import com.google.gson.JsonElement
import com.google.gson.JsonObject
import com.google.gson.JsonParseException
import com.google.gson.JsonParser
import com.google.gson.TypeAdapter
import com.google.gson.reflect.TypeToken
import com.google.gson.stream.JsonReader
import com.google.gson.stream.JsonWriter

import groovy.transform.CompileStatic
import nextflow.lineage.model.v1beta1.FileOutput
import nextflow.lineage.model.v1beta1.LinModel
Expand All @@ -34,15 +32,16 @@ import nextflow.lineage.model.v1beta1.Workflow
import nextflow.lineage.model.v1beta1.WorkflowOutput
import nextflow.lineage.model.v1beta1.WorkflowRun
import nextflow.serde.gson.RuntimeTypeAdapterFactory

/**
* Class to serialize LiSerializable objects including the Lineage model version.
*
* @author Jorge Ejarque <jorge.ejarque@seqera.io>
*/
@CompileStatic
class LinTypeAdapterFactory<T> extends RuntimeTypeAdapterFactory<T> {

public static final String VERSION_FIELD = 'version'
public static final String SPEC_FIELD = 'spec'
public static final String CURRENT_VERSION = LinModel.VERSION

LinTypeAdapterFactory() {
Expand All @@ -53,7 +52,6 @@ class LinTypeAdapterFactory<T> extends RuntimeTypeAdapterFactory<T> {
.registerSubtype(TaskRun, TaskRun.simpleName)
.registerSubtype(TaskOutput, TaskOutput.simpleName)
.registerSubtype(FileOutput, FileOutput.simpleName)

}

@Override
Expand All @@ -70,39 +68,43 @@ class LinTypeAdapterFactory<T> extends RuntimeTypeAdapterFactory<T> {
return new TypeAdapter<R>() {
@Override
void write(JsonWriter out, R value) throws IOException {
def json = delegate.toJsonTree(value)
if (json instanceof JsonObject) {
json = addVersion(json)
}
gson.toJson(json, out)
final object = new JsonObject()
object.addProperty(VERSION_FIELD, CURRENT_VERSION)
String label = getLabelFromSubtype(value.class)
if (!label)
throw new JsonParseException("Not registered class ${value.class}")
object.addProperty(getTypeFieldName(), label)
def json = gson.toJsonTree(value)
object.add(SPEC_FIELD, json)
gson.toJson(object, out)
}

@Override
R read(JsonReader reader) throws IOException {
def json = JsonParser.parseReader(reader)
if (json instanceof JsonObject) {
def obj = (JsonObject) json
def versionEl = obj.get(VERSION_FIELD)
if (versionEl == null || versionEl.asString != CURRENT_VERSION) {
throw new JsonParseException("Invalid or missing version")
}
final obj = JsonParser.parseReader(reader)?.getAsJsonObject()
if( obj==null )
throw new JsonParseException("Parsed JSON object is null")
final versionEl = obj.get(VERSION_FIELD)
if (versionEl == null || versionEl.asString != CURRENT_VERSION) {
throw new JsonParseException("Invalid or missing '${VERSION_FIELD}' JSON property")
}
final typeEl = obj.get(getTypeFieldName())
if( typeEl==null )
throw new JsonParseException("JSON property '${getTypeFieldName()}' not found")

// Check if this is the new format (has 'spec' field) or old format (data at root level)
final specEl = obj.get(SPEC_FIELD)?.asJsonObject
if ( specEl != null ) {
// New format: data is wrapped in 'spec' field
specEl.add(getTypeFieldName(), typeEl)
return (R) delegate.fromJsonTree(specEl)
} else {
// Old format: data is at root level, just remove version field
obj.remove(VERSION_FIELD)
return (R) delegate.fromJsonTree(obj)
}
return delegate.fromJsonTree(json)
}
}
}

private static JsonObject addVersion(JsonObject json){
if( json.has(VERSION_FIELD) )
throw new JsonParseException("object already defines a field named ${VERSION_FIELD}")

JsonObject clone = new JsonObject();
clone.addProperty(VERSION_FIELD, CURRENT_VERSION)
for (Map.Entry<String, JsonElement> e : json.entrySet()) {
clone.add(e.getKey(), e.getValue());
}
return clone
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -387,27 +387,27 @@ class LinCommandImplTest extends Specification{
def expectedOutput = '''diff --git 12345 67890
--- 12345
+++ 67890
@@ -1,16 +1,16 @@
{
@@ -2,16 +2,16 @@
"version": "lineage/v1beta1",
"kind": "FileOutput",
- "path": "path/to/file",
+ "path": "path/to/file2",
"checksum": {
- "value": "45372qe",
+ "value": "42472qet",
"algorithm": "nextflow",
"mode": "standard"
},
- "source": "lid://123987/file.bam",
+ "source": "lid://123987/file2.bam",
"workflowRun": "lid://123987/",
"taskRun": null,
- "size": 1234,
+ "size": 1235,
"createdAt": "1970-01-02T10:17:36.789Z",
"modifiedAt": "1970-01-02T10:17:36.789Z",
"labels": null
"spec": {
- "path": "path/to/file",
+ "path": "path/to/file2",
"checksum": {
- "value": "45372qe",
+ "value": "42472qet",
"algorithm": "nextflow",
"mode": "standard"
},
- "source": "lid://123987/file.bam",
+ "source": "lid://123987/file2.bam",
"workflowRun": "lid://123987/",
"taskRun": null,
- "size": 1234,
+ "size": 1235,
"createdAt": "1970-01-02T10:17:36.789Z",
"modifiedAt": "1970-01-02T10:17:36.789Z",
"labels": null
'''

when:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class LinFileSystemProviderTest extends Specification {
def output = data.resolve("output.txt")
output.text = "Hello, World!"
outputMeta.mkdirs()
outputMeta.resolve(".data.json").text = '{"version":"lineage/v1beta1","kind":"FileOutput","path":"'+output.toString()+'"}'
outputMeta.resolve(".data.json").text = '{"version":"lineage/v1beta1","kind":"FileOutput","spec":{"path":"'+output.toString()+'"}}'

Global.session = Mock(Session) { getConfig()>>config }
and:
Expand Down Expand Up @@ -179,7 +179,7 @@ class LinFileSystemProviderTest extends Specification {
def config = [lineage:[store:[location:wdir.toString()]]]
def outputMeta = wdir.resolve("12345")
outputMeta.mkdirs()
outputMeta.resolve(".data.json").text = '{"version":"lineage/v1beta1","kind":"WorkflowRun","sessionId":"session","name":"run_name","params":[{"type":"String","name":"param1","value":"value1"}]}'
outputMeta.resolve(".data.json").text = '{"version":"lineage/v1beta1","kind":"WorkflowRun","spec":{"sessionId":"session","name":"run_name","params":[{"type":"String","name":"param1","value":"value1"}]}}'

Global.session = Mock(Session) { getConfig()>>config }
and:
Expand Down Expand Up @@ -238,7 +238,7 @@ class LinFileSystemProviderTest extends Specification {
def output = data.resolve("output.txt")
output.text = "Hello, World!"
outputMeta.mkdirs()
outputMeta.resolve(".data.json").text = '{"version":"lineage/v1beta1","kind":"FileOutput","path":"'+output.toString()+'"}'
outputMeta.resolve(".data.json").text = '{"version":"lineage/v1beta1","kind":"FileOutput","spec":{"path":"'+output.toString()+'"}}'

Global.session = Mock(Session) { getConfig()>>config }
and:
Expand Down Expand Up @@ -278,8 +278,8 @@ class LinFileSystemProviderTest extends Specification {
output1.resolve('file3.txt').text = 'file3'
wdir.resolve('12345/output1').mkdirs()
wdir.resolve('12345/output2').mkdirs()
wdir.resolve('12345/.data.json').text = '{"version":"lineage/v1beta1","kind":"TaskRun"}'
wdir.resolve('12345/output1/.data.json').text = '{"version":"lineage/v1beta1","kind":"FileOutput", "path": "' + output1.toString() + '"}'
wdir.resolve('12345/.data.json').text = '{"version":"lineage/v1beta1","kind":"TaskRun","spec":{"name":"dummy"}}'
wdir.resolve('12345/output1/.data.json').text = '{"version":"lineage/v1beta1","kind":"FileOutput","spec":{"path": "' + output1.toString() + '"}}'

and:
def config = [lineage:[store:[location:wdir.toString()]]]
Expand Down Expand Up @@ -403,7 +403,7 @@ class LinFileSystemProviderTest extends Specification {
output.resolve('abc').text = 'file1'
output.resolve('.foo').text = 'file2'
wdir.resolve('12345/output').mkdirs()
wdir.resolve('12345/output/.data.json').text = '{"version":"lineage/v1beta1","kind":"FileOutput", "path": "' + output.toString() + '"}'
wdir.resolve('12345/output/.data.json').text = '{"version":"lineage/v1beta1","kind":"FileOutput","spec":{"path": "' + output.toString() + '"}}'
and:
def provider = new LinFileSystemProvider()
def lid1 = provider.getPath(LinPath.asUri('lid://12345/output/abc'))
Expand All @@ -423,7 +423,7 @@ class LinFileSystemProviderTest extends Specification {
def file = data.resolve('abc')
file.text = 'Hello'
wdir.resolve('12345/abc').mkdirs()
wdir.resolve('12345/abc/.data.json').text = '{"version":"lineage/v1beta1","kind":"FileOutput", "path":"' + file.toString() + '"}'
wdir.resolve('12345/abc/.data.json').text = '{"version":"lineage/v1beta1","kind":"FileOutput","spec":{"path":"' + file.toString() + '"}}'
and:
Global.session = Mock(Session) { getConfig()>>config }
and:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ class LinPathTest extends Specification {

wdir.resolve('12345/output1').mkdirs()
wdir.resolve('12345/path/to/file2.txt').mkdirs()
wdir.resolve('12345/.data.json').text = '{"version":"lineage/v1beta1","kind":"TaskRun"}'
wdir.resolve('12345/output1/.data.json').text = '{"version":"lineage/v1beta1","kind":"FileOutput", "path": "' + outputFolder.toString() + '"}'
wdir.resolve('12345/path/to/file2.txt/.data.json').text = '{"version":"lineage/v1beta1","kind":"FileOutput", "path": "' + outputFile.toString() + '"}'
wdir.resolve('12345/.data.json').text = '{"version":"lineage/v1beta1","kind":"TaskRun","spec":{"name":"test"}}'
wdir.resolve('12345/output1/.data.json').text = '{"version":"lineage/v1beta1","kind":"FileOutput","spec":{"path": "' + outputFolder.toString() + '"}}'
wdir.resolve('12345/path/to/file2.txt/.data.json').text = '{"version":"lineage/v1beta1","kind":"FileOutput","spec":{"path": "' + outputFile.toString() + '"}}'
def time = OffsetDateTime.now()
def wfResultsMetadata = new LinEncoder().withPrettyPrint(true).encode(new WorkflowOutput(time, "lid://1234", [new Parameter( "Path", "a", "lid://1234/a.txt")]))
wdir.resolve('5678/').mkdirs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class LinEncoderTest extends Specification{
def encoded = encoder.encode(wfResults)
def object = encoder.decode(encoded)
then:
encoded == '{"version":"lineage/v1beta1","kind":"WorkflowOutput","createdAt":null,"workflowRun":"lid://1234","output":null}'
encoded == '{"version":"lineage/v1beta1","kind":"WorkflowOutput","spec":{"createdAt":null,"workflowRun":"lid://1234","output":null}}'
def result = object as WorkflowOutput
result.createdAt == null

Expand Down
Loading