From 60c58960c45ecff851ba266a77be238f35819bf3 Mon Sep 17 00:00:00 2001 From: Santiago Mola Date: Mon, 20 Apr 2026 13:39:17 +0200 Subject: [PATCH 1/3] [python] implement /flush endpoint in python weblogs Add GET /flush to flask, fastapi, django, and tornado weblogs. The endpoint calls tracer.flush() and telemetry_writer.periodic(force_flush=True) to force all pending data to be sent to the agent. Enable python in the flush() call in containers.py, and set library_interface_timeout to 0 for python (alongside nodejs and ruby) now that explicit flushing is supported. --- utils/_context/_scenarios/endtoend.py | 4 +--- utils/_context/containers.py | 2 +- utils/build/docker/python/django/app/urls.py | 11 +++++++++++ utils/build/docker/python/fastapi/main.py | 11 +++++++++++ utils/build/docker/python/flask/app.py | 11 +++++++++++ utils/build/docker/python/tornado/main.py | 13 +++++++++++++ 6 files changed, 48 insertions(+), 4 deletions(-) diff --git a/utils/_context/_scenarios/endtoend.py b/utils/_context/_scenarios/endtoend.py index 3b276ebf344..a8748a688b0 100644 --- a/utils/_context/_scenarios/endtoend.py +++ b/utils/_context/_scenarios/endtoend.py @@ -314,13 +314,11 @@ def configure(self, config: pytest.Config): self.library_interface_timeout = 25 elif library in ("golang",): self.library_interface_timeout = 10 - elif library in ("nodejs", "ruby"): + elif library in ("nodejs", "python", "ruby"): self.library_interface_timeout = 0 elif library in ("php",): # possibly something weird on obfuscator, let increase the delay for now self.library_interface_timeout = 10 - elif library in ("python",): - self.library_interface_timeout = 5 else: self.library_interface_timeout = 40 else: diff --git a/utils/_context/containers.py b/utils/_context/containers.py index 1b68ad32924..0f9351f8edf 100644 --- a/utils/_context/containers.py +++ b/utils/_context/containers.py @@ -1089,9 +1089,9 @@ def warmup_request(self, timeout: int = 10): def flush(self) -> None: if self.library.name not 
in ( "nodejs", + "python", "ruby", ): - # only nodejs and ruby supports it return try: diff --git a/utils/build/docker/python/django/app/urls.py b/utils/build/docker/python/django/app/urls.py index 76183785aff..babd0c510da 100644 --- a/utils/build/docker/python/django/app/urls.py +++ b/utils/build/docker/python/django/app/urls.py @@ -34,6 +34,7 @@ import ddtrace from ddtrace.appsec import trace_utils as ato_user_sdk_v1 +from ddtrace.internal import telemetry try: from ddtrace.appsec import track_user_sdk @@ -1192,6 +1193,15 @@ def stripe_webhook(request): return JsonResponse({"error": str(e)}, status=403) +def flush(request): + # NOTE: If anything needs to be flushed here before the test suite ends, + # this is the place to do it. + # See https://github.com/DataDog/system-tests/blob/main/docs/edit/flushing.md + tracer.flush() + telemetry.telemetry_writer.periodic(force_flush=True) + return HttpResponse("OK") + + urlpatterns = [ path("", hello_world), path("api_security/sampling/", api_security_sampling_status), @@ -1294,4 +1304,5 @@ def stripe_webhook(request): path("stripe/create_checkout_session", stripe_create_checkout_session), path("stripe/create_payment_intent", stripe_create_payment_intent), path("stripe/webhook", stripe_webhook), + path("flush", flush), ] diff --git a/utils/build/docker/python/fastapi/main.py b/utils/build/docker/python/fastapi/main.py index 4cef9727bc4..b6834773bf2 100644 --- a/utils/build/docker/python/fastapi/main.py +++ b/utils/build/docker/python/fastapi/main.py @@ -37,6 +37,7 @@ import ddtrace from ddtrace.appsec import trace_utils as appsec_trace_utils +from ddtrace.internal import telemetry from openfeature import api from ddtrace.openfeature import DataDogProvider from openfeature.evaluation_context import EvaluationContext @@ -1400,3 +1401,13 @@ async def stripe_webhook(request: Request): return JSONResponse(event.data.object) except Exception as e: return JSONResponse({"error": str(e)}, status_code=403) + + +@app.get("/flush", 
response_class=PlainTextResponse) +def flush(): + # NOTE: If anything needs to be flushed here before the test suite ends, + # this is the place to do it. + # See https://github.com/DataDog/system-tests/blob/main/docs/edit/flushing.md + tracer.flush() + telemetry.telemetry_writer.periodic(force_flush=True) + return "OK" diff --git a/utils/build/docker/python/flask/app.py b/utils/build/docker/python/flask/app.py index 7744830b9d0..ec54e01d7b0 100644 --- a/utils/build/docker/python/flask/app.py +++ b/utils/build/docker/python/flask/app.py @@ -97,6 +97,7 @@ config._logs_injection = True from ddtrace.appsec import trace_utils as appsec_trace_utils +from ddtrace.internal import telemetry from ddtrace.internal.datastreams import data_streams_processor from ddtrace.internal.datastreams.processor import DsmPathwayCodec from ddtrace.data_streams import set_consume_checkpoint @@ -2234,3 +2235,13 @@ def stripe_webhook(): return jsonify(event.data.object) except Exception as e: return jsonify({"error": str(e)}), 403 + + +@app.route("/flush") +def flush(): + # NOTE: If anything needs to be flushed here before the test suite ends, + # this is the place to do it. 
+ # See https://github.com/DataDog/system-tests/blob/main/docs/edit/flushing.md + tracer.flush() + telemetry.telemetry_writer.periodic(force_flush=True) + return Response("OK") diff --git a/utils/build/docker/python/tornado/main.py b/utils/build/docker/python/tornado/main.py index 6f41ed4a846..2a3991a1e8a 100644 --- a/utils/build/docker/python/tornado/main.py +++ b/utils/build/docker/python/tornado/main.py @@ -21,6 +21,7 @@ from ddtrace.appsec import trace_utils as appsec_trace_utils from ddtrace.appsec import track_user_sdk from ddtrace.contrib.trace_utils import set_user +from ddtrace.internal import telemetry from ddtrace.openfeature import DataDogProvider from ddtrace.trace import tracer from iast import ( @@ -867,6 +868,16 @@ async def post(self) -> None: self.write(json.dumps({"error": str(e)})) +class FlushHandler(BaseHandler): + def get(self) -> None: + # NOTE: If anything needs to be flushed here before the test suite ends, + # this is the place to do it. + # See https://github.com/DataDog/system-tests/blob/main/docs/edit/flushing.md + tracer.flush() + telemetry.telemetry_writer.periodic(force_flush=True) + self.write("OK") + + class ExternalRequestHandler(BaseHandler): SUPPORTED_METHODS = ("GET", "POST", "PUT", "TRACE") @@ -1115,6 +1126,8 @@ def make_app() -> Application: (r"/stripe/create_checkout_session", StripeCreateCheckoutSessionHandler), (r"/stripe/create_payment_intent", StripeCreatePaymentIntentHandler), (r"/stripe/webhook", StripeWebhookHandler), + # Flush endpoint + (r"/flush", FlushHandler), ], debug=False, cookie_secret="just_for_tests", # noqa: S106 From dce738309dd34315a95aeeec14f116613ae5aeca Mon Sep 17 00:00:00 2001 From: Santiago Mola Date: Mon, 20 Apr 2026 14:53:05 +0200 Subject: [PATCH 2/3] fix: add sleep(0.2) to /flush to allow background tasks to complete Same approach as Ruby's flush implementation. 
Now that library_interface_timeout is 0 for python, without the sleep RC polling sequences don't have enough time to complete before the container is stopped. --- utils/build/docker/python/django/app/urls.py | 4 +++- utils/build/docker/python/fastapi/main.py | 2 ++ utils/build/docker/python/flask/app.py | 2 ++ utils/build/docker/python/tornado/main.py | 2 ++ 4 files changed, 9 insertions(+), 1 deletion(-) diff --git a/utils/build/docker/python/django/app/urls.py b/utils/build/docker/python/django/app/urls.py index babd0c510da..765bee19eda 100644 --- a/utils/build/docker/python/django/app/urls.py +++ b/utils/build/docker/python/django/app/urls.py @@ -5,8 +5,9 @@ import random import shlex import subprocess -import xmltodict import sys +import time +import xmltodict import boto3 import django import httpx @@ -1199,6 +1200,7 @@ def flush(request): # See https://github.com/DataDog/system-tests/blob/main/docs/edit/flushing.md tracer.flush() telemetry.telemetry_writer.periodic(force_flush=True) + time.sleep(0.2) return HttpResponse("OK") diff --git a/utils/build/docker/python/fastapi/main.py b/utils/build/docker/python/fastapi/main.py index b6834773bf2..c045bc7932b 100644 --- a/utils/build/docker/python/fastapi/main.py +++ b/utils/build/docker/python/fastapi/main.py @@ -6,6 +6,7 @@ import shlex import subprocess import sys +import time import typing import fastapi @@ -1410,4 +1411,5 @@ def flush(): # See https://github.com/DataDog/system-tests/blob/main/docs/edit/flushing.md tracer.flush() telemetry.telemetry_writer.periodic(force_flush=True) + time.sleep(0.2) return "OK" diff --git a/utils/build/docker/python/flask/app.py b/utils/build/docker/python/flask/app.py index ec54e01d7b0..8d1552b2303 100644 --- a/utils/build/docker/python/flask/app.py +++ b/utils/build/docker/python/flask/app.py @@ -19,6 +19,7 @@ import subprocess import sys import threading +import time import urllib.request import boto3 @@ -2244,4 +2245,5 @@ def flush(): # See
https://github.com/DataDog/system-tests/blob/main/docs/edit/flushing.md tracer.flush() telemetry.telemetry_writer.periodic(force_flush=True) + time.sleep(0.2) return Response("OK") diff --git a/utils/build/docker/python/tornado/main.py b/utils/build/docker/python/tornado/main.py index 2a3991a1e8a..3efa5be515d 100644 --- a/utils/build/docker/python/tornado/main.py +++ b/utils/build/docker/python/tornado/main.py @@ -7,6 +7,7 @@ import sqlite3 import subprocess import sys +import time from http import HTTPStatus from typing import Any, ClassVar from urllib.parse import parse_qs @@ -875,6 +876,7 @@ def get(self) -> None: # See https://github.com/DataDog/system-tests/blob/main/docs/edit/flushing.md tracer.flush() telemetry.telemetry_writer.periodic(force_flush=True) + time.sleep(0.2) self.write("OK") From 4bc8a1278b32b247fb6f0065d48386ae4bd54561 Mon Sep 17 00:00:00 2001 From: Santiago Mola Date: Tue, 21 Apr 2026 11:21:58 +0200 Subject: [PATCH 3/3] python weblogs: use telemetry_writer.app_shutdown() in /flush endpoint app_shutdown() calls periodic(force_flush=True, shutting_down=True) which sends the app-closing event the agent needs to finalise the telemetry batch. Without it, queued metrics (e.g. IAST executed/instrumented.source) were never forwarded by the agent before system-tests read them. Also removes the now-unnecessary time.sleep(0.2) since app_shutdown() is fully synchronous, and the unused `import time` in all four weblogs. /flush is called exactly once at the end of the test suite, so calling disable() (which app_shutdown does internally) is safe. 
--- utils/build/docker/python/django/app/urls.py | 7 ++++--- utils/build/docker/python/fastapi/main.py | 7 ++++--- utils/build/docker/python/flask/app.py | 7 ++++--- utils/build/docker/python/tornado/main.py | 7 ++++--- 4 files changed, 16 insertions(+), 12 deletions(-) diff --git a/utils/build/docker/python/django/app/urls.py b/utils/build/docker/python/django/app/urls.py index 765bee19eda..e0563a29c69 100644 --- a/utils/build/docker/python/django/app/urls.py +++ b/utils/build/docker/python/django/app/urls.py @@ -6,7 +6,6 @@ import shlex import subprocess import sys -import time import xmltodict import boto3 import django @@ -1199,8 +1198,10 @@ def flush(request): # this is the place to do it. # See https://github.com/DataDog/system-tests/blob/main/docs/edit/flushing.md tracer.flush() - telemetry.telemetry_writer.periodic(force_flush=True) - time.sleep(0.2) + # app_shutdown() sends a force flush with an app-closing event so the agent + # finalises the telemetry batch, then disables the writer (safe: /flush is + # called only once, at the end of the test suite). + telemetry.telemetry_writer.app_shutdown() return HttpResponse("OK") diff --git a/utils/build/docker/python/fastapi/main.py b/utils/build/docker/python/fastapi/main.py index c045bc7932b..f0bb97e9d43 100644 --- a/utils/build/docker/python/fastapi/main.py +++ b/utils/build/docker/python/fastapi/main.py @@ -6,7 +6,6 @@ import shlex import subprocess import sys -import time import typing import fastapi @@ -1410,6 +1409,8 @@ def flush(): # this is the place to do it. # See https://github.com/DataDog/system-tests/blob/main/docs/edit/flushing.md tracer.flush() - telemetry.telemetry_writer.periodic(force_flush=True) - time.sleep(0.2) + # app_shutdown() sends a force flush with an app-closing event so the agent + # finalises the telemetry batch, then disables the writer (safe: /flush is + # called only once, at the end of the test suite). 
+ telemetry.telemetry_writer.app_shutdown() return "OK" diff --git a/utils/build/docker/python/flask/app.py b/utils/build/docker/python/flask/app.py index 8d1552b2303..84393165ba3 100644 --- a/utils/build/docker/python/flask/app.py +++ b/utils/build/docker/python/flask/app.py @@ -19,7 +19,6 @@ import subprocess import sys import threading -import time import urllib.request import boto3 @@ -2244,6 +2243,8 @@ def flush(): # this is the place to do it. # See https://github.com/DataDog/system-tests/blob/main/docs/edit/flushing.md tracer.flush() - telemetry.telemetry_writer.periodic(force_flush=True) - time.sleep(0.2) + # app_shutdown() sends a force flush with an app-closing event so the agent + # finalises the telemetry batch, then disables the writer (safe: /flush is + # called only once, at the end of the test suite). + telemetry.telemetry_writer.app_shutdown() return Response("OK") diff --git a/utils/build/docker/python/tornado/main.py b/utils/build/docker/python/tornado/main.py index 3efa5be515d..fec4c23d9b5 100644 --- a/utils/build/docker/python/tornado/main.py +++ b/utils/build/docker/python/tornado/main.py @@ -7,7 +7,6 @@ import sqlite3 import subprocess import sys -import time from http import HTTPStatus from typing import Any, ClassVar from urllib.parse import parse_qs @@ -875,8 +874,10 @@ def get(self) -> None: # this is the place to do it. # See https://github.com/DataDog/system-tests/blob/main/docs/edit/flushing.md tracer.flush() - telemetry.telemetry_writer.periodic(force_flush=True) - time.sleep(0.2) + # app_shutdown() sends a force flush with an app-closing event so the agent + # finalises the telemetry batch, then disables the writer (safe: /flush is + # called only once, at the end of the test suite). + telemetry.telemetry_writer.app_shutdown() self.write("OK")