diff --git a/lib/broadway.ex b/lib/broadway.ex
index 1dd951f..9f33d83 100644
--- a/lib/broadway.ex
+++ b/lib/broadway.ex
@@ -795,6 +795,57 @@ defmodule Broadway do
         }
         ```
 
+    * `[:broadway, :handle_failed, :start]` - Dispatched before the `c:handle_failed/2`
+      callback is invoked
+
+      * Measurement: `%{system_time: integer}`
+
+      * Metadata:
+
+        ```
+        %{
+          module: atom,
+          messages: [Broadway.Message.t],
+          context: term,
+          telemetry_span_context: reference
+        }
+        ```
+
+    * `[:broadway, :handle_failed, :stop]` - Dispatched after the `c:handle_failed/2`
+      callback has returned
+
+      * Measurement: `%{duration: native_time}`
+
+      * Metadata:
+
+        ```
+        %{
+          module: atom,
+          messages: [Broadway.Message.t],
+          context: term,
+          telemetry_span_context: reference
+        }
+        ```
+
+    * `[:broadway, :handle_failed, :exception]` - Dispatched if the `c:handle_failed/2`
+      callback raises an exception
+
+      * Measurement: `%{duration: native_time}`
+
+      * Metadata:
+
+        ```
+        %{
+          module: atom,
+          messages: [Broadway.Message.t],
+          context: term,
+          kind: :error | :exit | :throw,
+          reason: term,
+          stacktrace: list,
+          telemetry_span_context: reference
+        }
+        ```
+
     * `[:broadway, :batcher, :start]` - Dispatched by a Broadway batcher before
       handling events
 
diff --git a/lib/broadway/acknowledger.ex b/lib/broadway/acknowledger.ex
index 3a945da..cf69754 100644
--- a/lib/broadway/acknowledger.ex
+++ b/lib/broadway/acknowledger.ex
@@ -102,35 +102,50 @@ defmodule Broadway.Acknowledger do
   end
 
   defp handle_failed_messages(messages, module, context) do
-    module.handle_failed(messages, context)
-  catch
-    kind, reason ->
-      Logger.error(Exception.format(kind, reason, __STACKTRACE__),
-        crash_reason: crash_reason(kind, reason, __STACKTRACE__)
+    metadata = %{module: module, messages: messages, context: context} # metadata passed to the span; reused below with :messages replaced
+
+    try do
+      :telemetry.span( # wraps the callback in the [:broadway, :handle_failed] span
+        [:broadway, :handle_failed],
+        metadata,
+        fn ->
+          result = do_handle_failed_messages(messages, module, context)
+          {result, Map.put(metadata, :messages, result)} # {span return value, stop metadata with the resulting messages}
+        end
       )
+    catch # anything raised/thrown/exited out of the span is logged, never propagated
+      kind, reason ->
+        Logger.error(Exception.format(kind, reason, __STACKTRACE__),
+          crash_reason: crash_reason(kind, reason, __STACKTRACE__)
+        )
 
-      messages
-  else
-    return_messages when is_list(return_messages) ->
-      size = length(messages)
-      return_size = length(return_messages)
+        messages # fall back to the messages as they were given to this function
+    end
+  end
 
-      if return_size != size do
-        Logger.error(
-          "#{inspect(module)}.handle_failed/2 received #{size} messages and " <>
-            "returned only #{return_size}. All messages given to handle_failed/2 " <>
-            "must be returned"
-        )
-      end
+  defp do_handle_failed_messages(messages, module, context) do # calls module.handle_failed/2 and checks what it returned
+    case module.handle_failed(messages, context) do
+      return_messages when is_list(return_messages) ->
+        size = length(messages)
+        return_size = length(return_messages)
 
-      return_messages
+        if return_size != size do # a count mismatch is only logged; the returned list is still used
+          Logger.error(
+            "#{inspect(module)}.handle_failed/2 received #{size} messages and " <>
+              "returned only #{return_size}. All messages given to handle_failed/2 " <>
+              "must be returned"
+          )
+        end
 
-    _other ->
-      Logger.error(
-        "#{inspect(module)}.handle_failed/2 didn't return a list of messages, " <>
-          "so ignoring its return value"
-      )
+        return_messages
 
-      messages
+      _other -> # non-list return value: log it and keep the input messages
+        Logger.error(
+          "#{inspect(module)}.handle_failed/2 didn't return a list of messages, " <>
+            "so ignoring its return value"
+        )
+
+        messages
+    end
   end
 end
diff --git a/test/broadway_test.exs b/test/broadway_test.exs
index 36f8553..cd33488 100644
--- a/test/broadway_test.exs
+++ b/test/broadway_test.exs
@@ -1939,6 +1939,134 @@ defmodule BroadwayTest do
     end
   end
 
+  describe "handle_failed telemetry" do
+    test "emits [:broadway, :handle_failed] span events from a processor" do
+      broadway_name = new_unique_name()
+      test_pid = self()
+
+      :telemetry.attach_many( # forward the :start and :stop events to the test process
+        "#{broadway_name}-handle-failed",
+        [[:broadway, :handle_failed, :start], [:broadway, :handle_failed, :stop]],
+        fn event, _measurements, metadata, _config ->
+          send(test_pid, {:telemetry_event, event, metadata})
+        end,
+        %{}
+      )
+
+      {:ok, _broadway} =
+        Broadway.start_link(CustomHandlersWithHandleFailed,
+          name: broadway_name,
+          context: %{
+            handle_message: fn message, _ -> Message.failed(message, :failed) end, # mark every message as failed in the processor
+            handle_failed: fn messages, _ ->
+              send(test_pid, :handle_failed_called)
+              messages
+            end
+          },
+          producer: [module: {ManualProducer, []}],
+          processors: [default: []]
+        )
+
+      ref = Broadway.test_batch(broadway_name, [:fail], batch_mode: :flush)
+
+      assert_receive {:telemetry_event, [:broadway, :handle_failed, :start], start_metadata}
+      assert start_metadata.module == CustomHandlersWithHandleFailed
+      assert [%Message{}] = start_metadata.messages
+
+      assert_receive {:telemetry_event, [:broadway, :handle_failed, :stop], stop_metadata}
+      assert stop_metadata.module == CustomHandlersWithHandleFailed
+      assert [%Message{}] = stop_metadata.messages
+
+      assert_receive :handle_failed_called
+      assert_receive {:ack, ^ref, _successful, _failed}
+
+      :telemetry.detach("#{broadway_name}-handle-failed") # NOTE(review): not reached if an assertion above fails; consider on_exit/1
+    end
+
+    test "emits [:broadway, :handle_failed] span events from a batch processor" do
+      broadway_name = new_unique_name()
+      test_pid = self()
+
+      :telemetry.attach_many( # forward the :start and :stop events to the test process
+        "#{broadway_name}-handle-failed",
+        [[:broadway, :handle_failed, :start], [:broadway, :handle_failed, :stop]],
+        fn event, _measurements, metadata, _config ->
+          send(test_pid, {:telemetry_event, event, metadata})
+        end,
+        %{}
+      )
+
+      {:ok, _broadway} =
+        Broadway.start_link(CustomHandlersWithHandleFailed,
+          name: broadway_name,
+          context: %{
+            handle_message: fn message, _ -> message end, # messages pass the processor untouched
+            handle_batch: fn _, messages, _, _ ->
+              Enum.map(messages, &Message.failed(&1, :failed)) # the failure is introduced in handle_batch instead
+            end,
+            handle_failed: fn messages, _ ->
+              send(test_pid, :handle_failed_called)
+              messages
+            end
+          },
+          producer: [module: {ManualProducer, []}],
+          processors: [default: []],
+          batchers: [default: []]
+        )
+
+      ref = Broadway.test_batch(broadway_name, [:fail], batch_mode: :flush)
+
+      assert_receive {:telemetry_event, [:broadway, :handle_failed, :start], start_metadata}
+      assert start_metadata.module == CustomHandlersWithHandleFailed
+      assert [%Message{}] = start_metadata.messages
+
+      assert_receive {:telemetry_event, [:broadway, :handle_failed, :stop], stop_metadata}
+      assert stop_metadata.module == CustomHandlersWithHandleFailed
+      assert [%Message{}] = stop_metadata.messages
+
+      assert_receive :handle_failed_called
+      assert_receive {:ack, ^ref, _successful, _failed}
+
+      :telemetry.detach("#{broadway_name}-handle-failed") # NOTE(review): not reached if an assertion above fails; consider on_exit/1
+    end
+
+    test "emits [:broadway, :handle_failed, :exception] when handle_failed/2 raises" do
+      broadway_name = new_unique_name()
+      test_pid = self()
+
+      :telemetry.attach_many( # this test listens for :start and :exception rather than :stop
+        "#{broadway_name}-handle-failed",
+        [[:broadway, :handle_failed, :start], [:broadway, :handle_failed, :exception]],
+        fn event, _measurements, metadata, _config ->
+          send(test_pid, {:telemetry_event, event, metadata})
+        end,
+        %{}
+      )
+
+      {:ok, _broadway} =
+        Broadway.start_link(CustomHandlersWithHandleFailed,
+          name: broadway_name,
+          context: %{
+            handle_message: fn message, _ -> Message.failed(message, :failed) end,
+            handle_failed: fn _messages, _ -> raise "oops" end # callback crashes on purpose
+          },
+          producer: [module: {ManualProducer, []}],
+          processors: [default: []]
+        )
+
+      capture_log(fn -> # presumably swallows the error the acknowledger logs for the raise; the ack must still arrive
+        ref = Broadway.test_batch(broadway_name, [:fail], batch_mode: :flush)
+        assert_receive {:ack, ^ref, _successful, _failed}
+      end)
+
+      assert_receive {:telemetry_event, [:broadway, :handle_failed, :start], _}
+      assert_receive {:telemetry_event, [:broadway, :handle_failed, :exception], metadata}
+      assert %{kind: :error, reason: %RuntimeError{message: "oops"}} = metadata
+
+      :telemetry.detach("#{broadway_name}-handle-failed") # NOTE(review): not reached if an assertion above fails; consider on_exit/1
+    end
+  end
+
   describe "handle producer crash" do
     setup do
       test_pid = self()