Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions .github/actions/migration_tests/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@ runs:
MIN_AIRFLOW_VERSION="$(
python ./scripts/ci/testing/get_min_airflow_version_for_python.py "${PYTHON_VERSION}"
)"
AIRFLOW_EXTRAS=""
if [[ "${MIN_AIRFLOW_VERSION}" =~ ^2\. ]]; then
AIRFLOW_EXTRAS="--airflow-extras pydantic"
fi
breeze shell "${AIRFLOW_2_CMD}" \
--use-airflow-version "${MIN_AIRFLOW_VERSION}" \
--airflow-extras pydantic \
${AIRFLOW_EXTRAS} \
--answer y &&
breeze shell "export AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS=${DB_MANGERS}
${AIRFLOW_3_CMD}" --no-db-cleanup
Expand Down Expand Up @@ -61,9 +65,13 @@ runs:
MIN_AIRFLOW_VERSION="$(
python ./scripts/ci/testing/get_min_airflow_version_for_python.py "${PYTHON_VERSION}"
)"
AIRFLOW_EXTRAS=""
if [[ "${MIN_AIRFLOW_VERSION}" =~ ^2\. ]]; then
AIRFLOW_EXTRAS="--airflow-extras pydantic"
fi
breeze shell "${AIRFLOW_2_CMD}" \
--use-airflow-version "${MIN_AIRFLOW_VERSION}" \
--airflow-extras pydantic \
${AIRFLOW_EXTRAS} \
--answer y &&
breeze shell "export AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS=${DB_MANGERS}
${AIRFLOW_3_CMD}" --no-db-cleanup
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/run-unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ jobs:
uses: ./.github/actions/migration_tests
with:
python-version: ${{ matrix.python-version }}
if: inputs.run-migration-tests == 'true' && inputs.test-group == 'core'
# Any new python version should be disabled below via `&& matrix.python-version != '3.xx'` until the first
# Airflow version that supports it is released - otherwise there's nothing to migrate back to.
if: inputs.run-migration-tests == 'true' && inputs.test-group == 'core' && matrix.python-version != '3.14'
- name: >
${{ inputs.test-group }}:${{ inputs.test-scope }} Tests ${{ inputs.test-name }} ${{ matrix.backend-version }}
Py${{ matrix.python-version }}:${{ env.PARALLEL_TEST_TYPES }}
Expand Down
12 changes: 12 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ repos:
exclude: >
(?x)
^README\.md$|
^pyproject\.toml$|
^generated/PYPI_README\.md$|
^airflow-core/docs/.*commits\.rst$|
^airflow-core/newsfragments/41368\.significant\.rst$|
Expand Down Expand Up @@ -816,6 +817,17 @@ repos:
^providers/.*/provider\.yaml$
pass_filenames: false
require_serial: true
- id: check-excluded-provider-markers
name: Check excluded-provider python_version markers in pyproject.toml
language: python
entry: ./scripts/ci/prek/check_excluded_provider_markers.py
files: >
(?x)
^pyproject\.toml$|
^providers/.*/provider\.yaml$
pass_filenames: false
require_serial: true
additional_dependencies: ['packaging>=25', 'pyyaml', 'tomli>=2.0.1', 'rich>=13.6.0']
- id: update-reproducible-source-date-epoch
name: Update Source Date Epoch for reproducible builds
language: python
Expand Down
10 changes: 9 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,15 @@ function common::get_constraints_location() {
echo
echo "${COLOR_BLUE}Downloading constraints from ${AIRFLOW_CONSTRAINTS_LOCATION} to ${HOME}/constraints.txt ${COLOR_RESET}"
echo
curl -sSf -o "${HOME}/constraints.txt" "${AIRFLOW_CONSTRAINTS_LOCATION}"
if ! curl -sSf -o "${HOME}/constraints.txt" "${AIRFLOW_CONSTRAINTS_LOCATION}"; then
echo
echo "${COLOR_YELLOW}Constraints file not found at ${AIRFLOW_CONSTRAINTS_LOCATION} (new Python version being bootstrapped?).${COLOR_RESET}"
echo "${COLOR_YELLOW}Falling back to no-constraints installation.${COLOR_RESET}"
echo
AIRFLOW_CONSTRAINTS_LOCATION=""
# Create an empty constraints file so --constraint flag still works
touch "${HOME}/constraints.txt"
fi
else
echo
echo "${COLOR_BLUE}Copying constraints from ${AIRFLOW_CONSTRAINTS_LOCATION} to ${HOME}/constraints.txt ${COLOR_RESET}"
Expand Down
11 changes: 10 additions & 1 deletion Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,15 @@ function common::get_constraints_location() {
echo
echo "${COLOR_BLUE}Downloading constraints from ${AIRFLOW_CONSTRAINTS_LOCATION} to ${HOME}/constraints.txt ${COLOR_RESET}"
echo
curl -sSf -o "${HOME}/constraints.txt" "${AIRFLOW_CONSTRAINTS_LOCATION}"
if ! curl -sSf -o "${HOME}/constraints.txt" "${AIRFLOW_CONSTRAINTS_LOCATION}"; then
echo
echo "${COLOR_YELLOW}Constraints file not found at ${AIRFLOW_CONSTRAINTS_LOCATION} (new Python version being bootstrapped?).${COLOR_RESET}"
echo "${COLOR_YELLOW}Falling back to no-constraints installation.${COLOR_RESET}"
echo
AIRFLOW_CONSTRAINTS_LOCATION=""
# Create an empty constraints file so --constraint flag still works
touch "${HOME}/constraints.txt"
fi
else
echo
echo "${COLOR_BLUE}Copying constraints from ${AIRFLOW_CONSTRAINTS_LOCATION} to ${HOME}/constraints.txt ${COLOR_RESET}"
Expand Down Expand Up @@ -1729,6 +1737,7 @@ ENV AIRFLOW_REPO=${AIRFLOW_REPO}\
AIRFLOW_VERSION_SPECIFICATION="" \
PIP_PROGRESS_BAR=${PIP_PROGRESS_BAR} \
ADDITIONAL_PIP_INSTALL_FLAGS=${ADDITIONAL_PIP_INSTALL_FLAGS} \
INCLUDE_PRE_RELEASE="true" \
CASS_DRIVER_BUILD_CONCURRENCY=${CASS_DRIVER_BUILD_CONCURRENCY} \
CASS_DRIVER_NO_CYTHON=${CASS_DRIVER_NO_CYTHON}

Expand Down
2 changes: 1 addition & 1 deletion airflow-core/docs/installation/prerequisites.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Prerequisites

Airflow® is tested with:

* Python: 3.10, 3.11, 3.12, 3.13
* Python: 3.10, 3.11, 3.12, 3.13, 3.14

* Databases:

Expand Down
24 changes: 13 additions & 11 deletions airflow-core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,11 @@ description = "Core packages for Apache Airflow, schedule and API server"
readme = { file = "README.md", content-type = "text/markdown" }
license = "Apache-2.0"
license-files = ["LICENSE", "NOTICE"]
# We know that it will take a while before we can support Python 3.14 because of all our dependencies
# It takes about 4-7 months after Python release before we can support it, so we limit it to <3.14
# proactively. This way we also have a chance to test it with Python 3.14 and bump the upper binding
# and manually mark providers that do not support it yet with !-3.14 - until they support it - which will
# also exclude resolving uv workspace dependencies for those providers.
requires-python = ">=3.10,!=3.14"
# Supporting new Python releases typically takes 4-7 months due to all our dependencies.
# We proactively exclude the next major version to avoid dependency conflicts, then test it and
# bump the upper binding once ready. Providers that don't support it yet are marked with
# != constraint - until they support it - which also excludes resolving uv workspace dependencies.
requires-python = ">=3.10,!=3.15"
authors = [
{ name = "Apache Software Foundation", email = "dev@airflow.apache.org" },
]
Expand All @@ -60,6 +59,7 @@ classifiers = [
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
"Topic :: System :: Monitoring",
]

Expand All @@ -80,7 +80,8 @@ dependencies = [
# The 1.13.0 of alembic marked some migration code as SQLAlchemy 2+ only so we limit it to 1.13.1
"alembic>=1.13.1, <2.0",
"argcomplete>=1.10",
"asgiref>=2.3.0",
"asgiref>=2.3.0; python_version < '3.14'",
"asgiref>=3.11.1; python_version >= '3.14'",
"attrs>=22.1.0, !=25.2.0",
"cadwyn>=6.0.4",
"colorlog>=6.8.2",
Expand All @@ -103,7 +104,8 @@ dependencies = [
"jinja2>=3.1.5",
"jsonschema>=4.19.1",
"lazy-object-proxy>=1.2.0",
'libcst >=1.8.2',
'libcst >=1.8.2; python_version < "3.14"',
'libcst >=1.8.6; python_version >= "3.14"',
"linkify-it-py>=2.0.0",
"lockfile>=0.12.2",
"methodtools>=0.4.7",
Expand Down Expand Up @@ -138,8 +140,7 @@ dependencies = [
"rich-argparse>=1.0.0",
"rich>=13.6.0",
"setproctitle>=1.3.3",
# SQLAlchemy >=2.0.36 fixes Python 3.13 TypingOnly import AssertionError caused by new typing attributes (__static_attributes__, __firstlineno__)
"sqlalchemy[asyncio]>=2.0.36",
"sqlalchemy[asyncio]>=2.0.48",
"svcs>=25.1.0",
"tabulate>=0.9.0",
"tenacity>=8.3.0",
Expand Down Expand Up @@ -173,7 +174,8 @@ dependencies = [
"async" = [
"eventlet>=0.37.0",
"gevent>=25.4.1",
"greenlet>=3.1.0",
"greenlet>=3.1.0; python_version < '3.14'",
"greenlet>=3.3.2; python_version >= '3.14'",
"greenback>=1.2.1",
]
"graphviz" = [
Expand Down
1 change: 1 addition & 0 deletions airflow-core/src/airflow/typing_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@
if sys.version_info >= (3, 11):
from typing import Self, Unpack, assert_never
else:
# TODO: Remove once Python 3.10 support is dropped (EOL 2026)
from typing_extensions import Self, Unpack, assert_never
3 changes: 3 additions & 0 deletions airflow-core/tests/unit/always/test_example_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@
IGNORE_EXAMPLE_DAGS: tuple[str, ...] = (
# These example dags require suspended providers, eg: google dataflow dependent on the Apache Beam provider,
# but it's in the suspended list, we can't import the dag
# Ray uses pydantic v1 internally, which fails to infer types in Python 3.14.
# TODO: remove once ray releases a version with Python 3.14 support.
"providers/google/tests/system/google/cloud/ray/example_ray_job.py",
"providers/google/tests/system/google/cloud/dataflow/example_dataflow_go.py",
"providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py",
"providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_java.py",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from __future__ import annotations

import contextlib
import os

import pytest
Expand All @@ -33,7 +34,7 @@
@pytest.fixture
def auth_manager():
auth_manager = SimpleAuthManager()
if os.path.exists(auth_manager.get_generated_password_file()):
with contextlib.suppress(FileNotFoundError):
os.remove(auth_manager.get_generated_password_file())
return auth_manager

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async def lifespan_app(scope, receive, send):
import asyncio

with structlog.testing.capture_logs() as logs:
asyncio.get_event_loop().run_until_complete(middleware({"type": "lifespan"}, None, None))
asyncio.run(middleware({"type": "lifespan"}, None, None))

assert logs == []

Expand Down
65 changes: 54 additions & 11 deletions airflow-core/tests/unit/executors/test_local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,16 @@

pytestmark = pytest.mark.db_test

# Runtime is fine, we just can't run the tests on macOS
skip_spawn_mp_start = pytest.mark.skipif(
multiprocessing.get_context().get_start_method() == "spawn",
reason="mock patching in test don't work with 'spawn' mode (default on macOS)",
# Mock patching doesn't work across process boundaries with 'spawn' (default on macOS)
# or 'forkserver' (default on Linux with Python 3.14+).
skip_non_fork_mp_start = pytest.mark.skipif(
multiprocessing.get_start_method() != "fork",
reason="mock patching in test doesn't work with non-fork multiprocessing start methods",
)

skip_fork_mp_start = pytest.mark.skipif(
multiprocessing.get_start_method() == "fork",
reason="tests non-fork (lazy-spawning) behavior",
)


Expand All @@ -68,7 +74,7 @@ def test_supports_multi_team(self):
def test_serve_logs_default_value(self):
assert LocalExecutor.serve_logs

@skip_spawn_mp_start
@skip_non_fork_mp_start
@mock.patch.object(gc, "unfreeze")
@mock.patch.object(gc, "freeze")
def test_executor_worker_spawned(self, mock_freeze, mock_unfreeze):
Expand All @@ -82,6 +88,33 @@ def test_executor_worker_spawned(self, mock_freeze, mock_unfreeze):

executor.end()

@skip_fork_mp_start
@mock.patch.object(gc, "unfreeze")
@mock.patch.object(gc, "freeze")
def test_executor_lazy_worker_spawning(self, mock_freeze, mock_unfreeze):
"""On non-fork start methods, workers are spawned lazily and gc.freeze is not called."""
executor = LocalExecutor(parallelism=3)
executor.start()

try:
# No workers should be pre-spawned
assert len(executor.workers) == 0
mock_freeze.assert_not_called()
mock_unfreeze.assert_not_called()

# Simulate a queued message so _check_workers spawns one worker on demand
with executor._unread_messages:
executor._unread_messages.value = 1
executor.activity_queue.put(None) # poison pill so the worker exits cleanly
executor._check_workers()

assert len(executor.workers) == 1
# gc.freeze is still not used for non-fork
mock_freeze.assert_not_called()
finally:
executor.end()

@skip_non_fork_mp_start
@mock.patch("airflow.sdk.execution_time.supervisor.supervise")
def test_execution(self, mock_supervise):
success_tis = [
Expand Down Expand Up @@ -182,7 +215,7 @@ def test_gauge_executor_metrics(self, mock_stats_gauge, mock_trigger_tasks, mock
mock_stats_gauge.assert_has_calls(calls)

@skip_if_force_lowest_dependencies_marker
@pytest.mark.execution_timeout(5)
@pytest.mark.execution_timeout(30)
def test_clean_stop_on_signal(self):
import signal

Expand Down Expand Up @@ -303,8 +336,15 @@ def test_multiple_team_executors_isolation(self):

# Verify each executor has its own workers dict
assert team_a_executor.workers is not team_b_executor.workers
assert len(team_a_executor.workers) == 2
assert len(team_b_executor.workers) == 3

if LocalExecutor.is_mp_using_fork:
# fork pre-spawns all workers at start()
assert len(team_a_executor.workers) == 2
assert len(team_b_executor.workers) == 3
else:
# forkserver/spawn use lazy spawning
assert len(team_a_executor.workers) == 0
assert len(team_b_executor.workers) == 0

# Verify each executor has its own unread_messages counter
assert team_a_executor._unread_messages is not team_b_executor._unread_messages
Expand All @@ -327,8 +367,11 @@ def test_global_executor_without_team_name(self):

executor.start()

# Verify workers were created
assert len(executor.workers) == 2
if LocalExecutor.is_mp_using_fork:
assert len(executor.workers) == 2
else:
# forkserver/spawn use lazy spawning
assert len(executor.workers) == 0

executor.end()

Expand All @@ -338,7 +381,7 @@ def test_supports_callbacks_flag_is_true(self):
executor = LocalExecutor()
assert executor.supports_callbacks is True

@skip_spawn_mp_start
@skip_non_fork_mp_start
@mock.patch("airflow.executors.workloads.callback.execute_callback_workload")
def test_process_callback_workload(self, mock_execute_callback):
mock_execute_callback.return_value = (True, None)
Expand Down
Loading
Loading