Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package nextflow.processor

import java.io.UncheckedIOException
import java.lang.reflect.Modifier
import java.nio.file.Path

import groovy.transform.CompileDynamic
Expand All @@ -25,7 +27,10 @@ import groovy.util.logging.Slf4j
import nextflow.exception.IllegalArityException
import nextflow.exception.MissingFileException
import nextflow.exception.MissingValueException
import nextflow.extension.Bolts
import nextflow.script.params.v2.ProcessFileOutput
import nextflow.script.types.Record
import nextflow.util.RecordMap
import org.codehaus.groovy.runtime.InvokerHelper
/**
* Implements the resolution of task outputs.
Expand All @@ -49,6 +54,75 @@ class TaskOutputResolver implements Map<String,Object> {
this.delegate = task.context
}

/**
* Resolve and normalize an output expression before it is emitted.
*
* Values from the task context may contain {@link TaskPath}, which is a
* task-local view of an input file. It is valid for script interpolation,
* but it must not escape through output channels because downstream tasks
* need durable source/work-directory paths for hashing and staging.
*
* @param value
* A lazy output expression, such as a closure or GString
* @return
* The resolved output value with nested TaskPath instances converted
* back to durable Path values
*/
Object resolveOutput(Object value) {
return normalizeOutputValue(Bolts.resolveLazy(this, value))
}

static Object normalizeOutputValue(Object value) {
if( value instanceof TaskPath ) {
try {
return value.toRealPath()
}
catch( IOException e ) {
throw new UncheckedIOException(e)
}
}

if( value instanceof RecordMap ) {
final normalized = new LinkedHashMap<String,Object>()
for( final entry : value.entrySet() )
normalized.put(entry.key, normalizeOutputValue(entry.value))
return new RecordMap(normalized)
}

if( value instanceof Map ) {
final normalized = new LinkedHashMap<Object,Object>()
for( final entry : value.entrySet() )
normalized.put(entry.key, normalizeOutputValue(entry.value))
return normalized
}

if( value instanceof Set ) {
final normalized = new LinkedHashSet<Object>()
for( final item : value )
normalized.add(normalizeOutputValue(item))
return normalized
}

if( value instanceof Collection ) {
final normalized = new ArrayList<Object>()
for( final item : value )
normalized.add(normalizeOutputValue(item))
return normalized
}

if( value instanceof Record ) {
final fields = value.getClass().getFields()
.findAll { field -> !Modifier.isStatic(field.modifiers) && !field.synthetic }
.sort { it.name }
final normalized = new LinkedHashMap<String,Object>()
for( final field : fields )
normalized.put(field.name, normalizeOutputValue(field.get(value)))
return new RecordMap(normalized)
}

return value
}

/**
* Get an environment variable from the task environment.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1421,12 +1421,12 @@ class TaskProcessor {
final resolver = new TaskOutputResolver(declaredOutputs.getFiles(), task)

for( final param : declaredOutputs.getParams() ) {
final value = resolver.resolveLazy(param.getLazyValue())
final value = resolver.resolveOutput(param.getLazyValue())
task.setOutput(param, value)
}

for( final topic : declaredOutputs.getTopics() ) {
final value = resolver.resolveLazy(topic.getLazyValue())
final value = resolver.resolveOutput(topic.getLazyValue())
topic.getChannel().bind(value)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import nextflow.exception.IllegalArityException
import nextflow.exception.MissingFileException
import nextflow.exception.MissingValueException
import nextflow.script.params.v2.ProcessFileOutput
import nextflow.util.RecordMap
import spock.lang.Specification
import spock.lang.TempDir

Expand All @@ -30,6 +31,12 @@ import spock.lang.TempDir
*/
class TaskOutputResolverTest extends Specification {

static class SampleRecord implements nextflow.script.types.Record {
public String id
public Path path
public List<Path> paths
}

@TempDir
Path tempDir

Expand Down Expand Up @@ -290,4 +297,47 @@ class TaskOutputResolverTest extends Specification {
def e = thrown(MissingValueException)
e.message.contains('Missing variable in process output')
}

def 'should normalize task paths in output values'() {
given:
def source = tempDir.resolve('input.txt')
def taskPath = new TaskPath(source, 'input.txt')
def record = new SampleRecord(id: 'alpha', path: taskPath, paths: [taskPath])
def value = new RecordMap([
id: 'alpha',
path: taskPath,
nested: [path: taskPath],
paths: [taskPath] as Set,
record: record,
])

when:
def result = TaskOutputResolver.normalizeOutputValue(value)

then:
result instanceof RecordMap
result.path == source
!(result.path instanceof TaskPath)
result.nested.path == source
!(result.nested.path instanceof TaskPath)
result.paths == [source] as Set
result.record instanceof RecordMap
result.record.path == source
result.record.paths == [source]
}

def 'should normalize resolved lazy output values'() {
given:
def source = tempDir.resolve('input.txt')
def task = makeTask([sample: new RecordMap(path: new TaskPath(source, 'input.txt'))])
def resolver = new TaskOutputResolver([:], task)

when:
def result = resolver.resolveOutput({ -> sample })

then:
result instanceof RecordMap
result.path == source
!(result.path instanceof TaskPath)
}
}
1 change: 1 addition & 0 deletions tests/checks/.IGNORE-PARSER-V2
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ params-dsl.nf
record-types.nf
records.nf
task-ext-block.nf
typed-record-output-taskpath.nf
topic-channel-typed.nf
type-annotations.nf
workflow-oncomplete-v2.nf
20 changes: 20 additions & 0 deletions tests/checks/typed-record-output-taskpath.nf/.checks
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
set -e

export NXF_SYNTAX_PARSER=v2

$NXF_RUN | tee stdout
[[ $(grep INFO .nextflow.log | grep -c 'Submitted process > P1') == 1 ]]
[[ $(grep INFO .nextflow.log | grep -c 'Submitted process > P2') == 1 ]]
grep 'test1' stdout
grep 'test2' stdout

TASK_DIR=$($NXF_CMD log last -F "process == 'P2'")
grep "cat 'input.txt' 'test2.txt' > combined.txt" "$TASK_DIR/.command.sh"
test -e "$TASK_DIR/input.txt"
test -e "$TASK_DIR/test2.txt"

$NXF_RUN -resume | tee stdout
[[ $(grep INFO .nextflow.log | grep -c 'Cached process > P1') == 1 ]]
[[ $(grep INFO .nextflow.log | grep -c 'Cached process > P2') == 1 ]]
grep 'test1' stdout
grep 'test2' stdout
63 changes: 63 additions & 0 deletions tests/typed-record-output-taskpath.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/usr/bin/env nextflow
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

nextflow.enable.types = true

workflow {
sample = record(
id: 'alpha',
file: file("${projectDir}/typed-record-output-taskpath/input.txt")
)

result = P2(P1(channel.of(sample)))
result.view { it -> it.text.trim() }
}

process P1 {
input:
sample: SampleRecord

output:
sample + record(file2: file('test2.txt'))

script:
"""
echo 'test2' > test2.txt
"""
}

process P2 {
input:
sample: SampleRecord

output:
file('combined.txt')

script:
"""
test '${sample.file}' = 'input.txt'
test '${sample.file2}' = 'test2.txt'

cat '${sample.file}' '${sample.file2}' > combined.txt
"""
}

record SampleRecord {
id: String
file: Path
file2: Path?
}
1 change: 1 addition & 0 deletions tests/typed-record-output-taskpath/input.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
test1
Loading