Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -1075,12 +1075,6 @@ The following settings are available:
- `clientKey`
- `clientKeyFile`

`k8s.clientRefreshInterval`
: :::{versionadded} 26.04.0
:::
: The interval after which the Kubernetes client configuration is refreshed (default: `50m`).
: This setting is useful when the Kubernetes authentication token has a limited lifespan and needs to be periodically refreshed. The client configuration will be automatically reloaded after the specified interval, allowing Nextflow to obtain fresh credentials from the Kubernetes configuration.

`k8s.computeResourceType`
: Whether to use Kubernetes `Pod` or `Job` resource type to carry out Nextflow tasks (default: `Pod`).

Expand Down
1 change: 1 addition & 0 deletions plugins/nf-k8s/changelog.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
nf-k8s changelog
===================
1.5.2 - 20 Apr 2026
- Fix projected service account token expiry by re-reading token file from disk with 30s TTL cache (#6918)
- Bump org.bouncycastle:bcpkix-jdk18on from 1.79 to 1.84 (#7042) [59d847d52]
- Allow running pipeline from URL and main script path (#6602) [83196d4be]

Expand Down
7 changes: 0 additions & 7 deletions plugins/nf-k8s/src/main/nextflow/k8s/K8sConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,6 @@ class K8sConfig implements ConfigScope {
""")
final Map client

@ConfigOption
@Description("""
The interval after which the Kubernetes client configuration is refreshed (default: `50m`).
""")
final Duration clientRefreshInterval

@ConfigOption
@Description("""
The Kubernetes [configuration context](https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/) to use.
Expand Down Expand Up @@ -220,7 +214,6 @@ class K8sConfig implements ConfigScope {
autoMountHostPaths = opts.autoMountHostPaths as boolean
cleanup = opts.cleanup as Boolean
client = opts.client as Map
clientRefreshInterval = opts.clientRefreshInterval as Duration ?: Duration.of('50m')
computeResourceType = opts.computeResourceType
context = opts.context
cpuLimits = opts.cpuLimits as boolean
Expand Down
24 changes: 3 additions & 21 deletions plugins/nf-k8s/src/main/nextflow/k8s/K8sExecutor.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@

package nextflow.k8s

import java.util.concurrent.TimeUnit

import com.google.common.cache.Cache
import com.google.common.cache.CacheBuilder
import groovy.transform.CompileStatic
import groovy.transform.Memoized
import groovy.util.logging.Slf4j
Expand All @@ -44,19 +40,9 @@ import org.pf4j.ExtensionPoint
@ServiceName('k8s')
class K8sExecutor extends Executor implements ExtensionPoint {

/**
* Cache for the Kubernetes HTTP client. The client is refreshed periodically
* so that the service account token is re-read when it expires.
*/
private Cache<String, K8sClient> clientCache
private K8sClient client

/**
* @return The Kubernetes HTTP client. Delegates to a Guava cache that refreshes
* the client (including the service account token) when the configured interval expires.
*/
protected K8sClient getClient() {
clientCache.get('client', () -> new K8sClient(k8sConfig.getClient()))
}
protected K8sClient getClient() { client }

/**
* @return The `k8s` configuration scope in the nextflow configuration object
Expand All @@ -73,11 +59,7 @@ class K8sExecutor extends Executor implements ExtensionPoint {
protected void register() {
super.register()
final k8sConfig = getK8sConfig()
final refreshInterval = k8sConfig.clientRefreshInterval
this.clientCache = CacheBuilder.newBuilder()
.expireAfterWrite(refreshInterval.toMillis(), TimeUnit.MILLISECONDS)
.build()
final client = getClient()
this.client = new K8sClient(k8sConfig.getClient())
log.debug "[K8s] config=$k8sConfig; API client config=$client.config"
}

Expand Down
32 changes: 31 additions & 1 deletion plugins/nf-k8s/src/main/nextflow/k8s/client/ClientConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import groovy.util.logging.Slf4j
import nextflow.util.Duration

import javax.net.ssl.KeyManager
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths

Expand Down Expand Up @@ -49,6 +50,12 @@ class ClientConfig {

String token

/**
* Path to the service account token file. When set, {@link #getBearerToken()} re-reads
* the file periodically so that kubelet-rotated projected tokens are always current.
*/
Path tokenFile

byte[] sslCert

byte[] clientCert
Expand Down Expand Up @@ -76,14 +83,37 @@ class ClientConfig {
*/
boolean isFromCluster

private volatile String cachedToken
private volatile long tokenCachedAt

String getNamespace() { namespace ?: 'default' }

ClientConfig() {
retryConfig = new K8sRetryConfig()
}

String toString() {
"${this.class.getSimpleName()}[ server=$server, namespace=$namespace, serviceAccount=$serviceAccount, token=${cut(token)}, sslCert=${cut(sslCert)}, clientCert=${cut(clientCert)}, clientKey=${cut(clientKey)}, verifySsl=$verifySsl, fromFile=$isFromCluster, httpReadTimeout=$httpReadTimeout, httpConnectTimeout=$httpConnectTimeout, retryConfig=$retryConfig ]"
"${this.class.getSimpleName()}[ server=$server, namespace=$namespace, serviceAccount=$serviceAccount, token=${cut(token)}, tokenFile=$tokenFile, sslCert=${cut(sslCert)}, clientCert=${cut(clientCert)}, clientKey=${cut(clientKey)}, verifySsl=$verifySsl, fromFile=$isFromCluster, httpReadTimeout=$httpReadTimeout, httpConnectTimeout=$httpConnectTimeout, retryConfig=$retryConfig ]"
}

/**
* @return The current bearer token, re-read from {@link #tokenFile} periodically.
* Falls back to the last known-good value on read failure.
*/
String getBearerToken() {
if( !tokenFile )
return token
final now = System.currentTimeMillis()
if( cachedToken == null || now - tokenCachedAt >= 30_000 ) {
try {
cachedToken = Files.readString(tokenFile).trim()
tokenCachedAt = now
}
catch( Exception e ) {
log.warn "Failed to read Kubernetes service account token from $tokenFile -- ${e.message}"
}
}
return cachedToken
}

private String cut(String str) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ class ConfigDiscovery {
final server = formatHostName(host, port)

final cert = path('/var/run/secrets/kubernetes.io/serviceaccount/ca.crt').bytes
final token = path('/var/run/secrets/kubernetes.io/serviceaccount/token').text
final tokenFile = path('/var/run/secrets/kubernetes.io/serviceaccount/token')
final namespace = path('/var/run/secrets/kubernetes.io/serviceaccount/namespace').text

return new ClientConfig(
server: server,
token: token,
tokenFile: tokenFile,
namespace: cfgNamespace ?: namespace,
serviceAccount: serviceAccount,
sslCert: cert,
Expand Down
5 changes: 3 additions & 2 deletions plugins/nf-k8s/src/main/nextflow/k8s/client/K8sClient.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -642,8 +642,9 @@ class K8sClient {
final prefix = config.server.contains("://") ? config.server : "https://$config.server"
final conn = createConnection0(prefix + path)
conn.setRequestProperty("Content-Type", "application/json")
if( config.token ) {
conn.setRequestProperty("Authorization", "Bearer $config.token")
final bearerToken = config.getBearerToken()
if( bearerToken ) {
conn.setRequestProperty("Authorization", "Bearer $bearerToken")
}

if( conn instanceof HttpsURLConnection ) {
Expand Down
17 changes: 0 additions & 17 deletions plugins/nf-k8s/src/test/nextflow/k8s/K8sConfigTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -467,21 +467,4 @@ class K8sConfigTest extends Specification {
cfg.fetchNodeName() == false
}

def 'should set clientRefreshInterval' () {
when:
def cfg = new K8sConfig()
then:
cfg.clientRefreshInterval == Duration.of('50m')

when:
cfg = new K8sConfig(clientRefreshInterval: '30m')
then:
cfg.clientRefreshInterval == Duration.of('30m')

when:
cfg = new K8sConfig(clientRefreshInterval: '1h')
then:
cfg.clientRefreshInterval == Duration.of('1h')
}

}
32 changes: 7 additions & 25 deletions plugins/nf-k8s/src/test/nextflow/k8s/K8sExecutorTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@

package nextflow.k8s

import java.util.concurrent.TimeUnit

import com.google.common.cache.CacheBuilder
import nextflow.k8s.client.ClientConfig
import nextflow.k8s.client.K8sClient
import spock.lang.Specification

Expand All @@ -28,39 +24,25 @@ import spock.lang.Specification
*/
class K8sExecutorTest extends Specification {

def 'should cache k8s client and refresh after expiration' () {
def 'should return the same k8s client instance on repeated calls' () {
given:
def CONFIG = new K8sConfig(
client: [server: 'http://k8s-server'],
namespace: 'test-ns',
serviceAccount: 'test-sa',
clientRefreshInterval: '100ms'
serviceAccount: 'test-sa'
)
and:
def executor = Spy(K8sExecutor)
executor.getK8sConfig() >> CONFIG
// use a short-lived cache for the test
executor.@clientCache = CacheBuilder.newBuilder()
.expireAfterWrite(100, TimeUnit.MILLISECONDS)
.build()
executor.@client = new K8sClient(CONFIG.getClient())

when: 'first call to getClient'
when:
def client1 = executor.getClient()
then: 'a new K8sClient is created'
client1 instanceof K8sClient
client1.config.server == 'http://k8s-server'

when: 'second call within cache interval'
def client2 = executor.getClient()
then: 'returns the same cached instance'
client2.is(client1)

when: 'call after cache expiration'
sleep(150)
def client3 = executor.getClient()
then: 'a new K8sClient instance is created'
client3 instanceof K8sClient
!client3.is(client1)
then:
client1 instanceof K8sClient
client2.is(client1)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,9 @@ class ConfigDiscoveryTest extends Specification {
and:
config.server == 'foo.com:4343'
config.namespace == 'foo-namespace'
config.token == 'my-token'
config.token == null
config.tokenFile == TOKEN_FILE
config.getBearerToken() == 'my-token'
config.sslCert == CERT_FILE.text.bytes
config.isFromCluster

Expand All @@ -400,6 +402,7 @@ class ConfigDiscoveryTest extends Specification {
and:
config.server == 'https://host.com'
config.namespace == 'my-namespace'
config.tokenFile == TOKEN_FILE
}

def 'should create key managers' () {
Expand Down
47 changes: 47 additions & 0 deletions plugins/nf-k8s/src/test/nextflow/k8s/client/K8sClientTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package nextflow.k8s.client

import java.nio.file.Files

import nextflow.exception.K8sOutOfCpuException
import nextflow.exception.K8sOutOfMemoryException

Expand Down Expand Up @@ -68,6 +70,51 @@ class K8sClientTest extends Specification {

}

def 'should cache token from file and refresh after TTL' () {

given:
def tokenFile = Files.createTempFile('k8s-token', null)
tokenFile.text = 'token-v1'
def config = new ClientConfig(server: 'host.com:443', tokenFile: tokenFile)

when: 'first read primes the cache'
def t1 = config.getBearerToken()
then:
t1 == 'token-v1'

when: 'token file rotated but TTL not yet expired — cached value returned'
tokenFile.text = 'token-v2'
def t2 = config.getBearerToken()
then:
t2 == 'token-v1'

when: 'TTL expires — next read picks up the rotated token'
config.@tokenCachedAt = System.currentTimeMillis() - 31_000
def t3 = config.getBearerToken()
then:
t3 == 'token-v2'

cleanup:
Files.deleteIfExists(tokenFile)
}

def 'should return last good token when file read fails' () {

given:
def tokenFile = Files.createTempFile('k8s-token', null)
tokenFile.text = 'good-token'
def config = new ClientConfig(server: 'host.com:443', tokenFile: tokenFile)
config.getBearerToken() // prime the cache

when: 'file is deleted (simulates a transient read error) and TTL expires'
Files.delete(tokenFile)
config.@tokenCachedAt = System.currentTimeMillis() - 31_000
def result = config.getBearerToken()

then: 'last known-good token is returned rather than null'
result == 'good-token'
}

def 'should make a get request' () {

given:
Expand Down
Loading