From 66e13e5f51b0ef02d4d36ecee8545e7e14ef86a7 Mon Sep 17 00:00:00 2001 From: kvtodev Date: Sat, 14 Feb 2026 15:18:57 +0800 Subject: [PATCH 01/14] Add Ktor HTTP data source module Introduce a new lib-datasource-ktor module similar to `libraries/datasource_okhttp`. Providing KtorDataSource as an alternative HTTP data source implementation using Ktor client. --- api.txt | 13 + constants.gradle | 1 + core_settings.gradle | 2 + libraries/datasource_ktor/README.md | 64 +++ libraries/datasource_ktor/build.gradle | 41 ++ libraries/datasource_ktor/proguard-rules.txt | 11 + .../src/main/AndroidManifest.xml | 4 + .../media3/datasource/ktor/KtorDataSource.kt | 519 ++++++++++++++++++ .../media3/datasource/ktor/package-info.java | 4 + .../src/test/AndroidManifest.xml | 4 + .../ktor/KtorDataSourceContractTest.kt | 37 ++ .../datasource/ktor/KtorDataSourceTest.kt | 153 ++++++ 12 files changed, 853 insertions(+) create mode 100644 libraries/datasource_ktor/README.md create mode 100644 libraries/datasource_ktor/build.gradle create mode 100644 libraries/datasource_ktor/proguard-rules.txt create mode 100644 libraries/datasource_ktor/src/main/AndroidManifest.xml create mode 100644 libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt create mode 100644 libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/package-info.java create mode 100644 libraries/datasource_ktor/src/test/AndroidManifest.xml create mode 100644 libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt create mode 100644 libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt diff --git a/api.txt b/api.txt index d5c9a173af1..142dd386e4f 100644 --- a/api.txt +++ b/api.txt @@ -1414,6 +1414,19 @@ package androidx.media3.datasource.okhttp { } +package androidx.media3.datasource.ktor { + + public class KtorDataSource implements androidx.media3.datasource.DataSource androidx.media3.datasource.HttpDataSource { + } + + public static final class KtorDataSource.Factory implements androidx.media3.datasource.HttpDataSource.Factory { + ctor public KtorDataSource.Factory(io.ktor.client.HttpClient); + ctor public KtorDataSource.Factory(io.ktor.client.HttpClient, kotlinx.coroutines.CoroutineScope); + method public androidx.media3.datasource.ktor.KtorDataSource.Factory setUserAgent(@Nullable String); + } + +} + package androidx.media3.exoplayer { public final class ExoPlaybackException extends androidx.media3.common.PlaybackException { diff --git a/constants.gradle b/constants.gradle index 40513baf94d..c37b4d9a35c 100644 --- a/constants.gradle +++ b/constants.gradle @@ -65,6 +65,7 @@ project.ext { desugarJdkLibsVersion = '2.1.5' lottieVersion = '6.6.0' truthVersion = '1.4.0' + ktorVersion = '3.0.3' okhttpVersion = '4.12.0' testParameterInjectorVersion = '1.18' modulePrefix = ':' diff --git a/core_settings.gradle b/core_settings.gradle index 73f3c542f30..b4108e0216b 100644 --- a/core_settings.gradle +++ b/core_settings.gradle @@ -68,6 +68,8 @@ include modulePrefix + 'lib-datasource-rtmp' project(modulePrefix + 'lib-datasource-rtmp').projectDir = new File(rootDir, 'libraries/datasource_rtmp') include modulePrefix + 'lib-datasource-okhttp' project(modulePrefix + 'lib-datasource-okhttp').projectDir = new File(rootDir, 'libraries/datasource_okhttp') +include modulePrefix + 'lib-datasource-ktor' +project(modulePrefix + 'lib-datasource-ktor').projectDir = new File(rootDir, 'libraries/datasource_ktor') include modulePrefix + 'lib-decoder' project(modulePrefix + 'lib-decoder').projectDir = new File(rootDir, 'libraries/decoder') diff --git a/libraries/datasource_ktor/README.md b/libraries/datasource_ktor/README.md new file mode 100644 index 00000000000..965f8465bb3 --- /dev/null +++ b/libraries/datasource_ktor/README.md @@ -0,0 +1,64 @@ +# Ktor DataSource module + +This module provides an [HttpDataSource][] implementation that uses [Ktor][]. + +Ktor is a multiplatform HTTP client developed by JetBrains. It supports HTTP/2, +WebSocket, and Kotlin coroutines for asynchronous operations. + +[HttpDataSource]: ../datasource/src/main/java/androidx/media3/datasource/HttpDataSource.java +[Ktor]: https://ktor.io/ + +## Getting the module + +The easiest way to get the module is to add it as a gradle dependency: + +```groovy +implementation 'androidx.media3:media3-datasource-ktor:1.X.X' +``` + +where `1.X.X` is the version, which must match the version of the other media +modules being used. + +Alternatively, you can clone this GitHub project and depend on the module +locally. Instructions for doing this can be found in the [top level README][]. + +[top level README]: ../../README.md + +## Using the module + +Media components request data through `DataSource` instances. These instances +are obtained from instances of `DataSource.Factory`, which are instantiated and +injected from application code. + +If your application only needs to play http(s) content, using the Ktor +extension is as simple as updating any `DataSource.Factory` instantiations in +your application code to use `KtorDataSource.Factory`. If your application +also needs to play non-http(s) content such as local files, use: +``` +new DefaultDataSourceFactory( + ... + /* baseDataSourceFactory= */ new KtorDataSource.Factory(...)); +``` + +### Using with OkHttp engine + +```kotlin +val dataSourceFactory = KtorDataSource.Factory(OkHttp.create()) +``` + +### Using with a custom HttpClient + +```kotlin +val httpClient = HttpClient(OkHttp) { + engine { + // Configure OkHttp engine + } +} +val dataSourceFactory = KtorDataSource.Factory(httpClient) +``` + +## Links + +* [Javadoc][] + +[Javadoc]: https://developer.android.com/reference/androidx/media3/datasource/ktor/package-summary diff --git a/libraries/datasource_ktor/build.gradle b/libraries/datasource_ktor/build.gradle new file mode 100644 index 00000000000..582de699a39 --- /dev/null +++ b/libraries/datasource_ktor/build.gradle @@ -0,0 +1,41 @@ +apply from: "$gradle.ext.androidxMediaSettingsDir/common_library_config.gradle" + +apply plugin: 'kotlin-android' + +android { + namespace 'androidx.media3.datasource.ktor' + + defaultConfig.minSdkVersion project.ext.minSdkVersion + + publishing { + singleVariant('release') { + withSourcesJar() + } + } + + kotlinOptions { + jvmTarget = '1.8' + } +} + +dependencies { + api project(modulePrefix + 'lib-common') + api project(modulePrefix + 'lib-datasource') + implementation 'androidx.annotation:annotation:' + androidxAnnotationVersion + compileOnly 'com.google.errorprone:error_prone_annotations:' + errorProneVersion + compileOnly 'org.checkerframework:checker-qual:' + checkerframeworkVersion + compileOnly 'org.jetbrains.kotlin:kotlin-annotations-jvm:' + kotlinAnnotationsVersion + implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:' + kotlinxCoroutinesVersion + testImplementation project(modulePrefix + 'test-utils') + testImplementation 'com.squareup.okhttp3:mockwebserver:' + okhttpVersion + testImplementation 'org.robolectric:robolectric:' + robolectricVersion + testImplementation 'io.ktor:ktor-client-okhttp:' + ktorVersion + api 'io.ktor:ktor-client-core:' + ktorVersion +} + +ext { + releaseArtifactId = 'media3-datasource-ktor' + releaseName = 'Media3 Ktor DataSource module' + +} +apply from: '../../publish.gradle' diff --git a/libraries/datasource_ktor/proguard-rules.txt b/libraries/datasource_ktor/proguard-rules.txt new file mode 100644 index 00000000000..feb77bb649d --- /dev/null +++ b/libraries/datasource_ktor/proguard-rules.txt @@ -0,0 +1,11 @@ +# Proguard rules specific to the Ktor extension. + +# Options for Ktor and Okio +-dontwarn io.ktor.** +-dontwarn okio.** +-dontwarn javax.annotation.** +-dontwarn org.conscrypt.** + +# Keep Ktor client classes +-keep class io.ktor.** { *; } +-keep class kotlinx.coroutines.** { *; } diff --git a/libraries/datasource_ktor/src/main/AndroidManifest.xml b/libraries/datasource_ktor/src/main/AndroidManifest.xml new file mode 100644 index 00000000000..499171ca5de --- /dev/null +++ b/libraries/datasource_ktor/src/main/AndroidManifest.xml @@ -0,0 +1,4 @@ + + + + diff --git a/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt b/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt new file mode 100644 index 00000000000..03339f77841 --- /dev/null +++ b/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt @@ -0,0 +1,519 @@ +package androidx.media3.datasource.ktor + +import android.net.Uri +import androidx.media3.common.C +import androidx.media3.common.MediaLibraryInfo +import androidx.media3.common.PlaybackException +import androidx.media3.common.util.UnstableApi +import androidx.media3.common.util.Util +import androidx.media3.datasource.BaseDataSource +import androidx.media3.datasource.DataSourceException +import androidx.media3.datasource.DataSpec +import androidx.media3.datasource.HttpDataSource +import androidx.media3.datasource.HttpUtil +import androidx.media3.datasource.TransferListener +import com.google.common.base.Predicate +import com.google.common.net.HttpHeaders +import io.ktor.client.HttpClient +import io.ktor.client.request.headers +import io.ktor.client.request.prepareRequest +import io.ktor.client.request.setBody +import io.ktor.client.request.url +import io.ktor.client.statement.HttpResponse +import io.ktor.client.statement.bodyAsChannel +import io.ktor.client.statement.request +import io.ktor.http.HttpMethod +import io.ktor.http.contentLength +import io.ktor.http.contentType +import io.ktor.utils.io.jvm.javaio.toInputStream +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.launch +import java.io.IOException +import java.io.InterruptedIOException +import java.util.TreeMap +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicReference +import kotlin.math.min + +/** + * An [HttpDataSource] that delegates to Ktor's [HttpClient]. + * + * Note: HTTP request headers will be set using all parameters passed via (in order of decreasing + * priority) the `dataSpec`, [setRequestProperty] and the default parameters used to construct + * the instance. + */ +class KtorDataSource private constructor( + private val httpClient: HttpClient, + private val coroutineScope: CoroutineScope, + private val userAgent: String?, + private val cacheControl: String?, + private val defaultRequestProperties: HttpDataSource.RequestProperties?, + private val contentTypePredicate: Predicate?, + private val requestProperties: HttpDataSource.RequestProperties +) : BaseDataSource(true), HttpDataSource { + + companion object { + private const val TAG = "KtorDataSource" + + init { + MediaLibraryInfo.registerModule("media3.datasource.ktor") + } + } + + /** + * [androidx.media3.datasource.DataSource.Factory] for [KtorDataSource] instances. + * + * @param httpClient A [HttpClient] for use by the sources created by the factory. + * @param scope A [CoroutineScope] for running suspend functions. If not provided, a default + * scope with [Dispatchers.IO] and a [SupervisorJob] will be created. + */ + class Factory( + private val httpClient: HttpClient, + private val scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) + ) : HttpDataSource.Factory { + + private val defaultRequestProperties = HttpDataSource.RequestProperties() + + private var userAgent: String? = null + + private var cacheControl: String? = null + + private var transferListener: TransferListener? = null + + private var contentTypePredicate: Predicate? = null + + @UnstableApi + override fun setDefaultRequestProperties(defaultRequestProperties: Map): Factory { + this.defaultRequestProperties.clearAndSet(defaultRequestProperties) + return this + } + + /** + * Sets the user agent that will be used. + * + * The default is `null`, which causes the default user agent of the underlying [HttpClient] + * to be used. + * + * @param userAgent The user agent that will be used, or `null` to use the default user + * agent of the underlying [HttpClient]. + * @return This factory. + */ + fun setUserAgent(userAgent: String?): Factory { + this.userAgent = userAgent + return this + } + + /** + * Sets the Cache-Control header that will be used. + * + * The default is `null`. + * + * @param cacheControl The cache control header value that will be used, or `null` to clear + * a previously set value. + * @return This factory. + */ + @UnstableApi + fun setCacheControl(cacheControl: String?): Factory { + this.cacheControl = cacheControl + return this + } + + /** + * Sets a content type [Predicate]. If a content type is rejected by the predicate then a + * [HttpDataSource.InvalidContentTypeException] is thrown from [KtorDataSource.open]. + * + * The default is `null`. + * + * @param contentTypePredicate The content type [Predicate], or `null` to clear a predicate + * that was previously set. + * @return This factory. + */ + @UnstableApi + fun setContentTypePredicate(contentTypePredicate: Predicate?): Factory { + this.contentTypePredicate = contentTypePredicate + return this + } + + /** + * Sets the [TransferListener] that will be used. + * + * The default is `null`. + * + * See [androidx.media3.datasource.DataSource.addTransferListener]. + * + * @param transferListener The listener that will be used. + * @return This factory. + */ + @UnstableApi + fun setTransferListener(transferListener: TransferListener?): Factory { + this.transferListener = transferListener + return this + } + + @UnstableApi + override fun createDataSource(): KtorDataSource { + val client = httpClient + val dataSource = KtorDataSource( + client, + scope, + userAgent, + cacheControl, + defaultRequestProperties, + contentTypePredicate, + HttpDataSource.RequestProperties() + ) + transferListener?.let { dataSource.addTransferListener(it) } + return dataSource + } + } + + private var dataSpec: DataSpec? = null + + private var response: HttpResponse? = null + + private var responseInputStream: java.io.InputStream? = null + + private var currentJob: Job? = null + + private var connectionEstablished = false + private var bytesToRead: Long = 0 + private var bytesRead: Long = 0 + + @UnstableApi + override fun getUri(): Uri? { + return if (response != null) { + Uri.parse(response!!.request.url.toString()) + } else if (dataSpec != null) { + dataSpec!!.uri + } else { + null + } + } + + @UnstableApi + override fun getResponseCode(): Int { + return response?.status?.value ?: -1 + } + + @UnstableApi + override fun getResponseHeaders(): Map> { + val httpResponse = response ?: return emptyMap() + val headers = TreeMap>(String.CASE_INSENSITIVE_ORDER) + httpResponse.headers.names().forEach { name -> + headers[name] = httpResponse.headers.getAll(name) ?: emptyList() + } + return headers + } + + @UnstableApi + override fun setRequestProperty(name: String, value: String) { + requireNotNull(name) { "name cannot be null" } + requireNotNull(value) { "value cannot be null" } + requestProperties.set(name, value) + } + + @UnstableApi + override fun clearRequestProperty(name: String) { + requireNotNull(name) { "name cannot be null" } + requestProperties.remove(name) + } + + @UnstableApi + override fun clearAllRequestProperties() { + requestProperties.clear() + } + + @UnstableApi + @Throws(HttpDataSource.HttpDataSourceException::class) + override fun open(dataSpec: DataSpec): Long { + this.dataSpec = dataSpec + bytesRead = 0 + bytesToRead = 0 + transferInitializing(dataSpec) + + try { + val httpResponse = executeRequest(dataSpec) + this.response = httpResponse + this.responseInputStream = + executeSuspend { httpResponse.bodyAsChannel().toInputStream() } + } catch (e: IOException) { + if (e is HttpDataSource.HttpDataSourceException) throw e + throw HttpDataSource.HttpDataSourceException.createForIOException( + e, dataSpec, HttpDataSource.HttpDataSourceException.TYPE_OPEN + ) + } + + val httpResponse = this.response!! + val responseCode = httpResponse.status.value + + if (responseCode !in 200..299) { + if (responseCode == 416) { + val contentRange = httpResponse.headers[HttpHeaders.CONTENT_RANGE] + val documentSize = HttpUtil.getDocumentSize(contentRange) + if (dataSpec.position == documentSize) { + connectionEstablished = true + transferStarted(dataSpec) + return if (dataSpec.length != C.LENGTH_UNSET.toLong()) dataSpec.length else 0 + } + } + + val errorResponseBody: ByteArray = try { + responseInputStream?.readBytes() ?: Util.EMPTY_BYTE_ARRAY + } catch (e: IOException) { + Util.EMPTY_BYTE_ARRAY + } + + val headers = getResponseHeaders() + closeConnectionQuietly() + + val cause: IOException? = if (responseCode == 416) { + DataSourceException(PlaybackException.ERROR_CODE_IO_READ_POSITION_OUT_OF_RANGE) + } else { + null + } + + throw HttpDataSource.InvalidResponseCodeException( + responseCode, + httpResponse.status.description, + cause, + headers, + dataSpec, + errorResponseBody + ) + } + + val contentType = httpResponse.contentType()?.toString() ?: "" + if (contentTypePredicate != null && !contentTypePredicate.apply(contentType)) { + closeConnectionQuietly() + throw HttpDataSource.InvalidContentTypeException(contentType, dataSpec) + } + + val bytesToSkip = + if (responseCode == 200 && dataSpec.position != 0L) dataSpec.position else 0L + + if (dataSpec.length != C.LENGTH_UNSET.toLong()) { + bytesToRead = dataSpec.length + } else { + val contentLength = httpResponse.contentLength() ?: -1L + bytesToRead = + if (contentLength >= 0) contentLength - bytesToSkip else C.LENGTH_UNSET.toLong() + } + + connectionEstablished = true + transferStarted(dataSpec) + + try { + skipFully(bytesToSkip, dataSpec) + } catch (e: HttpDataSource.HttpDataSourceException) { + closeConnectionQuietly() + throw e + } + + return bytesToRead + } + + @UnstableApi + @Throws(HttpDataSource.HttpDataSourceException::class) + override fun read(buffer: ByteArray, offset: Int, length: Int): Int { + return try { + readInternal(buffer, offset, length) + } catch (e: IOException) { + throw HttpDataSource.HttpDataSourceException.createForIOException( + e, dataSpec!!, HttpDataSource.HttpDataSourceException.TYPE_READ + ) + } + } + + @UnstableApi + override fun close() { + if (connectionEstablished) { + connectionEstablished = false + transferEnded() + closeConnectionQuietly() + } + response = null + dataSpec = null + } + + @Throws(IOException::class) + private fun executeRequest(dataSpec: DataSpec): HttpResponse { + val urlString = dataSpec.uri.toString() + + val uri = Uri.parse(urlString) + val scheme = uri.scheme + if (scheme == null || !scheme.lowercase().startsWith("http")) { + throw HttpDataSource.HttpDataSourceException( + "Malformed URL", + dataSpec, + PlaybackException.ERROR_CODE_FAILED_RUNTIME_CHECK, + HttpDataSource.HttpDataSourceException.TYPE_OPEN + ) + } + + val mergedHeaders = HashMap() + defaultRequestProperties?.snapshot?.forEach { (key, value) -> + mergedHeaders[key] = value + } + requestProperties.snapshot.forEach { (key, value) -> + mergedHeaders[key] = value + } + dataSpec.httpRequestHeaders.forEach { (key, value) -> + mergedHeaders[key] = value + } + + return executeSuspend { + httpClient.prepareRequest { + url(urlString) + + headers { + mergedHeaders.forEach { (key, value) -> + append(key, value) + } + + val rangeHeader = + HttpUtil.buildRangeRequestHeader(dataSpec.position, dataSpec.length) + if (rangeHeader != null) { + append(HttpHeaders.RANGE, rangeHeader) + } + + if (userAgent != null) { + append(HttpHeaders.USER_AGENT, userAgent) + } + + if (cacheControl != null) { + append(HttpHeaders.CACHE_CONTROL, cacheControl) + } + + if (!dataSpec.isFlagSet(DataSpec.FLAG_ALLOW_GZIP)) { + append(HttpHeaders.ACCEPT_ENCODING, "identity") + } + } + + method = when (dataSpec.httpMethod) { + DataSpec.HTTP_METHOD_GET -> HttpMethod.Get + DataSpec.HTTP_METHOD_POST -> HttpMethod.Post + DataSpec.HTTP_METHOD_HEAD -> HttpMethod.Head + else -> HttpMethod.Get + } + + if (dataSpec.httpBody != null) { + setBody(dataSpec.httpBody!!) + } else if (dataSpec.httpMethod == DataSpec.HTTP_METHOD_POST) { + setBody(ByteArray(0)) + } + }.execute() + } + } + + @Throws(IOException::class) + private fun executeSuspend(block: suspend () -> T): T { + val exceptionRef = AtomicReference(null) + val resultRef = AtomicReference(null) + val latch = CountDownLatch(1) + + currentJob = coroutineScope.launch { + try { + resultRef.set(block()) + } catch (e: CancellationException) { + exceptionRef.set(InterruptedIOException()) + } catch (e: Exception) { + exceptionRef.set(e) + } finally { + latch.countDown() + } + } + + try { + latch.await() + } catch (e: InterruptedException) { + currentJob?.cancel() + throw InterruptedIOException() + } + + exceptionRef.get()?.let { throwable -> + when (throwable) { + is IOException -> throw throwable + is InterruptedIOException -> throw throwable + else -> throw IOException(throwable) + } + } + + @Suppress("UNCHECKED_CAST") + return resultRef.get() as T + } + + @Throws(HttpDataSource.HttpDataSourceException::class) + private fun skipFully(bytesToSkip: Long, dataSpec: DataSpec) { + if (bytesToSkip == 0L) return + + val skipBuffer = ByteArray(4096) + var remaining = bytesToSkip + + try { + val inputStream = responseInputStream ?: throw IOException("Stream closed") + while (remaining > 0) { + val readLength = min(remaining.toInt(), skipBuffer.size) + val read = inputStream.read(skipBuffer, 0, readLength) + + if (Thread.currentThread().isInterrupted) { + throw InterruptedIOException() + } + + if (read < 0) { + throw HttpDataSource.HttpDataSourceException( + dataSpec, + PlaybackException.ERROR_CODE_IO_READ_POSITION_OUT_OF_RANGE, + HttpDataSource.HttpDataSourceException.TYPE_OPEN + ) + } + + remaining -= read + bytesTransferred(read) + } + } catch (e: IOException) { + if (e is HttpDataSource.HttpDataSourceException) throw e + throw HttpDataSource.HttpDataSourceException( + dataSpec, + PlaybackException.ERROR_CODE_IO_UNSPECIFIED, + HttpDataSource.HttpDataSourceException.TYPE_OPEN + ) + } + } + + @Throws(IOException::class) + private fun readInternal(buffer: ByteArray, offset: Int, readLength: Int): Int { + if (readLength == 0) return 0 + + if (bytesToRead != C.LENGTH_UNSET.toLong()) { + val bytesRemaining = bytesToRead - bytesRead + if (bytesRemaining == 0L) return C.RESULT_END_OF_INPUT + + val actualReadLength = min(readLength.toLong(), bytesRemaining).toInt() + return readFromStream(buffer, offset, actualReadLength) + } + + return readFromStream(buffer, offset, readLength) + } + + @Throws(IOException::class) + private fun readFromStream(buffer: ByteArray, offset: Int, readLength: Int): Int { + val inputStream = responseInputStream ?: return C.RESULT_END_OF_INPUT + val read = inputStream.read(buffer, offset, readLength) + + if (read < 0) return C.RESULT_END_OF_INPUT + + bytesRead += read + bytesTransferred(read) + return read + } + + private fun closeConnectionQuietly() { + responseInputStream?.close() + responseInputStream = null + currentJob = null + } +} diff --git a/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/package-info.java b/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/package-info.java new file mode 100644 index 00000000000..2c8ed6752e3 --- /dev/null +++ b/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/package-info.java @@ -0,0 +1,4 @@ +@NonNullApi +package androidx.media3.datasource.ktor; + +import androidx.media3.common.util.NonNullApi; diff --git a/libraries/datasource_ktor/src/test/AndroidManifest.xml b/libraries/datasource_ktor/src/test/AndroidManifest.xml new file mode 100644 index 00000000000..7e4fa50124c --- /dev/null +++ b/libraries/datasource_ktor/src/test/AndroidManifest.xml @@ -0,0 +1,4 @@ + + + + diff --git a/libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt b/libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt new file mode 100644 index 00000000000..d79e5cc6616 --- /dev/null +++ b/libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt @@ -0,0 +1,37 @@ +package androidx.media3.datasource.ktor + +import androidx.media3.datasource.DataSource +import androidx.media3.test.utils.DataSourceContractTest +import androidx.media3.test.utils.HttpDataSourceTestEnv +import androidx.test.ext.junit.runners.AndroidJUnit4 +import com.google.common.collect.ImmutableList +import io.ktor.client.HttpClient +import io.ktor.client.engine.okhttp.OkHttp +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import org.junit.Rule +import org.junit.runner.RunWith + +@RunWith(AndroidJUnit4::class) +class KtorDataSourceContractTest : DataSourceContractTest() { + + @JvmField + @Rule + var httpDataSourceTestEnv = HttpDataSourceTestEnv() + val httpClient = HttpClient() + + private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) + + override fun createDataSource(): DataSource { + return KtorDataSource.Factory(httpClient, coroutineScope).createDataSource() + } + + override fun getTestResources(): ImmutableList { + return httpDataSourceTestEnv.servedResources + } + + override fun getNotFoundResources(): MutableList { + return httpDataSourceTestEnv.notFoundResources + } +} diff --git a/libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt b/libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt new file mode 100644 index 00000000000..043693383a0 --- /dev/null +++ b/libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt @@ -0,0 +1,153 @@ +package androidx.media3.datasource.ktor + +import androidx.media3.datasource.DataSpec +import androidx.media3.datasource.HttpDataSource +import androidx.test.ext.junit.runners.AndroidJUnit4 +import com.google.common.truth.Truth.assertThat +import io.ktor.client.HttpClient +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import java.nio.charset.StandardCharsets +import java.util.HashMap +import java.util.concurrent.TimeUnit +import okhttp3.mockwebserver.MockResponse +import okhttp3.mockwebserver.MockWebServer +import org.junit.Assert.assertThrows +import org.junit.Test +import org.junit.runner.RunWith + +@RunWith(AndroidJUnit4::class) +class KtorDataSourceTest { + + private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) + val httpClient = HttpClient() + + @Test + @Throws(Exception::class) + fun open_setsCorrectHeaders() { + val mockWebServer = MockWebServer() + mockWebServer.enqueue(MockResponse()) + + val propertyFromFactory = "fromFactory" + val defaultRequestProperties = HashMap() + defaultRequestProperties["0"] = propertyFromFactory + defaultRequestProperties["1"] = propertyFromFactory + defaultRequestProperties["2"] = propertyFromFactory + defaultRequestProperties["4"] = propertyFromFactory + + val dataSource = KtorDataSource.Factory(httpClient, coroutineScope) + .setDefaultRequestProperties(defaultRequestProperties) + .createDataSource() + + val propertyFromSetter = "fromSetter" + dataSource.setRequestProperty("1", propertyFromSetter) + dataSource.setRequestProperty("2", propertyFromSetter) + dataSource.setRequestProperty("3", propertyFromSetter) + dataSource.setRequestProperty("5", propertyFromSetter) + + val propertyFromDataSpec = "fromDataSpec" + val dataSpecRequestProperties = HashMap() + dataSpecRequestProperties["2"] = propertyFromDataSpec + dataSpecRequestProperties["3"] = propertyFromDataSpec + dataSpecRequestProperties["4"] = propertyFromDataSpec + dataSpecRequestProperties["6"] = propertyFromDataSpec + + val dataSpec = DataSpec.Builder() + .setUri(mockWebServer.url("/test-path").toString()) + .setHttpRequestHeaders(dataSpecRequestProperties) + .build() + + dataSource.open(dataSpec) + + val request = mockWebServer.takeRequest(10, TimeUnit.SECONDS) + assertThat(request).isNotNull() + val headers = request!!.headers + assertThat(headers["0"]).isEqualTo(propertyFromFactory) + assertThat(headers["1"]).isEqualTo(propertyFromSetter) + assertThat(headers["2"]).isEqualTo(propertyFromDataSpec) + assertThat(headers["3"]).isEqualTo(propertyFromDataSpec) + assertThat(headers["4"]).isEqualTo(propertyFromDataSpec) + assertThat(headers["5"]).isEqualTo(propertyFromSetter) + assertThat(headers["6"]).isEqualTo(propertyFromDataSpec) + } + + @Test + fun open_invalidResponseCode() { + val mockWebServer = MockWebServer() + mockWebServer.enqueue(MockResponse().setResponseCode(404).setBody("failure msg")) + + val dataSource = KtorDataSource.Factory(httpClient, coroutineScope).createDataSource() + + val dataSpec = DataSpec.Builder() + .setUri(mockWebServer.url("/test-path").toString()) + .build() + + val exception = assertThrows( + HttpDataSource.InvalidResponseCodeException::class.java + ) { dataSource.open(dataSpec) } + + assertThat(exception.responseCode).isEqualTo(404) + assertThat(exception.responseBody).isEqualTo("failure msg".toByteArray(StandardCharsets.UTF_8)) + } + + @Test + @Throws(Exception::class) + fun factory_setRequestPropertyAfterCreation_setsCorrectHeaders() { + val mockWebServer = MockWebServer() + mockWebServer.enqueue(MockResponse()) + val dataSpec = DataSpec.Builder() + .setUri(mockWebServer.url("/test-path").toString()) + .build() + + val factory = KtorDataSource.Factory(httpClient, coroutineScope) + val dataSource = factory.createDataSource() + + val defaultRequestProperties = HashMap() + defaultRequestProperties["0"] = "afterCreation" + factory.setDefaultRequestProperties(defaultRequestProperties) + dataSource.open(dataSpec) + + val request = mockWebServer.takeRequest(10, TimeUnit.SECONDS) + assertThat(request).isNotNull() + val headers = request!!.headers + assertThat(headers["0"]).isEqualTo("afterCreation") + } + + @Test + fun open_malformedUrl_throwsException() { + val dataSource = KtorDataSource.Factory(httpClient, coroutineScope).createDataSource() + + val dataSpec = DataSpec.Builder() + .setUri("not-a-valid-url") + .build() + + val exception = assertThrows( + HttpDataSource.HttpDataSourceException::class.java + ) { dataSource.open(dataSpec) } + + assertThat(exception.message).contains("Malformed URL") + } + + @Test + @Throws(Exception::class) + fun open_httpPost_sendsPostRequest() { + val mockWebServer = MockWebServer() + mockWebServer.enqueue(MockResponse()) + + val dataSource = KtorDataSource.Factory(httpClient, coroutineScope).createDataSource() + + val dataSpec = DataSpec.Builder() + .setUri(mockWebServer.url("/test-path").toString()) + .setHttpMethod(DataSpec.HTTP_METHOD_POST) + .setHttpBody("test body".toByteArray(StandardCharsets.UTF_8)) + .build() + + dataSource.open(dataSpec) + + val request = mockWebServer.takeRequest(10, TimeUnit.SECONDS) + assertThat(request).isNotNull() + assertThat(request!!.method).isEqualTo("POST") + assertThat(request.body.readUtf8()).isEqualTo("test body") + } +} From cf7746b06735580b00220a8f811aaff93c9b8d7c Mon Sep 17 00:00:00 2001 From: Ian Baker Date: Tue, 24 Feb 2026 15:22:27 +0000 Subject: [PATCH 02/14] Add timeouts to speed up KtorDataSourceContractTest 'not found' tests --- .../media3/datasource/ktor/KtorDataSourceContractTest.kt | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt b/libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt index d79e5cc6616..29c2416d75e 100644 --- a/libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt +++ b/libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt @@ -7,6 +7,7 @@ import androidx.test.ext.junit.runners.AndroidJUnit4 import com.google.common.collect.ImmutableList import io.ktor.client.HttpClient import io.ktor.client.engine.okhttp.OkHttp +import io.ktor.client.plugins.HttpTimeout import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob @@ -19,7 +20,13 @@ class KtorDataSourceContractTest : DataSourceContractTest() { @JvmField @Rule var httpDataSourceTestEnv = HttpDataSourceTestEnv() - val httpClient = HttpClient() + val httpClient = HttpClient() { + install(HttpTimeout) { + requestTimeoutMillis = 400 + connectTimeoutMillis = 400 + socketTimeoutMillis = 400 + } + } private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) From 282fe9ac38c12fb1a0933da832f0de2de5c91b27 Mon Sep 17 00:00:00 2001 From: Ian Baker Date: Tue, 24 Feb 2026 15:23:02 +0000 Subject: [PATCH 03/14] Reformat with ktfmt --- .../media3/datasource/ktor/KtorDataSource.kt | 810 +++++++++--------- .../ktor/KtorDataSourceContractTest.kt | 38 +- .../datasource/ktor/KtorDataSourceTest.kt | 261 +++--- 3 files changed, 553 insertions(+), 556 deletions(-) diff --git a/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt b/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt index 03339f77841..55bd8d5a8a9 100644 --- a/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt +++ b/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt @@ -26,494 +26,496 @@ import io.ktor.http.HttpMethod import io.ktor.http.contentLength import io.ktor.http.contentType import io.ktor.utils.io.jvm.javaio.toInputStream -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.Job -import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.launch import java.io.IOException import java.io.InterruptedIOException import java.util.TreeMap import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicReference import kotlin.math.min +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.launch /** * An [HttpDataSource] that delegates to Ktor's [HttpClient]. * * Note: HTTP request headers will be set using all parameters passed via (in order of decreasing - * priority) the `dataSpec`, [setRequestProperty] and the default parameters used to construct - * the instance. + * priority) the `dataSpec`, [setRequestProperty] and the default parameters used to construct the + * instance. */ -class KtorDataSource private constructor( - private val httpClient: HttpClient, - private val coroutineScope: CoroutineScope, - private val userAgent: String?, - private val cacheControl: String?, - private val defaultRequestProperties: HttpDataSource.RequestProperties?, - private val contentTypePredicate: Predicate?, - private val requestProperties: HttpDataSource.RequestProperties +class KtorDataSource +private constructor( + private val httpClient: HttpClient, + private val coroutineScope: CoroutineScope, + private val userAgent: String?, + private val cacheControl: String?, + private val defaultRequestProperties: HttpDataSource.RequestProperties?, + private val contentTypePredicate: Predicate?, + private val requestProperties: HttpDataSource.RequestProperties, ) : BaseDataSource(true), HttpDataSource { - companion object { - private const val TAG = "KtorDataSource" - - init { - MediaLibraryInfo.registerModule("media3.datasource.ktor") - } - } - - /** - * [androidx.media3.datasource.DataSource.Factory] for [KtorDataSource] instances. - * - * @param httpClient A [HttpClient] for use by the sources created by the factory. - * @param scope A [CoroutineScope] for running suspend functions. If not provided, a default - * scope with [Dispatchers.IO] and a [SupervisorJob] will be created. - */ - class Factory( - private val httpClient: HttpClient, - private val scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) - ) : HttpDataSource.Factory { - - private val defaultRequestProperties = HttpDataSource.RequestProperties() - - private var userAgent: String? = null - - private var cacheControl: String? = null - - private var transferListener: TransferListener? = null - - private var contentTypePredicate: Predicate? = null - - @UnstableApi - override fun setDefaultRequestProperties(defaultRequestProperties: Map): Factory { - this.defaultRequestProperties.clearAndSet(defaultRequestProperties) - return this - } - - /** - * Sets the user agent that will be used. - * - * The default is `null`, which causes the default user agent of the underlying [HttpClient] - * to be used. - * - * @param userAgent The user agent that will be used, or `null` to use the default user - * agent of the underlying [HttpClient]. - * @return This factory. - */ - fun setUserAgent(userAgent: String?): Factory { - this.userAgent = userAgent - return this - } - - /** - * Sets the Cache-Control header that will be used. - * - * The default is `null`. - * - * @param cacheControl The cache control header value that will be used, or `null` to clear - * a previously set value. - * @return This factory. - */ - @UnstableApi - fun setCacheControl(cacheControl: String?): Factory { - this.cacheControl = cacheControl - return this - } - - /** - * Sets a content type [Predicate]. If a content type is rejected by the predicate then a - * [HttpDataSource.InvalidContentTypeException] is thrown from [KtorDataSource.open]. - * - * The default is `null`. - * - * @param contentTypePredicate The content type [Predicate], or `null` to clear a predicate - * that was previously set. - * @return This factory. - */ - @UnstableApi - fun setContentTypePredicate(contentTypePredicate: Predicate?): Factory { - this.contentTypePredicate = contentTypePredicate - return this - } - - /** - * Sets the [TransferListener] that will be used. - * - * The default is `null`. - * - * See [androidx.media3.datasource.DataSource.addTransferListener]. - * - * @param transferListener The listener that will be used. - * @return This factory. - */ - @UnstableApi - fun setTransferListener(transferListener: TransferListener?): Factory { - this.transferListener = transferListener - return this - } + companion object { + private const val TAG = "KtorDataSource" - @UnstableApi - override fun createDataSource(): KtorDataSource { - val client = httpClient - val dataSource = KtorDataSource( - client, - scope, - userAgent, - cacheControl, - defaultRequestProperties, - contentTypePredicate, - HttpDataSource.RequestProperties() - ) - transferListener?.let { dataSource.addTransferListener(it) } - return dataSource - } + init { + MediaLibraryInfo.registerModule("media3.datasource.ktor") } + } + + /** + * [androidx.media3.datasource.DataSource.Factory] for [KtorDataSource] instances. + * + * @param httpClient A [HttpClient] for use by the sources created by the factory. + * @param scope A [CoroutineScope] for running suspend functions. If not provided, a default scope + * with [Dispatchers.IO] and a [SupervisorJob] will be created. + */ + class Factory( + private val httpClient: HttpClient, + private val scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO), + ) : HttpDataSource.Factory { - private var dataSpec: DataSpec? = null + private val defaultRequestProperties = HttpDataSource.RequestProperties() - private var response: HttpResponse? = null + private var userAgent: String? = null - private var responseInputStream: java.io.InputStream? = null + private var cacheControl: String? = null - private var currentJob: Job? = null + private var transferListener: TransferListener? = null - private var connectionEstablished = false - private var bytesToRead: Long = 0 - private var bytesRead: Long = 0 + private var contentTypePredicate: Predicate? = null @UnstableApi - override fun getUri(): Uri? { - return if (response != null) { - Uri.parse(response!!.request.url.toString()) - } else if (dataSpec != null) { - dataSpec!!.uri - } else { - null - } + override fun setDefaultRequestProperties( + defaultRequestProperties: Map + ): Factory { + this.defaultRequestProperties.clearAndSet(defaultRequestProperties) + return this } - @UnstableApi - override fun getResponseCode(): Int { - return response?.status?.value ?: -1 + /** + * Sets the user agent that will be used. + * + * The default is `null`, which causes the default user agent of the underlying [HttpClient] to + * be used. + * + * @param userAgent The user agent that will be used, or `null` to use the default user agent of + * the underlying [HttpClient]. + * @return This factory. + */ + fun setUserAgent(userAgent: String?): Factory { + this.userAgent = userAgent + return this } + /** + * Sets the Cache-Control header that will be used. + * + * The default is `null`. + * + * @param cacheControl The cache control header value that will be used, or `null` to clear a + * previously set value. + * @return This factory. + */ @UnstableApi - override fun getResponseHeaders(): Map> { - val httpResponse = response ?: return emptyMap() - val headers = TreeMap>(String.CASE_INSENSITIVE_ORDER) - httpResponse.headers.names().forEach { name -> - headers[name] = httpResponse.headers.getAll(name) ?: emptyList() - } - return headers + fun setCacheControl(cacheControl: String?): Factory { + this.cacheControl = cacheControl + return this } + /** + * Sets a content type [Predicate]. If a content type is rejected by the predicate then a + * [HttpDataSource.InvalidContentTypeException] is thrown from [KtorDataSource.open]. + * + * The default is `null`. + * + * @param contentTypePredicate The content type [Predicate], or `null` to clear a predicate that + * was previously set. + * @return This factory. + */ @UnstableApi - override fun setRequestProperty(name: String, value: String) { - requireNotNull(name) { "name cannot be null" } - requireNotNull(value) { "value cannot be null" } - requestProperties.set(name, value) + fun setContentTypePredicate(contentTypePredicate: Predicate?): Factory { + this.contentTypePredicate = contentTypePredicate + return this } + /** + * Sets the [TransferListener] that will be used. + * + * The default is `null`. + * + * See [androidx.media3.datasource.DataSource.addTransferListener]. + * + * @param transferListener The listener that will be used. + * @return This factory. + */ @UnstableApi - override fun clearRequestProperty(name: String) { - requireNotNull(name) { "name cannot be null" } - requestProperties.remove(name) + fun setTransferListener(transferListener: TransferListener?): Factory { + this.transferListener = transferListener + return this } @UnstableApi - override fun clearAllRequestProperties() { - requestProperties.clear() + override fun createDataSource(): KtorDataSource { + val client = httpClient + val dataSource = + KtorDataSource( + client, + scope, + userAgent, + cacheControl, + defaultRequestProperties, + contentTypePredicate, + HttpDataSource.RequestProperties(), + ) + transferListener?.let { dataSource.addTransferListener(it) } + return dataSource } + } - @UnstableApi - @Throws(HttpDataSource.HttpDataSourceException::class) - override fun open(dataSpec: DataSpec): Long { - this.dataSpec = dataSpec - bytesRead = 0 - bytesToRead = 0 - transferInitializing(dataSpec) + private var dataSpec: DataSpec? = null - try { - val httpResponse = executeRequest(dataSpec) - this.response = httpResponse - this.responseInputStream = - executeSuspend { httpResponse.bodyAsChannel().toInputStream() } - } catch (e: IOException) { - if (e is HttpDataSource.HttpDataSourceException) throw e - throw HttpDataSource.HttpDataSourceException.createForIOException( - e, dataSpec, HttpDataSource.HttpDataSourceException.TYPE_OPEN - ) - } + private var response: HttpResponse? = null - val httpResponse = this.response!! - val responseCode = httpResponse.status.value - - if (responseCode !in 200..299) { - if (responseCode == 416) { - val contentRange = httpResponse.headers[HttpHeaders.CONTENT_RANGE] - val documentSize = HttpUtil.getDocumentSize(contentRange) - if (dataSpec.position == documentSize) { - connectionEstablished = true - transferStarted(dataSpec) - return if (dataSpec.length != C.LENGTH_UNSET.toLong()) dataSpec.length else 0 - } - } + private var responseInputStream: java.io.InputStream? = null - val errorResponseBody: ByteArray = try { - responseInputStream?.readBytes() ?: Util.EMPTY_BYTE_ARRAY - } catch (e: IOException) { - Util.EMPTY_BYTE_ARRAY - } + private var currentJob: Job? = null - val headers = getResponseHeaders() - closeConnectionQuietly() + private var connectionEstablished = false + private var bytesToRead: Long = 0 + private var bytesRead: Long = 0 - val cause: IOException? = if (responseCode == 416) { - DataSourceException(PlaybackException.ERROR_CODE_IO_READ_POSITION_OUT_OF_RANGE) - } else { - null - } + @UnstableApi + override fun getUri(): Uri? { + return if (response != null) { + Uri.parse(response!!.request.url.toString()) + } else if (dataSpec != null) { + dataSpec!!.uri + } else { + null + } + } + + @UnstableApi + override fun getResponseCode(): Int { + return response?.status?.value ?: -1 + } + + @UnstableApi + override fun getResponseHeaders(): Map> { + val httpResponse = response ?: return emptyMap() + val headers = TreeMap>(String.CASE_INSENSITIVE_ORDER) + httpResponse.headers.names().forEach { name -> + headers[name] = httpResponse.headers.getAll(name) ?: emptyList() + } + return headers + } + + @UnstableApi + override fun setRequestProperty(name: String, value: String) { + requireNotNull(name) { "name cannot be null" } + requireNotNull(value) { "value cannot be null" } + requestProperties.set(name, value) + } + + @UnstableApi + override fun clearRequestProperty(name: String) { + requireNotNull(name) { "name cannot be null" } + requestProperties.remove(name) + } + + @UnstableApi + override fun clearAllRequestProperties() { + requestProperties.clear() + } + + @UnstableApi + @Throws(HttpDataSource.HttpDataSourceException::class) + override fun open(dataSpec: DataSpec): Long { + this.dataSpec = dataSpec + bytesRead = 0 + bytesToRead = 0 + transferInitializing(dataSpec) + + try { + val httpResponse = executeRequest(dataSpec) + this.response = httpResponse + this.responseInputStream = executeSuspend { httpResponse.bodyAsChannel().toInputStream() } + } catch (e: IOException) { + if (e is HttpDataSource.HttpDataSourceException) throw e + throw HttpDataSource.HttpDataSourceException.createForIOException( + e, + dataSpec, + HttpDataSource.HttpDataSourceException.TYPE_OPEN, + ) + } - throw HttpDataSource.InvalidResponseCodeException( - responseCode, - httpResponse.status.description, - cause, - headers, - dataSpec, - errorResponseBody - ) + val httpResponse = this.response!! + val responseCode = httpResponse.status.value + + if (responseCode !in 200..299) { + if (responseCode == 416) { + val contentRange = httpResponse.headers[HttpHeaders.CONTENT_RANGE] + val documentSize = HttpUtil.getDocumentSize(contentRange) + if (dataSpec.position == documentSize) { + connectionEstablished = true + transferStarted(dataSpec) + return if (dataSpec.length != C.LENGTH_UNSET.toLong()) dataSpec.length else 0 } + } - val contentType = httpResponse.contentType()?.toString() ?: "" - if (contentTypePredicate != null && !contentTypePredicate.apply(contentType)) { - closeConnectionQuietly() - throw HttpDataSource.InvalidContentTypeException(contentType, dataSpec) + val errorResponseBody: ByteArray = + try { + responseInputStream?.readBytes() ?: Util.EMPTY_BYTE_ARRAY + } catch (e: IOException) { + Util.EMPTY_BYTE_ARRAY } - val bytesToSkip = - if (responseCode == 200 && dataSpec.position != 0L) dataSpec.position else 0L + val headers = getResponseHeaders() + closeConnectionQuietly() - if (dataSpec.length != C.LENGTH_UNSET.toLong()) { - bytesToRead = dataSpec.length + val cause: IOException? = + if (responseCode == 416) { + DataSourceException(PlaybackException.ERROR_CODE_IO_READ_POSITION_OUT_OF_RANGE) } else { - val contentLength = httpResponse.contentLength() ?: -1L - bytesToRead = - if (contentLength >= 0) contentLength - bytesToSkip else C.LENGTH_UNSET.toLong() + null } - connectionEstablished = true - transferStarted(dataSpec) + throw HttpDataSource.InvalidResponseCodeException( + responseCode, + httpResponse.status.description, + cause, + headers, + dataSpec, + errorResponseBody, + ) + } - try { - skipFully(bytesToSkip, dataSpec) - } catch (e: HttpDataSource.HttpDataSourceException) { - closeConnectionQuietly() - throw e - } + val contentType = httpResponse.contentType()?.toString() ?: "" + if (contentTypePredicate != null && !contentTypePredicate.apply(contentType)) { + closeConnectionQuietly() + throw HttpDataSource.InvalidContentTypeException(contentType, dataSpec) + } + + val bytesToSkip = if (responseCode == 200 && dataSpec.position != 0L) dataSpec.position else 0L - return bytesToRead + if (dataSpec.length != C.LENGTH_UNSET.toLong()) { + bytesToRead = dataSpec.length + } else { + val contentLength = httpResponse.contentLength() ?: -1L + bytesToRead = if (contentLength >= 0) contentLength - bytesToSkip else C.LENGTH_UNSET.toLong() } - @UnstableApi - @Throws(HttpDataSource.HttpDataSourceException::class) - override fun read(buffer: ByteArray, offset: Int, length: Int): Int { - return try { - readInternal(buffer, offset, length) - } catch (e: IOException) { - throw HttpDataSource.HttpDataSourceException.createForIOException( - e, dataSpec!!, HttpDataSource.HttpDataSourceException.TYPE_READ - ) - } + connectionEstablished = true + transferStarted(dataSpec) + + try { + skipFully(bytesToSkip, dataSpec) + } catch (e: HttpDataSource.HttpDataSourceException) { + closeConnectionQuietly() + throw e } - @UnstableApi - override fun close() { - if (connectionEstablished) { - connectionEstablished = false - transferEnded() - closeConnectionQuietly() - } - response = null - dataSpec = null + return bytesToRead + } + + @UnstableApi + @Throws(HttpDataSource.HttpDataSourceException::class) + override fun read(buffer: ByteArray, offset: Int, length: Int): Int { + return try { + readInternal(buffer, offset, length) + } catch (e: IOException) { + throw HttpDataSource.HttpDataSourceException.createForIOException( + e, + dataSpec!!, + HttpDataSource.HttpDataSourceException.TYPE_READ, + ) + } + } + + @UnstableApi + override fun close() { + if (connectionEstablished) { + connectionEstablished = false + transferEnded() + closeConnectionQuietly() + } + response = null + dataSpec = null + } + + @Throws(IOException::class) + private fun executeRequest(dataSpec: DataSpec): HttpResponse { + val urlString = dataSpec.uri.toString() + + val uri = Uri.parse(urlString) + val scheme = uri.scheme + if (scheme == null || !scheme.lowercase().startsWith("http")) { + throw HttpDataSource.HttpDataSourceException( + "Malformed URL", + dataSpec, + PlaybackException.ERROR_CODE_FAILED_RUNTIME_CHECK, + HttpDataSource.HttpDataSourceException.TYPE_OPEN, + ) } - @Throws(IOException::class) - private fun executeRequest(dataSpec: DataSpec): HttpResponse { - val urlString = dataSpec.uri.toString() - - val uri = Uri.parse(urlString) - val scheme = uri.scheme - if (scheme == null || !scheme.lowercase().startsWith("http")) { - throw HttpDataSource.HttpDataSourceException( - "Malformed URL", - dataSpec, - PlaybackException.ERROR_CODE_FAILED_RUNTIME_CHECK, - HttpDataSource.HttpDataSourceException.TYPE_OPEN - ) - } + val mergedHeaders = HashMap() + defaultRequestProperties?.snapshot?.forEach { (key, value) -> mergedHeaders[key] = value } + requestProperties.snapshot.forEach { (key, value) -> mergedHeaders[key] = value } + dataSpec.httpRequestHeaders.forEach { (key, value) -> mergedHeaders[key] = value } - val mergedHeaders = HashMap() - defaultRequestProperties?.snapshot?.forEach { (key, value) -> - mergedHeaders[key] = value - } - requestProperties.snapshot.forEach { (key, value) -> - mergedHeaders[key] = value - } - dataSpec.httpRequestHeaders.forEach { (key, value) -> - mergedHeaders[key] = value - } + return executeSuspend { + httpClient + .prepareRequest { + url(urlString) - return executeSuspend { - httpClient.prepareRequest { - url(urlString) - - headers { - mergedHeaders.forEach { (key, value) -> - append(key, value) - } - - val rangeHeader = - HttpUtil.buildRangeRequestHeader(dataSpec.position, dataSpec.length) - if (rangeHeader != null) { - append(HttpHeaders.RANGE, rangeHeader) - } - - if (userAgent != null) { - append(HttpHeaders.USER_AGENT, userAgent) - } - - if (cacheControl != null) { - append(HttpHeaders.CACHE_CONTROL, cacheControl) - } - - if (!dataSpec.isFlagSet(DataSpec.FLAG_ALLOW_GZIP)) { - append(HttpHeaders.ACCEPT_ENCODING, "identity") - } - } - - method = when (dataSpec.httpMethod) { - DataSpec.HTTP_METHOD_GET -> HttpMethod.Get - DataSpec.HTTP_METHOD_POST -> HttpMethod.Post - DataSpec.HTTP_METHOD_HEAD -> HttpMethod.Head - else -> HttpMethod.Get - } - - if (dataSpec.httpBody != null) { - setBody(dataSpec.httpBody!!) - } else if (dataSpec.httpMethod == DataSpec.HTTP_METHOD_POST) { - setBody(ByteArray(0)) - } - }.execute() - } - } + headers { + mergedHeaders.forEach { (key, value) -> append(key, value) } - @Throws(IOException::class) - private fun executeSuspend(block: suspend () -> T): T { - val exceptionRef = AtomicReference(null) - val resultRef = AtomicReference(null) - val latch = CountDownLatch(1) - - currentJob = coroutineScope.launch { - try { - resultRef.set(block()) - } catch (e: CancellationException) { - exceptionRef.set(InterruptedIOException()) - } catch (e: Exception) { - exceptionRef.set(e) - } finally { - latch.countDown() + val rangeHeader = HttpUtil.buildRangeRequestHeader(dataSpec.position, dataSpec.length) + if (rangeHeader != null) { + append(HttpHeaders.RANGE, rangeHeader) } - } - try { - latch.await() - } catch (e: InterruptedException) { - currentJob?.cancel() - throw InterruptedIOException() - } + if (userAgent != null) { + append(HttpHeaders.USER_AGENT, userAgent) + } - exceptionRef.get()?.let { throwable -> - when (throwable) { - is IOException -> throw throwable - is InterruptedIOException -> throw throwable - else -> throw IOException(throwable) + if (cacheControl != null) { + append(HttpHeaders.CACHE_CONTROL, cacheControl) } - } - @Suppress("UNCHECKED_CAST") - return resultRef.get() as T - } + if (!dataSpec.isFlagSet(DataSpec.FLAG_ALLOW_GZIP)) { + append(HttpHeaders.ACCEPT_ENCODING, "identity") + } + } + + method = + when (dataSpec.httpMethod) { + DataSpec.HTTP_METHOD_GET -> HttpMethod.Get + DataSpec.HTTP_METHOD_POST -> HttpMethod.Post + DataSpec.HTTP_METHOD_HEAD -> HttpMethod.Head + else -> HttpMethod.Get + } - @Throws(HttpDataSource.HttpDataSourceException::class) - private fun skipFully(bytesToSkip: Long, dataSpec: DataSpec) { - if (bytesToSkip == 0L) return + if (dataSpec.httpBody != null) { + setBody(dataSpec.httpBody!!) + } else if (dataSpec.httpMethod == DataSpec.HTTP_METHOD_POST) { + setBody(ByteArray(0)) + } + } + .execute() + } + } - val skipBuffer = ByteArray(4096) - var remaining = bytesToSkip + @Throws(IOException::class) + private fun executeSuspend(block: suspend () -> T): T { + val exceptionRef = AtomicReference(null) + val resultRef = AtomicReference(null) + val latch = CountDownLatch(1) + currentJob = + coroutineScope.launch { try { - val inputStream = responseInputStream ?: throw IOException("Stream closed") - while (remaining > 0) { - val readLength = min(remaining.toInt(), skipBuffer.size) - val read = inputStream.read(skipBuffer, 0, readLength) - - if (Thread.currentThread().isInterrupted) { - throw InterruptedIOException() - } - - if (read < 0) { - throw HttpDataSource.HttpDataSourceException( - dataSpec, - PlaybackException.ERROR_CODE_IO_READ_POSITION_OUT_OF_RANGE, - HttpDataSource.HttpDataSourceException.TYPE_OPEN - ) - } - - remaining -= read - bytesTransferred(read) - } - } catch (e: IOException) { - if (e is HttpDataSource.HttpDataSourceException) throw e - throw HttpDataSource.HttpDataSourceException( - dataSpec, - PlaybackException.ERROR_CODE_IO_UNSPECIFIED, - HttpDataSource.HttpDataSourceException.TYPE_OPEN - ) + resultRef.set(block()) + } catch (e: CancellationException) { + exceptionRef.set(InterruptedIOException()) + } catch (e: Exception) { + exceptionRef.set(e) + } finally { + latch.countDown() } + } + + try { + latch.await() + } catch (e: InterruptedException) { + currentJob?.cancel() + throw InterruptedIOException() } - @Throws(IOException::class) - private fun readInternal(buffer: ByteArray, offset: Int, readLength: Int): Int { - if (readLength == 0) return 0 + exceptionRef.get()?.let { throwable -> + when (throwable) { + is IOException -> throw throwable + is InterruptedIOException -> throw throwable + else -> throw IOException(throwable) + } + } - if (bytesToRead != C.LENGTH_UNSET.toLong()) { - val bytesRemaining = bytesToRead - bytesRead - if (bytesRemaining == 0L) return C.RESULT_END_OF_INPUT + @Suppress("UNCHECKED_CAST") + return resultRef.get() as T + } - val actualReadLength = min(readLength.toLong(), bytesRemaining).toInt() - return readFromStream(buffer, offset, actualReadLength) - } + @Throws(HttpDataSource.HttpDataSourceException::class) + private fun skipFully(bytesToSkip: Long, dataSpec: DataSpec) { + if (bytesToSkip == 0L) return - return readFromStream(buffer, offset, readLength) - } + val skipBuffer = ByteArray(4096) + var remaining = bytesToSkip - @Throws(IOException::class) - private fun readFromStream(buffer: ByteArray, offset: Int, readLength: Int): Int { - val inputStream = responseInputStream ?: return C.RESULT_END_OF_INPUT - val read = inputStream.read(buffer, offset, readLength) + try { + val inputStream = responseInputStream ?: throw IOException("Stream closed") + while (remaining > 0) { + val readLength = min(remaining.toInt(), skipBuffer.size) + val read = inputStream.read(skipBuffer, 0, readLength) - if (read < 0) return C.RESULT_END_OF_INPUT + if (Thread.currentThread().isInterrupted) { + throw InterruptedIOException() + } + + if (read < 0) { + throw HttpDataSource.HttpDataSourceException( + dataSpec, + PlaybackException.ERROR_CODE_IO_READ_POSITION_OUT_OF_RANGE, + HttpDataSource.HttpDataSourceException.TYPE_OPEN, + ) + } - bytesRead += read + remaining -= read bytesTransferred(read) - return read + } + } catch (e: IOException) { + if (e is HttpDataSource.HttpDataSourceException) throw e + throw HttpDataSource.HttpDataSourceException( + dataSpec, + PlaybackException.ERROR_CODE_IO_UNSPECIFIED, + HttpDataSource.HttpDataSourceException.TYPE_OPEN, + ) } + } - private fun closeConnectionQuietly() { - responseInputStream?.close() - responseInputStream = null - currentJob = null + @Throws(IOException::class) + private fun readInternal(buffer: ByteArray, offset: Int, readLength: Int): Int { + if (readLength == 0) return 0 + + if (bytesToRead != C.LENGTH_UNSET.toLong()) { + val bytesRemaining = bytesToRead - bytesRead + if (bytesRemaining == 0L) return C.RESULT_END_OF_INPUT + + val actualReadLength = min(readLength.toLong(), bytesRemaining).toInt() + return readFromStream(buffer, offset, actualReadLength) } + + return readFromStream(buffer, offset, readLength) + } + + @Throws(IOException::class) + private fun readFromStream(buffer: ByteArray, offset: Int, readLength: Int): Int { + val inputStream = responseInputStream ?: return C.RESULT_END_OF_INPUT + val read = inputStream.read(buffer, offset, readLength) + + if (read < 0) return C.RESULT_END_OF_INPUT + + bytesRead += read + bytesTransferred(read) + return read + } + + private fun closeConnectionQuietly() { + responseInputStream?.close() + responseInputStream = null + currentJob = null + } } diff --git a/libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt b/libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt index 29c2416d75e..9af54cebd2d 100644 --- a/libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt +++ b/libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt @@ -6,7 +6,6 @@ import androidx.media3.test.utils.HttpDataSourceTestEnv import androidx.test.ext.junit.runners.AndroidJUnit4 import com.google.common.collect.ImmutableList import io.ktor.client.HttpClient -import io.ktor.client.engine.okhttp.OkHttp import io.ktor.client.plugins.HttpTimeout import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers @@ -17,28 +16,27 @@ import org.junit.runner.RunWith @RunWith(AndroidJUnit4::class) class KtorDataSourceContractTest : DataSourceContractTest() { - @JvmField - @Rule - var httpDataSourceTestEnv = HttpDataSourceTestEnv() - val httpClient = HttpClient() { - install(HttpTimeout) { - requestTimeoutMillis = 400 - connectTimeoutMillis = 400 - socketTimeoutMillis = 400 - } + @JvmField @Rule var httpDataSourceTestEnv = HttpDataSourceTestEnv() + val httpClient = + HttpClient() { + install(HttpTimeout) { + requestTimeoutMillis = 400 + connectTimeoutMillis = 400 + socketTimeoutMillis = 400 + } } - private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) + private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) - override fun createDataSource(): DataSource { - return KtorDataSource.Factory(httpClient, coroutineScope).createDataSource() - } + override fun createDataSource(): DataSource { + return KtorDataSource.Factory(httpClient, coroutineScope).createDataSource() + } - override fun getTestResources(): ImmutableList { - return httpDataSourceTestEnv.servedResources - } + override fun getTestResources(): ImmutableList { + return httpDataSourceTestEnv.servedResources + } - override fun getNotFoundResources(): MutableList { - return httpDataSourceTestEnv.notFoundResources - } + override fun getNotFoundResources(): MutableList { + return httpDataSourceTestEnv.notFoundResources + } } diff --git a/libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt b/libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt index 043693383a0..68c9f908abc 100644 --- a/libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt +++ b/libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt @@ -5,12 +5,12 @@ import androidx.media3.datasource.HttpDataSource import androidx.test.ext.junit.runners.AndroidJUnit4 import com.google.common.truth.Truth.assertThat import io.ktor.client.HttpClient -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.SupervisorJob import java.nio.charset.StandardCharsets import java.util.HashMap import java.util.concurrent.TimeUnit +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob import okhttp3.mockwebserver.MockResponse import okhttp3.mockwebserver.MockWebServer import org.junit.Assert.assertThrows @@ -20,134 +20,131 @@ import org.junit.runner.RunWith @RunWith(AndroidJUnit4::class) class KtorDataSourceTest { - private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) - val httpClient = HttpClient() - - @Test - @Throws(Exception::class) - fun open_setsCorrectHeaders() { - val mockWebServer = MockWebServer() - mockWebServer.enqueue(MockResponse()) - - val propertyFromFactory = "fromFactory" - val defaultRequestProperties = HashMap() - defaultRequestProperties["0"] = propertyFromFactory - defaultRequestProperties["1"] = propertyFromFactory - defaultRequestProperties["2"] = propertyFromFactory - defaultRequestProperties["4"] = propertyFromFactory - - val dataSource = KtorDataSource.Factory(httpClient, coroutineScope) - .setDefaultRequestProperties(defaultRequestProperties) - .createDataSource() - - val propertyFromSetter = "fromSetter" - dataSource.setRequestProperty("1", propertyFromSetter) - dataSource.setRequestProperty("2", propertyFromSetter) - dataSource.setRequestProperty("3", propertyFromSetter) - dataSource.setRequestProperty("5", propertyFromSetter) - - val propertyFromDataSpec = "fromDataSpec" - val dataSpecRequestProperties = HashMap() - dataSpecRequestProperties["2"] = propertyFromDataSpec - dataSpecRequestProperties["3"] = propertyFromDataSpec - dataSpecRequestProperties["4"] = propertyFromDataSpec - dataSpecRequestProperties["6"] = propertyFromDataSpec - - val dataSpec = DataSpec.Builder() - .setUri(mockWebServer.url("/test-path").toString()) - .setHttpRequestHeaders(dataSpecRequestProperties) - .build() - - dataSource.open(dataSpec) - - val request = mockWebServer.takeRequest(10, TimeUnit.SECONDS) - assertThat(request).isNotNull() - val headers = request!!.headers - assertThat(headers["0"]).isEqualTo(propertyFromFactory) - assertThat(headers["1"]).isEqualTo(propertyFromSetter) - assertThat(headers["2"]).isEqualTo(propertyFromDataSpec) - assertThat(headers["3"]).isEqualTo(propertyFromDataSpec) - assertThat(headers["4"]).isEqualTo(propertyFromDataSpec) - assertThat(headers["5"]).isEqualTo(propertyFromSetter) - assertThat(headers["6"]).isEqualTo(propertyFromDataSpec) - } - - @Test - fun open_invalidResponseCode() { - val mockWebServer = MockWebServer() - mockWebServer.enqueue(MockResponse().setResponseCode(404).setBody("failure msg")) - - val dataSource = KtorDataSource.Factory(httpClient, coroutineScope).createDataSource() - - val dataSpec = DataSpec.Builder() - .setUri(mockWebServer.url("/test-path").toString()) - .build() - - val exception = assertThrows( - HttpDataSource.InvalidResponseCodeException::class.java - ) { dataSource.open(dataSpec) } - - assertThat(exception.responseCode).isEqualTo(404) - assertThat(exception.responseBody).isEqualTo("failure msg".toByteArray(StandardCharsets.UTF_8)) - } - - @Test - @Throws(Exception::class) - fun factory_setRequestPropertyAfterCreation_setsCorrectHeaders() { - val mockWebServer = MockWebServer() - mockWebServer.enqueue(MockResponse()) - val dataSpec = DataSpec.Builder() - .setUri(mockWebServer.url("/test-path").toString()) - .build() - - val factory = KtorDataSource.Factory(httpClient, coroutineScope) - val dataSource = factory.createDataSource() - - val defaultRequestProperties = HashMap() - defaultRequestProperties["0"] = "afterCreation" - factory.setDefaultRequestProperties(defaultRequestProperties) - dataSource.open(dataSpec) - - val request = mockWebServer.takeRequest(10, TimeUnit.SECONDS) - assertThat(request).isNotNull() - val headers = request!!.headers - assertThat(headers["0"]).isEqualTo("afterCreation") - } - - @Test - fun open_malformedUrl_throwsException() { - val dataSource = KtorDataSource.Factory(httpClient, coroutineScope).createDataSource() - - val dataSpec = DataSpec.Builder() - .setUri("not-a-valid-url") - .build() - - val exception = assertThrows( - HttpDataSource.HttpDataSourceException::class.java - ) { dataSource.open(dataSpec) } - - assertThat(exception.message).contains("Malformed URL") - } - - @Test - @Throws(Exception::class) - fun open_httpPost_sendsPostRequest() { - val mockWebServer = MockWebServer() - mockWebServer.enqueue(MockResponse()) - - val dataSource = KtorDataSource.Factory(httpClient, coroutineScope).createDataSource() - - val dataSpec = DataSpec.Builder() - .setUri(mockWebServer.url("/test-path").toString()) - .setHttpMethod(DataSpec.HTTP_METHOD_POST) - .setHttpBody("test body".toByteArray(StandardCharsets.UTF_8)) - .build() - + private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) + val httpClient = HttpClient() + + @Test + @Throws(Exception::class) + fun open_setsCorrectHeaders() { + val mockWebServer = MockWebServer() + mockWebServer.enqueue(MockResponse()) + + val propertyFromFactory = "fromFactory" + val defaultRequestProperties = HashMap() + defaultRequestProperties["0"] = propertyFromFactory + defaultRequestProperties["1"] = propertyFromFactory + defaultRequestProperties["2"] = propertyFromFactory + defaultRequestProperties["4"] = propertyFromFactory + + val dataSource = + KtorDataSource.Factory(httpClient, coroutineScope) + .setDefaultRequestProperties(defaultRequestProperties) + .createDataSource() + + val propertyFromSetter = "fromSetter" + dataSource.setRequestProperty("1", propertyFromSetter) + dataSource.setRequestProperty("2", propertyFromSetter) + dataSource.setRequestProperty("3", propertyFromSetter) + dataSource.setRequestProperty("5", propertyFromSetter) + + val propertyFromDataSpec = "fromDataSpec" + val dataSpecRequestProperties = HashMap() + dataSpecRequestProperties["2"] = propertyFromDataSpec + dataSpecRequestProperties["3"] = propertyFromDataSpec + dataSpecRequestProperties["4"] = propertyFromDataSpec + dataSpecRequestProperties["6"] = propertyFromDataSpec + + val dataSpec = + DataSpec.Builder() + .setUri(mockWebServer.url("/test-path").toString()) + .setHttpRequestHeaders(dataSpecRequestProperties) + .build() + + dataSource.open(dataSpec) + + val request = mockWebServer.takeRequest(10, TimeUnit.SECONDS) + assertThat(request).isNotNull() + val headers = request!!.headers + assertThat(headers["0"]).isEqualTo(propertyFromFactory) + assertThat(headers["1"]).isEqualTo(propertyFromSetter) + assertThat(headers["2"]).isEqualTo(propertyFromDataSpec) + assertThat(headers["3"]).isEqualTo(propertyFromDataSpec) + assertThat(headers["4"]).isEqualTo(propertyFromDataSpec) + assertThat(headers["5"]).isEqualTo(propertyFromSetter) + assertThat(headers["6"]).isEqualTo(propertyFromDataSpec) + } + + @Test + fun open_invalidResponseCode() { + val mockWebServer = MockWebServer() + mockWebServer.enqueue(MockResponse().setResponseCode(404).setBody("failure msg")) + + val dataSource = KtorDataSource.Factory(httpClient, coroutineScope).createDataSource() + + val dataSpec = DataSpec.Builder().setUri(mockWebServer.url("/test-path").toString()).build() + + val exception = + assertThrows(HttpDataSource.InvalidResponseCodeException::class.java) { dataSource.open(dataSpec) - - val request = mockWebServer.takeRequest(10, TimeUnit.SECONDS) - assertThat(request).isNotNull() - assertThat(request!!.method).isEqualTo("POST") - assertThat(request.body.readUtf8()).isEqualTo("test body") - } + } + + assertThat(exception.responseCode).isEqualTo(404) + assertThat(exception.responseBody).isEqualTo("failure msg".toByteArray(StandardCharsets.UTF_8)) + } + + @Test + @Throws(Exception::class) + fun factory_setRequestPropertyAfterCreation_setsCorrectHeaders() { + val mockWebServer = MockWebServer() + mockWebServer.enqueue(MockResponse()) + val dataSpec = DataSpec.Builder().setUri(mockWebServer.url("/test-path").toString()).build() + + val factory = KtorDataSource.Factory(httpClient, coroutineScope) + val dataSource = factory.createDataSource() + + val defaultRequestProperties = HashMap() + defaultRequestProperties["0"] = "afterCreation" + factory.setDefaultRequestProperties(defaultRequestProperties) + dataSource.open(dataSpec) + + val request = mockWebServer.takeRequest(10, TimeUnit.SECONDS) + assertThat(request).isNotNull() + val headers = request!!.headers + assertThat(headers["0"]).isEqualTo("afterCreation") + } + + @Test + fun open_malformedUrl_throwsException() { + val dataSource = KtorDataSource.Factory(httpClient, coroutineScope).createDataSource() + + val dataSpec = DataSpec.Builder().setUri("not-a-valid-url").build() + + val exception = + assertThrows(HttpDataSource.HttpDataSourceException::class.java) { dataSource.open(dataSpec) } + + assertThat(exception.message).contains("Malformed URL") + } + + @Test + @Throws(Exception::class) + fun open_httpPost_sendsPostRequest() { + val mockWebServer = MockWebServer() + mockWebServer.enqueue(MockResponse()) + + val dataSource = KtorDataSource.Factory(httpClient, coroutineScope).createDataSource() + + val dataSpec = + DataSpec.Builder() + .setUri(mockWebServer.url("/test-path").toString()) + .setHttpMethod(DataSpec.HTTP_METHOD_POST) + .setHttpBody("test body".toByteArray(StandardCharsets.UTF_8)) + .build() + + dataSource.open(dataSpec) + + val request = mockWebServer.takeRequest(10, TimeUnit.SECONDS) + assertThat(request).isNotNull() + assertThat(request!!.method).isEqualTo("POST") + assertThat(request.body.readUtf8()).isEqualTo("test body") + } } From ef560497ed30ee3286331c43e62ab0a5ec946fe9 Mon Sep 17 00:00:00 2001 From: Ian Baker Date: Tue, 24 Feb 2026 15:34:37 +0000 Subject: [PATCH 04/14] Move tests from Robolectric to instrumentation In the past we have found Robolectric is not realistic enough for testing fiddly edge cases in HTTP stacks, so running on an emulator with real Android OS is better. --- libraries/datasource_ktor/build.gradle | 11 ++++++----- .../src/androidTest/AndroidManifest.xml | 14 ++++++++++++++ .../datasource/ktor/KtorDataSourceContractTest.kt | 0 .../media3/datasource/ktor/KtorDataSourceTest.kt | 0 .../datasource_ktor/src/test/AndroidManifest.xml | 4 ---- 5 files changed, 20 insertions(+), 9 deletions(-) create mode 100644 libraries/datasource_ktor/src/androidTest/AndroidManifest.xml rename libraries/datasource_ktor/src/{test => androidTest}/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt (100%) rename libraries/datasource_ktor/src/{test => androidTest}/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt (100%) delete mode 100644 libraries/datasource_ktor/src/test/AndroidManifest.xml diff --git a/libraries/datasource_ktor/build.gradle b/libraries/datasource_ktor/build.gradle index 582de699a39..82692e7b671 100644 --- a/libraries/datasource_ktor/build.gradle +++ b/libraries/datasource_ktor/build.gradle @@ -19,6 +19,7 @@ android { } dependencies { + api 'io.ktor:ktor-client-core:' + ktorVersion api project(modulePrefix + 'lib-common') api project(modulePrefix + 'lib-datasource') implementation 'androidx.annotation:annotation:' + androidxAnnotationVersion @@ -26,11 +27,11 @@ dependencies { compileOnly 'org.checkerframework:checker-qual:' + checkerframeworkVersion compileOnly 'org.jetbrains.kotlin:kotlin-annotations-jvm:' + kotlinAnnotationsVersion implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:' + kotlinxCoroutinesVersion - testImplementation project(modulePrefix + 'test-utils') - testImplementation 'com.squareup.okhttp3:mockwebserver:' + okhttpVersion - testImplementation 'org.robolectric:robolectric:' + robolectricVersion - testImplementation 'io.ktor:ktor-client-okhttp:' + ktorVersion - api 'io.ktor:ktor-client-core:' + ktorVersion + androidTestImplementation project(modulePrefix + 'test-utils') + androidTestImplementation 'androidx.test:runner:' + androidxTestRunnerVersion + androidTestImplementation 'com.linkedin.dexmaker:dexmaker-mockito:' + dexmakerVersion + androidTestImplementation 'com.squareup.okhttp3:mockwebserver:' + okhttpVersion + androidTestImplementation 'io.ktor:ktor-client-okhttp:' + ktorVersion } ext { diff --git a/libraries/datasource_ktor/src/androidTest/AndroidManifest.xml b/libraries/datasource_ktor/src/androidTest/AndroidManifest.xml new file mode 100644 index 00000000000..3cd0981771e --- /dev/null +++ b/libraries/datasource_ktor/src/androidTest/AndroidManifest.xml @@ -0,0 +1,14 @@ + + + + + + + + + + diff --git a/libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt b/libraries/datasource_ktor/src/androidTest/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt similarity index 100% rename from libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt rename to libraries/datasource_ktor/src/androidTest/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt diff --git a/libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt b/libraries/datasource_ktor/src/androidTest/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt similarity index 100% rename from libraries/datasource_ktor/src/test/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt rename to libraries/datasource_ktor/src/androidTest/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt diff --git a/libraries/datasource_ktor/src/test/AndroidManifest.xml b/libraries/datasource_ktor/src/test/AndroidManifest.xml deleted file mode 100644 index 7e4fa50124c..00000000000 --- a/libraries/datasource_ktor/src/test/AndroidManifest.xml +++ /dev/null @@ -1,4 +0,0 @@ - - - - From 8424cc07ce812cdee929ed12438ceedbdb0ca701 Mon Sep 17 00:00:00 2001 From: Ian Baker Date: Tue, 24 Feb 2026 15:38:52 +0000 Subject: [PATCH 05/14] Add copyright headers --- libraries/datasource_ktor/build.gradle | 13 +++++++++++++ .../src/androidTest/AndroidManifest.xml | 14 ++++++++++++++ .../datasource/ktor/KtorDataSourceContractTest.kt | 15 +++++++++++++++ .../media3/datasource/ktor/KtorDataSourceTest.kt | 15 +++++++++++++++ .../datasource_ktor/src/main/AndroidManifest.xml | 14 ++++++++++++++ .../media3/datasource/ktor/KtorDataSource.kt | 15 +++++++++++++++ .../media3/datasource/ktor/package-info.java | 15 +++++++++++++++ 7 files changed, 101 insertions(+) diff --git a/libraries/datasource_ktor/build.gradle b/libraries/datasource_ktor/build.gradle index 82692e7b671..f9bb5a811ba 100644 --- a/libraries/datasource_ktor/build.gradle +++ b/libraries/datasource_ktor/build.gradle @@ -1,3 +1,16 @@ +// Copyright 2026 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. apply from: "$gradle.ext.androidxMediaSettingsDir/common_library_config.gradle" apply plugin: 'kotlin-android' diff --git a/libraries/datasource_ktor/src/androidTest/AndroidManifest.xml b/libraries/datasource_ktor/src/androidTest/AndroidManifest.xml index 3cd0981771e..c2c578aa852 100644 --- a/libraries/datasource_ktor/src/androidTest/AndroidManifest.xml +++ b/libraries/datasource_ktor/src/androidTest/AndroidManifest.xml @@ -1,4 +1,18 @@ + diff --git a/libraries/datasource_ktor/src/androidTest/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt b/libraries/datasource_ktor/src/androidTest/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt index 9af54cebd2d..d9b42c8bb62 100644 --- a/libraries/datasource_ktor/src/androidTest/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt +++ b/libraries/datasource_ktor/src/androidTest/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt @@ -1,3 +1,18 @@ +/* + * Copyright 2026 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package androidx.media3.datasource.ktor import androidx.media3.datasource.DataSource diff --git a/libraries/datasource_ktor/src/androidTest/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt b/libraries/datasource_ktor/src/androidTest/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt index 68c9f908abc..81b474c47df 100644 --- a/libraries/datasource_ktor/src/androidTest/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt +++ b/libraries/datasource_ktor/src/androidTest/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt @@ -1,3 +1,18 @@ +/* + * Copyright 2026 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package androidx.media3.datasource.ktor import androidx.media3.datasource.DataSpec diff --git a/libraries/datasource_ktor/src/main/AndroidManifest.xml b/libraries/datasource_ktor/src/main/AndroidManifest.xml index 499171ca5de..f5f8b47a4b1 100644 --- a/libraries/datasource_ktor/src/main/AndroidManifest.xml +++ b/libraries/datasource_ktor/src/main/AndroidManifest.xml @@ -1,4 +1,18 @@ + diff --git a/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt b/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt index 55bd8d5a8a9..8d1fe2ee57e 100644 --- a/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt +++ b/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt @@ -1,3 +1,18 @@ +/* + * Copyright 2026 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package androidx.media3.datasource.ktor import android.net.Uri diff --git a/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/package-info.java b/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/package-info.java index 2c8ed6752e3..c60143af1f0 100644 --- a/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/package-info.java +++ b/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/package-info.java @@ -1,3 +1,18 @@ +/* + * Copyright 2026 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ @NonNullApi package androidx.media3.datasource.ktor; From 4ae3c795cf3549e378d17674eedfc845196661e1 Mon Sep 17 00:00:00 2001 From: Ian Baker Date: Tue, 24 Feb 2026 16:14:29 +0000 Subject: [PATCH 06/14] Remove parts of README that duplicate Ktor docs about customization --- libraries/datasource_ktor/README.md | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/libraries/datasource_ktor/README.md b/libraries/datasource_ktor/README.md index 965f8465bb3..f85da67ea51 100644 --- a/libraries/datasource_ktor/README.md +++ b/libraries/datasource_ktor/README.md @@ -40,23 +40,6 @@ new DefaultDataSourceFactory( /* baseDataSourceFactory= */ new KtorDataSource.Factory(...)); ``` -### Using with OkHttp engine - -```kotlin -val dataSourceFactory = KtorDataSource.Factory(OkHttp.create()) -``` - -### Using with a custom HttpClient - -```kotlin -val httpClient = HttpClient(OkHttp) { - engine { - // Configure OkHttp engine - } -} -val dataSourceFactory = KtorDataSource.Factory(httpClient) -``` - ## Links * [Javadoc][] From 410621fe7567a80bdee592282f5c7ed6c3ee2676 Mon Sep 17 00:00:00 2001 From: kvtodev Date: Wed, 25 Feb 2026 01:01:37 +0800 Subject: [PATCH 07/14] remove ktor specific Proguard rules. they should be left to the user of ktor. --- libraries/datasource_ktor/proguard-rules.txt | 4 ---- 1 file changed, 4 deletions(-) diff --git a/libraries/datasource_ktor/proguard-rules.txt b/libraries/datasource_ktor/proguard-rules.txt index feb77bb649d..a905fc418c7 100644 --- a/libraries/datasource_ktor/proguard-rules.txt +++ b/libraries/datasource_ktor/proguard-rules.txt @@ -5,7 +5,3 @@ -dontwarn okio.** -dontwarn javax.annotation.** -dontwarn org.conscrypt.** - -# Keep Ktor client classes --keep class io.ktor.** { *; } --keep class kotlinx.coroutines.** { *; } From 9a2ecbeeb560eb1fc4bd62c71b9317e55861e6b4 Mon Sep 17 00:00:00 2001 From: Ian Baker Date: Tue, 24 Feb 2026 16:16:32 +0000 Subject: [PATCH 08/14] Remove KtorDataSource from the stable API for now We can promote it to stable later --- api.txt | 13 ------------- .../media3/datasource/ktor/KtorDataSource.kt | 15 +-------------- 2 files changed, 1 insertion(+), 27 deletions(-) diff --git a/api.txt b/api.txt index 142dd386e4f..d5c9a173af1 100644 --- a/api.txt +++ b/api.txt @@ -1414,19 +1414,6 @@ package androidx.media3.datasource.okhttp { } -package androidx.media3.datasource.ktor { - - public class KtorDataSource implements androidx.media3.datasource.DataSource androidx.media3.datasource.HttpDataSource { - } - - public static final class KtorDataSource.Factory implements androidx.media3.datasource.HttpDataSource.Factory { - ctor public KtorDataSource.Factory(io.ktor.client.HttpClient); - ctor public KtorDataSource.Factory(io.ktor.client.HttpClient, kotlinx.coroutines.CoroutineScope); - method public androidx.media3.datasource.ktor.KtorDataSource.Factory setUserAgent(@Nullable String); - } - -} - package androidx.media3.exoplayer { public final class ExoPlaybackException extends androidx.media3.common.PlaybackException { diff --git a/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt b/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt index 8d1fe2ee57e..ddc64bc39a5 100644 --- a/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt +++ b/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt @@ -61,6 +61,7 @@ import kotlinx.coroutines.launch * priority) the `dataSpec`, [setRequestProperty] and the default parameters used to construct the * instance. */ +@UnstableApi class KtorDataSource private constructor( private val httpClient: HttpClient, @@ -102,7 +103,6 @@ private constructor( private var contentTypePredicate: Predicate? = null - @UnstableApi override fun setDefaultRequestProperties( defaultRequestProperties: Map ): Factory { @@ -134,7 +134,6 @@ private constructor( * previously set value. * @return This factory. */ - @UnstableApi fun setCacheControl(cacheControl: String?): Factory { this.cacheControl = cacheControl return this @@ -150,7 +149,6 @@ private constructor( * was previously set. * @return This factory. */ - @UnstableApi fun setContentTypePredicate(contentTypePredicate: Predicate?): Factory { this.contentTypePredicate = contentTypePredicate return this @@ -166,13 +164,11 @@ private constructor( * @param transferListener The listener that will be used. * @return This factory. */ - @UnstableApi fun setTransferListener(transferListener: TransferListener?): Factory { this.transferListener = transferListener return this } - @UnstableApi override fun createDataSource(): KtorDataSource { val client = httpClient val dataSource = @@ -202,7 +198,6 @@ private constructor( private var bytesToRead: Long = 0 private var bytesRead: Long = 0 - @UnstableApi override fun getUri(): Uri? { return if (response != null) { Uri.parse(response!!.request.url.toString()) @@ -213,12 +208,10 @@ private constructor( } } - @UnstableApi override fun getResponseCode(): Int { return response?.status?.value ?: -1 } - @UnstableApi override fun getResponseHeaders(): Map> { val httpResponse = response ?: return emptyMap() val headers = TreeMap>(String.CASE_INSENSITIVE_ORDER) @@ -228,25 +221,21 @@ private constructor( return headers } - @UnstableApi override fun setRequestProperty(name: String, value: String) { requireNotNull(name) { "name cannot be null" } requireNotNull(value) { "value cannot be null" } requestProperties.set(name, value) } - @UnstableApi override fun clearRequestProperty(name: String) { requireNotNull(name) { "name cannot be null" } requestProperties.remove(name) } - @UnstableApi override fun clearAllRequestProperties() { requestProperties.clear() } - @UnstableApi @Throws(HttpDataSource.HttpDataSourceException::class) override fun open(dataSpec: DataSpec): Long { this.dataSpec = dataSpec @@ -336,7 +325,6 @@ private constructor( return bytesToRead } - @UnstableApi @Throws(HttpDataSource.HttpDataSourceException::class) override fun read(buffer: ByteArray, offset: Int, length: Int): Int { return try { @@ -350,7 +338,6 @@ private constructor( } } - @UnstableApi override fun close() { if (connectionEstablished) { connectionEstablished = false From b51daadc87065743b44a29fadf8ef49a974fc3a8 Mon Sep 17 00:00:00 2001 From: kvtodev Date: Wed, 25 Feb 2026 01:39:14 +0800 Subject: [PATCH 09/14] remove resolve case-insensitive step io.ktor.http.Headers it self is a case-insensitive map already --- .../media3/datasource/ktor/KtorDataSource.kt | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt b/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt index ddc64bc39a5..f089d4f9792 100644 --- a/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt +++ b/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt @@ -41,18 +41,17 @@ import io.ktor.http.HttpMethod import io.ktor.http.contentLength import io.ktor.http.contentType import io.ktor.utils.io.jvm.javaio.toInputStream -import java.io.IOException -import java.io.InterruptedIOException -import java.util.TreeMap -import java.util.concurrent.CountDownLatch -import java.util.concurrent.atomic.AtomicReference -import kotlin.math.min import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.launch +import java.io.IOException +import java.io.InterruptedIOException +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicReference +import kotlin.math.min /** * An [HttpDataSource] that delegates to Ktor's [HttpClient]. @@ -214,11 +213,9 @@ private constructor( override fun getResponseHeaders(): Map> { val httpResponse = response ?: return emptyMap() - val headers = TreeMap>(String.CASE_INSENSITIVE_ORDER) - httpResponse.headers.names().forEach { name -> - headers[name] = httpResponse.headers.getAll(name) ?: emptyList() + return httpResponse.headers.names().associateWith { name -> + httpResponse.headers.getAll(name) ?: emptyList() } - return headers } override fun setRequestProperty(name: String, value: String) { From 31e215540db9677e9630f0f39c0c2cbda0377162 Mon Sep 17 00:00:00 2001 From: kvtodev Date: Wed, 25 Feb 2026 01:41:21 +0800 Subject: [PATCH 10/14] remove Redundant 'requireNotNull' call --- .../java/androidx/media3/datasource/ktor/KtorDataSource.kt | 3 --- 1 file changed, 3 deletions(-) diff --git a/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt b/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt index f089d4f9792..513671ea19b 100644 --- a/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt +++ b/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt @@ -219,13 +219,10 @@ private constructor( } override fun setRequestProperty(name: String, value: String) { - requireNotNull(name) { "name cannot be null" } - requireNotNull(value) { "value cannot be null" } requestProperties.set(name, value) } override fun clearRequestProperty(name: String) { - requireNotNull(name) { "name cannot be null" } requestProperties.remove(name) } From 8653e59633b789897f5800340431c8573e7194b6 Mon Sep 17 00:00:00 2001 From: kvtodev Date: Wed, 25 Feb 2026 11:22:19 +0800 Subject: [PATCH 11/14] Revert "remove resolve case-insensitive step" This reverts commit b51daadc87065743b44a29fadf8ef49a974fc3a8. fix: failed test case , we not only need a case-insensitively stored map, we also need a case-insensitively look up map. --- .../androidx/media3/datasource/ktor/KtorDataSource.kt | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt b/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt index 513671ea19b..260852bdaf0 100644 --- a/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt +++ b/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt @@ -49,6 +49,7 @@ import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.launch import java.io.IOException import java.io.InterruptedIOException +import java.util.TreeMap import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicReference import kotlin.math.min @@ -213,9 +214,12 @@ private constructor( override fun getResponseHeaders(): Map> { val httpResponse = response ?: return emptyMap() - return httpResponse.headers.names().associateWith { name -> - httpResponse.headers.getAll(name) ?: emptyList() + // ordered map use case-insensitive comparator to support case-insensitive lookup + val result = TreeMap>(String.CASE_INSENSITIVE_ORDER) + for (name in httpResponse.headers.names()) { + result[name] = httpResponse.headers.getAll(name) ?: emptyList() } + return result } override fun setRequestProperty(name: String, value: String) { From 919b31df8ab4fc1d99a19c4dc2ffaf4ea0906446 Mon Sep 17 00:00:00 2001 From: kvtodev Date: Wed, 25 Feb 2026 11:37:17 +0800 Subject: [PATCH 12/14] change to use ktor engine -android impl in test. --- libraries/datasource_ktor/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/datasource_ktor/build.gradle b/libraries/datasource_ktor/build.gradle index f9bb5a811ba..0fe90ef28fd 100644 --- a/libraries/datasource_ktor/build.gradle +++ b/libraries/datasource_ktor/build.gradle @@ -44,7 +44,7 @@ dependencies { androidTestImplementation 'androidx.test:runner:' + androidxTestRunnerVersion androidTestImplementation 'com.linkedin.dexmaker:dexmaker-mockito:' + dexmakerVersion androidTestImplementation 'com.squareup.okhttp3:mockwebserver:' + okhttpVersion - androidTestImplementation 'io.ktor:ktor-client-okhttp:' + ktorVersion + androidTestImplementation 'io.ktor:ktor-client-android:' + ktorVersion } ext { From 228d4cc6c34bcbc285d3c957aa3a53f2bebdde06 Mon Sep 17 00:00:00 2001 From: kvtodev Date: Wed, 25 Feb 2026 12:18:30 +0800 Subject: [PATCH 13/14] refactor: instead of providing configurable CoroutineScope, simplify logic via blocking at top level. --- .../ktor/KtorDataSourceContractTest.kt | 7 +- .../datasource/ktor/KtorDataSourceTest.kt | 14 +- .../media3/datasource/ktor/KtorDataSource.kt | 265 ++++++++---------- 3 files changed, 126 insertions(+), 160 deletions(-) diff --git a/libraries/datasource_ktor/src/androidTest/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt b/libraries/datasource_ktor/src/androidTest/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt index d9b42c8bb62..a179bd004e5 100644 --- a/libraries/datasource_ktor/src/androidTest/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt +++ b/libraries/datasource_ktor/src/androidTest/java/androidx/media3/datasource/ktor/KtorDataSourceContractTest.kt @@ -22,9 +22,6 @@ import androidx.test.ext.junit.runners.AndroidJUnit4 import com.google.common.collect.ImmutableList import io.ktor.client.HttpClient import io.ktor.client.plugins.HttpTimeout -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.SupervisorJob import org.junit.Rule import org.junit.runner.RunWith @@ -41,10 +38,8 @@ class KtorDataSourceContractTest : DataSourceContractTest() { } } - private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) - override fun createDataSource(): DataSource { - return KtorDataSource.Factory(httpClient, coroutineScope).createDataSource() + return KtorDataSource.Factory(httpClient).createDataSource() } override fun getTestResources(): ImmutableList { diff --git a/libraries/datasource_ktor/src/androidTest/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt b/libraries/datasource_ktor/src/androidTest/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt index 81b474c47df..8aad11a8417 100644 --- a/libraries/datasource_ktor/src/androidTest/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt +++ b/libraries/datasource_ktor/src/androidTest/java/androidx/media3/datasource/ktor/KtorDataSourceTest.kt @@ -23,9 +23,6 @@ import io.ktor.client.HttpClient import java.nio.charset.StandardCharsets import java.util.HashMap import java.util.concurrent.TimeUnit -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.SupervisorJob import okhttp3.mockwebserver.MockResponse import okhttp3.mockwebserver.MockWebServer import org.junit.Assert.assertThrows @@ -35,7 +32,6 @@ import org.junit.runner.RunWith @RunWith(AndroidJUnit4::class) class KtorDataSourceTest { - private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) val httpClient = HttpClient() @Test @@ -52,7 +48,7 @@ class KtorDataSourceTest { defaultRequestProperties["4"] = propertyFromFactory val dataSource = - KtorDataSource.Factory(httpClient, coroutineScope) + KtorDataSource.Factory(httpClient) .setDefaultRequestProperties(defaultRequestProperties) .createDataSource() @@ -94,7 +90,7 @@ class KtorDataSourceTest { val mockWebServer = MockWebServer() mockWebServer.enqueue(MockResponse().setResponseCode(404).setBody("failure msg")) - val dataSource = KtorDataSource.Factory(httpClient, coroutineScope).createDataSource() + val dataSource = KtorDataSource.Factory(httpClient).createDataSource() val dataSpec = DataSpec.Builder().setUri(mockWebServer.url("/test-path").toString()).build() @@ -114,7 +110,7 @@ class KtorDataSourceTest { mockWebServer.enqueue(MockResponse()) val dataSpec = DataSpec.Builder().setUri(mockWebServer.url("/test-path").toString()).build() - val factory = KtorDataSource.Factory(httpClient, coroutineScope) + val factory = KtorDataSource.Factory(httpClient) val dataSource = factory.createDataSource() val defaultRequestProperties = HashMap() @@ -130,7 +126,7 @@ class KtorDataSourceTest { @Test fun open_malformedUrl_throwsException() { - val dataSource = KtorDataSource.Factory(httpClient, coroutineScope).createDataSource() + val dataSource = KtorDataSource.Factory(httpClient).createDataSource() val dataSpec = DataSpec.Builder().setUri("not-a-valid-url").build() @@ -146,7 +142,7 @@ class KtorDataSourceTest { val mockWebServer = MockWebServer() mockWebServer.enqueue(MockResponse()) - val dataSource = KtorDataSource.Factory(httpClient, coroutineScope).createDataSource() + val dataSource = KtorDataSource.Factory(httpClient).createDataSource() val dataSpec = DataSpec.Builder() diff --git a/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt b/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt index 260852bdaf0..78676fc7e70 100644 --- a/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt +++ b/libraries/datasource_ktor/src/main/java/androidx/media3/datasource/ktor/KtorDataSource.kt @@ -40,19 +40,15 @@ import io.ktor.client.statement.request import io.ktor.http.HttpMethod import io.ktor.http.contentLength import io.ktor.http.contentType -import io.ktor.utils.io.jvm.javaio.toInputStream -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.Job -import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.launch +import io.ktor.utils.io.ByteReadChannel +import io.ktor.utils.io.availableForRead +import io.ktor.utils.io.readAvailable import java.io.IOException import java.io.InterruptedIOException import java.util.TreeMap -import java.util.concurrent.CountDownLatch -import java.util.concurrent.atomic.AtomicReference import kotlin.math.min +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.runBlocking /** * An [HttpDataSource] that delegates to Ktor's [HttpClient]. @@ -65,7 +61,6 @@ import kotlin.math.min class KtorDataSource private constructor( private val httpClient: HttpClient, - private val coroutineScope: CoroutineScope, private val userAgent: String?, private val cacheControl: String?, private val defaultRequestProperties: HttpDataSource.RequestProperties?, @@ -85,13 +80,8 @@ private constructor( * [androidx.media3.datasource.DataSource.Factory] for [KtorDataSource] instances. * * @param httpClient A [HttpClient] for use by the sources created by the factory. - * @param scope A [CoroutineScope] for running suspend functions. If not provided, a default scope - * with [Dispatchers.IO] and a [SupervisorJob] will be created. */ - class Factory( - private val httpClient: HttpClient, - private val scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO), - ) : HttpDataSource.Factory { + class Factory(private val httpClient: HttpClient) : HttpDataSource.Factory { private val defaultRequestProperties = HttpDataSource.RequestProperties() @@ -174,7 +164,6 @@ private constructor( val dataSource = KtorDataSource( client, - scope, userAgent, cacheControl, defaultRequestProperties, @@ -190,9 +179,7 @@ private constructor( private var response: HttpResponse? = null - private var responseInputStream: java.io.InputStream? = null - - private var currentJob: Job? = null + private var responseChannel: ByteReadChannel? = null private var connectionEstablished = false private var bytesToRead: Long = 0 @@ -241,10 +228,75 @@ private constructor( bytesToRead = 0 transferInitializing(dataSpec) + val urlString = dataSpec.uri.toString() + val uri = Uri.parse(urlString) + val scheme = uri.scheme + if (scheme == null || !scheme.lowercase().startsWith("http")) { + throw HttpDataSource.HttpDataSourceException( + "Malformed URL", + dataSpec, + PlaybackException.ERROR_CODE_FAILED_RUNTIME_CHECK, + HttpDataSource.HttpDataSourceException.TYPE_OPEN, + ) + } + + val mergedHeaders = HashMap() + defaultRequestProperties?.snapshot?.forEach { (key, value) -> mergedHeaders[key] = value } + requestProperties.snapshot.forEach { (key, value) -> mergedHeaders[key] = value } + dataSpec.httpRequestHeaders.forEach { (key, value) -> mergedHeaders[key] = value } + + val httpResponse: HttpResponse + val channel: ByteReadChannel try { - val httpResponse = executeRequest(dataSpec) + runBlocking { + httpResponse = + httpClient + .prepareRequest { + url(urlString) + + headers { + mergedHeaders.forEach { (key, value) -> append(key, value) } + + val rangeHeader = + HttpUtil.buildRangeRequestHeader(dataSpec.position, dataSpec.length) + if (rangeHeader != null) { + append(HttpHeaders.RANGE, rangeHeader) + } + + if (userAgent != null) { + append(HttpHeaders.USER_AGENT, userAgent) + } + + if (cacheControl != null) { + append(HttpHeaders.CACHE_CONTROL, cacheControl) + } + + if (!dataSpec.isFlagSet(DataSpec.FLAG_ALLOW_GZIP)) { + append(HttpHeaders.ACCEPT_ENCODING, "identity") + } + } + + method = + when (dataSpec.httpMethod) { + DataSpec.HTTP_METHOD_GET -> HttpMethod.Get + DataSpec.HTTP_METHOD_POST -> HttpMethod.Post + DataSpec.HTTP_METHOD_HEAD -> HttpMethod.Head + else -> HttpMethod.Get + } + + if (dataSpec.httpBody != null) { + setBody(dataSpec.httpBody!!) + } else if (dataSpec.httpMethod == DataSpec.HTTP_METHOD_POST) { + setBody(ByteArray(0)) + } + } + .execute() + channel = httpResponse.bodyAsChannel() + } this.response = httpResponse - this.responseInputStream = executeSuspend { httpResponse.bodyAsChannel().toInputStream() } + this.responseChannel = channel + } catch (_: CancellationException) { + throw InterruptedIOException() } catch (e: IOException) { if (e is HttpDataSource.HttpDataSourceException) throw e throw HttpDataSource.HttpDataSourceException.createForIOException( @@ -252,9 +304,16 @@ private constructor( dataSpec, HttpDataSource.HttpDataSourceException.TYPE_OPEN, ) + } catch (e: Exception) { + throw HttpDataSource.HttpDataSourceException( + e.message ?: "Unknown error", + null, + dataSpec, + PlaybackException.ERROR_CODE_IO_UNSPECIFIED, + HttpDataSource.HttpDataSourceException.TYPE_OPEN, + ) } - val httpResponse = this.response!! val responseCode = httpResponse.status.value if (responseCode !in 200..299) { @@ -270,8 +329,13 @@ private constructor( val errorResponseBody: ByteArray = try { - responseInputStream?.readBytes() ?: Util.EMPTY_BYTE_ARRAY - } catch (e: IOException) { + runBlocking { + val ch = responseChannel ?: return@runBlocking Util.EMPTY_BYTE_ARRAY + val buffer = ByteArray(ch.availableForRead.coerceAtMost(1024 * 1024)) + val read = ch.readAvailable(buffer) + if (read > 0) buffer.copyOf(read) else Util.EMPTY_BYTE_ARRAY + } + } catch (_: Exception) { Util.EMPTY_BYTE_ARRAY } @@ -314,10 +378,13 @@ private constructor( transferStarted(dataSpec) try { - skipFully(bytesToSkip, dataSpec) + runBlocking { skipFully(bytesToSkip, dataSpec) } } catch (e: HttpDataSource.HttpDataSourceException) { closeConnectionQuietly() throw e + } catch (_: CancellationException) { + closeConnectionQuietly() + throw InterruptedIOException() } return bytesToRead @@ -326,13 +393,28 @@ private constructor( @Throws(HttpDataSource.HttpDataSourceException::class) override fun read(buffer: ByteArray, offset: Int, length: Int): Int { return try { - readInternal(buffer, offset, length) + runBlocking { readInternal(buffer, offset, length) } + } catch (_: CancellationException) { + throw HttpDataSource.HttpDataSourceException( + InterruptedIOException(), + dataSpec!!, + PlaybackException.ERROR_CODE_IO_UNSPECIFIED, + HttpDataSource.HttpDataSourceException.TYPE_READ, + ) } catch (e: IOException) { throw HttpDataSource.HttpDataSourceException.createForIOException( e, dataSpec!!, HttpDataSource.HttpDataSourceException.TYPE_READ, ) + } catch (e: Exception) { + throw HttpDataSource.HttpDataSourceException( + e.message ?: "Unknown error", + null, + dataSpec!!, + PlaybackException.ERROR_CODE_IO_UNSPECIFIED, + HttpDataSource.HttpDataSourceException.TYPE_READ, + ) } } @@ -346,124 +428,18 @@ private constructor( dataSpec = null } - @Throws(IOException::class) - private fun executeRequest(dataSpec: DataSpec): HttpResponse { - val urlString = dataSpec.uri.toString() - - val uri = Uri.parse(urlString) - val scheme = uri.scheme - if (scheme == null || !scheme.lowercase().startsWith("http")) { - throw HttpDataSource.HttpDataSourceException( - "Malformed URL", - dataSpec, - PlaybackException.ERROR_CODE_FAILED_RUNTIME_CHECK, - HttpDataSource.HttpDataSourceException.TYPE_OPEN, - ) - } - - val mergedHeaders = HashMap() - defaultRequestProperties?.snapshot?.forEach { (key, value) -> mergedHeaders[key] = value } - requestProperties.snapshot.forEach { (key, value) -> mergedHeaders[key] = value } - dataSpec.httpRequestHeaders.forEach { (key, value) -> mergedHeaders[key] = value } - - return executeSuspend { - httpClient - .prepareRequest { - url(urlString) - - headers { - mergedHeaders.forEach { (key, value) -> append(key, value) } - - val rangeHeader = HttpUtil.buildRangeRequestHeader(dataSpec.position, dataSpec.length) - if (rangeHeader != null) { - append(HttpHeaders.RANGE, rangeHeader) - } - - if (userAgent != null) { - append(HttpHeaders.USER_AGENT, userAgent) - } - - if (cacheControl != null) { - append(HttpHeaders.CACHE_CONTROL, cacheControl) - } - - if (!dataSpec.isFlagSet(DataSpec.FLAG_ALLOW_GZIP)) { - append(HttpHeaders.ACCEPT_ENCODING, "identity") - } - } - - method = - when (dataSpec.httpMethod) { - DataSpec.HTTP_METHOD_GET -> HttpMethod.Get - DataSpec.HTTP_METHOD_POST -> HttpMethod.Post - DataSpec.HTTP_METHOD_HEAD -> HttpMethod.Head - else -> HttpMethod.Get - } - - if (dataSpec.httpBody != null) { - setBody(dataSpec.httpBody!!) - } else if (dataSpec.httpMethod == DataSpec.HTTP_METHOD_POST) { - setBody(ByteArray(0)) - } - } - .execute() - } - } - - @Throws(IOException::class) - private fun executeSuspend(block: suspend () -> T): T { - val exceptionRef = AtomicReference(null) - val resultRef = AtomicReference(null) - val latch = CountDownLatch(1) - - currentJob = - coroutineScope.launch { - try { - resultRef.set(block()) - } catch (e: CancellationException) { - exceptionRef.set(InterruptedIOException()) - } catch (e: Exception) { - exceptionRef.set(e) - } finally { - latch.countDown() - } - } - - try { - latch.await() - } catch (e: InterruptedException) { - currentJob?.cancel() - throw InterruptedIOException() - } - - exceptionRef.get()?.let { throwable -> - when (throwable) { - is IOException -> throw throwable - is InterruptedIOException -> throw throwable - else -> throw IOException(throwable) - } - } - - @Suppress("UNCHECKED_CAST") - return resultRef.get() as T - } - @Throws(HttpDataSource.HttpDataSourceException::class) - private fun skipFully(bytesToSkip: Long, dataSpec: DataSpec) { + private suspend fun skipFully(bytesToSkip: Long, dataSpec: DataSpec) { if (bytesToSkip == 0L) return val skipBuffer = ByteArray(4096) var remaining = bytesToSkip try { - val inputStream = responseInputStream ?: throw IOException("Stream closed") + val channel = responseChannel ?: throw IOException("Channel closed") while (remaining > 0) { val readLength = min(remaining.toInt(), skipBuffer.size) - val read = inputStream.read(skipBuffer, 0, readLength) - - if (Thread.currentThread().isInterrupted) { - throw InterruptedIOException() - } + val read = channel.readAvailable(skipBuffer, 0, readLength) if (read < 0) { throw HttpDataSource.HttpDataSourceException( @@ -487,7 +463,7 @@ private constructor( } @Throws(IOException::class) - private fun readInternal(buffer: ByteArray, offset: Int, readLength: Int): Int { + private suspend fun readInternal(buffer: ByteArray, offset: Int, readLength: Int): Int { if (readLength == 0) return 0 if (bytesToRead != C.LENGTH_UNSET.toLong()) { @@ -495,16 +471,16 @@ private constructor( if (bytesRemaining == 0L) return C.RESULT_END_OF_INPUT val actualReadLength = min(readLength.toLong(), bytesRemaining).toInt() - return readFromStream(buffer, offset, actualReadLength) + return readFromChannel(buffer, offset, actualReadLength) } - return readFromStream(buffer, offset, readLength) + return readFromChannel(buffer, offset, readLength) } @Throws(IOException::class) - private fun readFromStream(buffer: ByteArray, offset: Int, readLength: Int): Int { - val inputStream = responseInputStream ?: return C.RESULT_END_OF_INPUT - val read = inputStream.read(buffer, offset, readLength) + private suspend fun readFromChannel(buffer: ByteArray, offset: Int, readLength: Int): Int { + val channel = responseChannel ?: return C.RESULT_END_OF_INPUT + val read = channel.readAvailable(buffer, offset, readLength) if (read < 0) return C.RESULT_END_OF_INPUT @@ -514,8 +490,7 @@ private constructor( } private fun closeConnectionQuietly() { - responseInputStream?.close() - responseInputStream = null - currentJob = null + responseChannel?.cancel(null) + responseChannel = null } } From 8cddcb47dbf5aab99626510c738b40c14232cd43 Mon Sep 17 00:00:00 2001 From: kvtodev Date: Wed, 25 Feb 2026 12:31:42 +0800 Subject: [PATCH 14/14] dep: add dependency 'io.ktor:ktor-client-android:' as default choice --- libraries/datasource_ktor/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/datasource_ktor/build.gradle b/libraries/datasource_ktor/build.gradle index 0fe90ef28fd..7f061fb52da 100644 --- a/libraries/datasource_ktor/build.gradle +++ b/libraries/datasource_ktor/build.gradle @@ -33,6 +33,7 @@ android { dependencies { api 'io.ktor:ktor-client-core:' + ktorVersion + api 'io.ktor:ktor-client-android:' + ktorVersion api project(modulePrefix + 'lib-common') api project(modulePrefix + 'lib-datasource') implementation 'androidx.annotation:annotation:' + androidxAnnotationVersion @@ -44,7 +45,6 @@ dependencies { androidTestImplementation 'androidx.test:runner:' + androidxTestRunnerVersion androidTestImplementation 'com.linkedin.dexmaker:dexmaker-mockito:' + dexmakerVersion androidTestImplementation 'com.squareup.okhttp3:mockwebserver:' + okhttpVersion - androidTestImplementation 'io.ktor:ktor-client-android:' + ktorVersion } ext {