From 0624e5aac489076d879549bc8d6afe9fcbf5373a Mon Sep 17 00:00:00 2001 From: Stephen Kazakoff Date: Thu, 30 Apr 2026 23:38:27 +1000 Subject: [PATCH] Prevent TaskPath leakage from typed record outputs Signed-off-by: Stephen Kazakoff --- .../processor/TaskOutputResolver.groovy | 74 +++++++++++++++++++ .../nextflow/processor/TaskProcessor.groovy | 4 +- .../processor/TaskOutputResolverTest.groovy | 50 +++++++++++++ tests/checks/.IGNORE-PARSER-V2 | 1 + .../typed-record-output-taskpath.nf/.checks | 20 +++++ tests/typed-record-output-taskpath.nf | 63 ++++++++++++++++ tests/typed-record-output-taskpath/input.txt | 1 + 7 files changed, 211 insertions(+), 2 deletions(-) create mode 100644 tests/checks/typed-record-output-taskpath.nf/.checks create mode 100644 tests/typed-record-output-taskpath.nf create mode 100644 tests/typed-record-output-taskpath/input.txt diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskOutputResolver.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskOutputResolver.groovy index ed93bbdb7b..65b3033cc8 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskOutputResolver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskOutputResolver.groovy @@ -16,6 +16,8 @@ package nextflow.processor +import java.io.UncheckedIOException +import java.lang.reflect.Modifier import java.nio.file.Path import groovy.transform.CompileDynamic @@ -25,7 +27,10 @@ import groovy.util.logging.Slf4j import nextflow.exception.IllegalArityException import nextflow.exception.MissingFileException import nextflow.exception.MissingValueException +import nextflow.extension.Bolts import nextflow.script.params.v2.ProcessFileOutput +import nextflow.script.types.Record +import nextflow.util.RecordMap import org.codehaus.groovy.runtime.InvokerHelper /** * Implements the resolution of task outputs. @@ -49,6 +54,75 @@ class TaskOutputResolver implements Map { this.delegate = task.context } + /** + * Resolve and normalize an output expression before it is emitted. + * + * Values from the task context may contain {@link TaskPath}, which is a + * task-local view of an input file. It is valid for script interpolation, + * but it must not escape through output channels because downstream tasks + * need durable source/work-directory paths for hashing and staging. + * + * @param value + * A lazy output expression, such as a closure or GString + * @return + * The resolved output value with nested TaskPath instances converted + * back to durable Path values + */ + Object resolveOutput(Object value) { + return normalizeOutputValue(Bolts.resolveLazy(this, value)) + } + + static Object normalizeOutputValue(Object value) { + if( value instanceof TaskPath ) { + try { + return value.toRealPath() + } + catch( IOException e ) { + throw new UncheckedIOException(e) + } + } + + if( value instanceof RecordMap ) { + final normalized = new LinkedHashMap() + for( final entry : value.entrySet() ) + normalized.put(entry.key, normalizeOutputValue(entry.value)) + return new RecordMap(normalized) + } + + if( value instanceof Map ) { + final normalized = new LinkedHashMap() + for( final entry : value.entrySet() ) + normalized.put(entry.key, normalizeOutputValue(entry.value)) + return normalized + } + + if( value instanceof Set ) { + final normalized = new LinkedHashSet() + for( final item : value ) + normalized.add(normalizeOutputValue(item)) + return normalized + } + + if( value instanceof Collection ) { + final normalized = new ArrayList() + for( final item : value ) + normalized.add(normalizeOutputValue(item)) + return normalized + } + + if( value instanceof Record ) { + final fields = value.getClass().getFields() + .findAll { field -> !Modifier.isStatic(field.modifiers) && !field.synthetic } + .sort { it.name } + final normalized = new LinkedHashMap() + for( final field : fields ) + normalized.put(field.name, normalizeOutputValue(field.get(value))) + return new RecordMap(normalized) + } + + return value + } + /** * Get an environment variable from the task environment. * diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index 0fe47e782c..99c8ba818e 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -1421,12 +1421,12 @@ class TaskProcessor { final resolver = new TaskOutputResolver(declaredOutputs.getFiles(), task) for( final param : declaredOutputs.getParams() ) { - final value = resolver.resolveLazy(param.getLazyValue()) + final value = resolver.resolveOutput(param.getLazyValue()) task.setOutput(param, value) } for( final topic : declaredOutputs.getTopics() ) { - final value = resolver.resolveLazy(topic.getLazyValue()) + final value = resolver.resolveOutput(topic.getLazyValue()) topic.getChannel().bind(value) } diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskOutputResolverTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskOutputResolverTest.groovy index d4cc2ed5a0..14937e8d76 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskOutputResolverTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskOutputResolverTest.groovy @@ -22,6 +22,7 @@ import nextflow.exception.IllegalArityException import nextflow.exception.MissingFileException import nextflow.exception.MissingValueException import nextflow.script.params.v2.ProcessFileOutput +import nextflow.util.RecordMap import spock.lang.Specification import spock.lang.TempDir @@ -30,6 +31,12 @@ import spock.lang.TempDir */ class TaskOutputResolverTest extends Specification { + static class SampleRecord implements nextflow.script.types.Record { + public String id + public Path path + public List paths + } + @TempDir Path tempDir @@ -290,4 +297,47 @@ class TaskOutputResolverTest extends Specification { def e = thrown(MissingValueException) e.message.contains('Missing variable in process output') } + + def 'should normalize task paths in output values'() { + given: + def source = tempDir.resolve('input.txt') + def taskPath = new TaskPath(source, 'input.txt') + def record = new SampleRecord(id: 'alpha', path: taskPath, paths: [taskPath]) + def value = new RecordMap([ + id: 'alpha', + path: taskPath, + nested: [path: taskPath], + paths: [taskPath] as Set, + record: record, + ]) + + when: + def result = TaskOutputResolver.normalizeOutputValue(value) + + then: + result instanceof RecordMap + result.path == source + !(result.path instanceof TaskPath) + result.nested.path == source + !(result.nested.path instanceof TaskPath) + result.paths == [source] as Set + result.record instanceof RecordMap + result.record.path == source + result.record.paths == [source] + } + + def 'should normalize resolved lazy output values'() { + given: + def source = tempDir.resolve('input.txt') + def task = makeTask([sample: new RecordMap(path: new TaskPath(source, 'input.txt'))]) + def resolver = new TaskOutputResolver([:], task) + + when: + def result = resolver.resolveOutput({ -> sample }) + + then: + result instanceof RecordMap + result.path == source + !(result.path instanceof TaskPath) + } } diff --git a/tests/checks/.IGNORE-PARSER-V2 b/tests/checks/.IGNORE-PARSER-V2 index 840c9b9c9f..2bfa89ed16 100644 --- a/tests/checks/.IGNORE-PARSER-V2 +++ b/tests/checks/.IGNORE-PARSER-V2 @@ -9,6 +9,7 @@ params-dsl.nf record-types.nf records.nf task-ext-block.nf +typed-record-output-taskpath.nf topic-channel-typed.nf type-annotations.nf workflow-oncomplete-v2.nf \ No newline at end of file diff --git a/tests/checks/typed-record-output-taskpath.nf/.checks b/tests/checks/typed-record-output-taskpath.nf/.checks new file mode 100644 index 0000000000..b52155d7e1 --- /dev/null +++ b/tests/checks/typed-record-output-taskpath.nf/.checks @@ -0,0 +1,20 @@ +set -e + +export NXF_SYNTAX_PARSER=v2 + +$NXF_RUN | tee stdout +[[ $(grep INFO .nextflow.log | grep -c 'Submitted process > P1') == 1 ]] +[[ $(grep INFO .nextflow.log | grep -c 'Submitted process > P2') == 1 ]] +grep 'test1' stdout +grep 'test2' stdout + +TASK_DIR=$($NXF_CMD log last -F "process == 'P2'") +grep "cat 'input.txt' 'test2.txt' > combined.txt" "$TASK_DIR/.command.sh" +test -e "$TASK_DIR/input.txt" +test -e "$TASK_DIR/test2.txt" + +$NXF_RUN -resume | tee stdout +[[ $(grep INFO .nextflow.log | grep -c 'Cached process > P1') == 1 ]] +[[ $(grep INFO .nextflow.log | grep -c 'Cached process > P2') == 1 ]] +grep 'test1' stdout +grep 'test2' stdout diff --git a/tests/typed-record-output-taskpath.nf b/tests/typed-record-output-taskpath.nf new file mode 100644 index 0000000000..a126899722 --- /dev/null +++ b/tests/typed-record-output-taskpath.nf @@ -0,0 +1,63 @@ +#!/usr/bin/env nextflow +/* + * Copyright 2013-2026, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +nextflow.enable.types = true + +workflow { + sample = record( + id: 'alpha', + file: file("${projectDir}/typed-record-output-taskpath/input.txt") + ) + + result = P2(P1(channel.of(sample))) + result.view { it -> it.text.trim() } +} + +process P1 { + input: + sample: SampleRecord + + output: + sample + record(file2: file('test2.txt')) + + script: + """ + echo 'test2' > test2.txt + """ +} + +process P2 { + input: + sample: SampleRecord + + output: + file('combined.txt') + + script: + """ + test '${sample.file}' = 'input.txt' + test '${sample.file2}' = 'test2.txt' + + cat '${sample.file}' '${sample.file2}' > combined.txt + """ +} + +record SampleRecord { + id: String + file: Path + file2: Path? +} diff --git a/tests/typed-record-output-taskpath/input.txt b/tests/typed-record-output-taskpath/input.txt new file mode 100644 index 0000000000..a5bce3fd25 --- /dev/null +++ b/tests/typed-record-output-taskpath/input.txt @@ -0,0 +1 @@ +test1