From cd24b3ba1bd268fe29849da2f8815bf42cd83030 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 31 Mar 2026 12:42:57 +0100 Subject: [PATCH] Add configurable mailbox flushing on terminate Introduce the flush_mailbox_on_terminate option (type: false | {true, N}, default: false). When enabled, up to N pending mailbox messages and the total mailbox count are written to the process dictionary under the key mailbox_summary before the process exits. The remaining messages are then drained to prevent proc_lib crash reports from pretty-printing potentially large payloads. The flush now happens before Module:terminate/2 is called at all exit points, so the terminate callback can inspect or forward the summary. Add two CT test cases covering the disabled (default) and enabled paths, and document the new option in the README. --- README.md | 14 +++++- src/gen_batch_server.erl | 63 +++++++++++++++++++-------- test/gen_batch_server_SUITE.erl | 75 ++++++++++++++++++++++++++++++++- 3 files changed, 133 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 011be80..f8f4fad 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,9 @@ some resource (such as a `dets` table) that doesn't handle casts. Args = term() Opt = {debug, Dbgs} | {min_batch_size | max_batch_size, non_neg_integer()} | - {reversed_batch, boolean()} + {reversed_batch, boolean()} | + {flush_mailbox_on_terminate, false | {true, SummaryCount}} + SummaryCount = non_neg_integer() Opts = [Opt] Opts = [term()] Result = {ok,Pid} | ignore | {error,Error} @@ -52,6 +54,16 @@ passed to `handle_batch/2` is in reversed order to the one the messages were received in. This avoids a `list:reverse/1` all before the batch handling and is somewhat more performant. +The `flush_mailbox_on_terminate` option controls whether pending mailbox messages +are drained when the server terminates. By default it is `false` (disabled). When +set to `{true, SummaryCount}`, up to `SummaryCount` of the pending messages are +captured along with the total mailbox count and written to the process dictionary +under the key `mailbox_summary` (as `#{total => Count, messages => [...]}`). The +mailbox is then fully drained. This happens *before* `Module:terminate/2` is called, +so the callback can inspect or forward the summary. Draining the mailbox prevents +`proc_lib` crash reports from pretty-printing potentially large messages (e.g. binary +payloads), which can cause unbounded memory growth. + #### cast(ServerRef, Request) -> ok diff --git a/src/gen_batch_server.erl b/src/gen_batch_server.erl index 36d627f..cfd2cf5 100644 --- a/src/gen_batch_server.erl +++ b/src/gen_batch_server.erl @@ -46,7 +46,8 @@ name :: atom(), module :: module(), hibernate_after = infinity :: non_neg_integer() | infinity, - reversed_batch = false :: boolean()}). + reversed_batch = false :: boolean(), + flush_mailbox_on_terminate = false :: false | {true, non_neg_integer()}}). -record(state, {batch = [] :: [op()], batch_count = 0 :: non_neg_integer(), @@ -146,6 +147,7 @@ init_it(Starter, Parent, Name0, Mod, {GBOpts, Args}, Options) -> MinBatchSize = proplists:get_value(min_batch_size, GBOpts, ?MIN_MAX_BATCH_SIZE), ReverseBatch = proplists:get_value(reversed_batch, GBOpts, false), + FlushMailbox = proplists:get_value(flush_mailbox_on_terminate, GBOpts, false), Conf = #config{module = Mod, parent = Parent, name = Name, @@ -153,7 +155,8 @@ init_it(Starter, Parent, Name0, Mod, {GBOpts, Args}, Options) -> min_batch_size = MinBatchSize, max_batch_size = MaxBatchSize, hibernate_after = HibernateAfter, - reversed_batch = ReverseBatch}, + reversed_batch = ReverseBatch, + flush_mailbox_on_terminate = FlushMailbox}, case init_it(Mod, Args) of {ok, {ok, Inner0}} -> proc_lib:init_ack(Starter, {ok, self()}), @@ -288,8 +291,8 @@ loop_wait(#state{config = #config{hibernate_after = Hib}} = State00, Parent) -> sys:handle_system_msg(Request, From, Parent, ?MODULE, State0#state.debug, State0); {'EXIT', Parent, Reason} -> + flush_mailbox(State0#state.config), terminate(Reason, State0), - flush_mailbox(), exit(Reason); _ -> enter_loop_batched(Msg, Parent, State0) @@ -343,8 +346,8 @@ loop_batched(#state{debug = Debug} = State0, Parent) -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Debug, State0); {'EXIT', Parent, Reason} -> + flush_mailbox(State0#state.config), terminate(Reason, State0), - flush_mailbox(), exit(Reason); _ -> enter_loop_batched(Msg, Parent, State0) @@ -387,8 +390,8 @@ complete_batch(#state{batch = Batch0, throw:Result -> handle_batch_result(Result, State0, Debug0); Class:Reason:Stacktrace -> + flush_mailbox(State0#state.config), terminate(Reason, State0), - flush_mailbox(), erlang:raise(Class, Reason, safe_stacktrace(Stacktrace)) end. @@ -417,8 +420,8 @@ handle_batch_result({ok, Actions, Inner, {continue, Continue}}, State0, Debug0) needs_gc = ShouldGc, debug = Debug}); handle_batch_result({stop, Reason}, State0, _Debug0) -> + flush_mailbox(State0#state.config), terminate(Reason, State0), - flush_mailbox(), exit(Reason). handle_actions(Actions, Debug0) -> @@ -440,8 +443,8 @@ handle_continue(Continue, #state{config = #config{module = Mod}, throw:Result -> handle_continue_result(Result, State0); Class:Reason:Stacktrace -> + flush_mailbox(State0#state.config), terminate(Reason, State0), - flush_mailbox(), erlang:raise(Class, Reason, safe_stacktrace(Stacktrace)) end. @@ -450,8 +453,8 @@ handle_continue_result({ok, Inner}, State0) -> handle_continue_result({ok, Inner, {continue, NextContinue}}, State0) -> handle_continue(NextContinue, State0#state{state = Inner}); handle_continue_result({stop, Reason}, State0) -> + flush_mailbox(State0#state.config), terminate(Reason, State0), - flush_mailbox(), exit(Reason). handle_debug_in(#state{debug = Dbg0} = State, Msg) -> @@ -472,8 +475,8 @@ system_continue(Parent, Debug, State) -> -spec system_terminate(term(), pid(), list(), term()) -> no_return(). system_terminate(Reason, _Parent, _Debug, State) -> + flush_mailbox(State#state.config), terminate(Reason, State), - flush_mailbox(), exit(Reason). system_get_state(State) -> @@ -519,7 +522,8 @@ gen_start(undefined, Mod, Args, Opts0) -> {GBOpts, Opts} = lists:partition(fun ({Key, _}) -> Key == max_batch_size orelse Key == min_batch_size orelse - Key == reversed_batch; + Key == reversed_batch orelse + Key == flush_mailbox_on_terminate; (_) -> false end, Opts0), gen:start(?MODULE, link, Mod, {GBOpts, Args}, Opts); @@ -529,7 +533,8 @@ gen_start(Name, Mod, Args, Opts0) -> {GBOpts, Opts} = lists:partition(fun ({Key, _}) -> Key == max_batch_size orelse Key == min_batch_size orelse - Key == reversed_batch; + Key == reversed_batch orelse + Key == flush_mailbox_on_terminate; (_) -> false end, Opts0), gen:start(?MODULE, link, Name, Mod, {GBOpts, Args}, Opts). @@ -544,10 +549,34 @@ safe_stacktrace(Stacktrace) -> Frame end, Stacktrace). -%% Flush all messages from the mailbox. This prevents proc_lib crash reports -%% from pretty-printing potentially large messages (e.g. WAL write commands -%% containing binary payloads), which can cause unbounded memory growth. -flush_mailbox() -> - receive _ -> flush_mailbox() - after 0 -> ok +%% When disabled (default) do nothing. When enabled, collect a sample of up to +%% N messages and the total mailbox count, write them to the process dictionary +%% under the key 'mailbox_summary', then drain the remaining messages. +%% This prevents proc_lib crash reports from pretty-printing potentially large +%% messages (e.g. WAL write commands containing binary payloads), which can +%% cause unbounded memory growth. The summary is written before terminate/2 is +%% called so that the callback can inspect or forward it. +flush_mailbox(#config{flush_mailbox_on_terminate = false}) -> + ok; +flush_mailbox(#config{flush_mailbox_on_terminate = {true, N}}) -> + {Total, Sample} = drain_mailbox_sample(N, 0, []), + erlang:put(mailbox_summary, #{total => Total, messages => Sample}), + ok. + +drain_mailbox_sample(0, Count, Acc) -> + {Count + drain_mailbox_count(0), lists:reverse(Acc)}; +drain_mailbox_sample(N, Count, Acc) -> + receive + Msg -> + drain_mailbox_sample(N - 1, Count + 1, [Msg | Acc]) + after 0 -> + {Count, lists:reverse(Acc)} + end. + +drain_mailbox_count(Count) -> + receive + _ -> + drain_mailbox_count(Count + 1) + after 0 -> + Count end. diff --git a/test/gen_batch_server_SUITE.erl b/test/gen_batch_server_SUITE.erl index b516dd5..58f5946 100644 --- a/test/gen_batch_server_SUITE.erl +++ b/test/gen_batch_server_SUITE.erl @@ -47,7 +47,9 @@ all_tests() -> init_exception, handle_continue_stop, handle_continue_chained, - opts_not_at_front + opts_not_at_front, + flush_mailbox_on_terminate_disabled, + flush_mailbox_on_terminate_enabled ]. groups() -> @@ -608,6 +610,77 @@ collect_batch_sizes(Acc) -> lists:reverse(Acc) end. +flush_mailbox_on_terminate_disabled(Config) -> + %% Default behaviour: flush_mailbox_on_terminate is false, so no + %% mailbox_summary entry should appear in the process dictionary. + Mod = ?config(mod, Config), + Self = self(), + process_flag(trap_exit, true), + meck:new(Mod, [non_strict]), + meck:expect(Mod, init, fun(_) -> {ok, #{}} end), + meck:expect(Mod, handle_batch, + fun([{cast, trigger_stop}], _State) -> + %% Inject 3 messages into our own mailbox so there is + %% something pending when we stop. + self() ! pending_msg_1, + self() ! pending_msg_2, + self() ! pending_msg_3, + {stop, because} + end), + meck:expect(Mod, terminate, + fun(because, _State) -> + %% flush disabled: key must be absent + Self ! {summary, erlang:get(mailbox_summary)}, + ok + end), + {ok, Pid} = gen_batch_server:start_link(Mod, []), + ok = gen_batch_server:cast(Pid, trigger_stop), + receive {'EXIT', Pid, because} -> ok after 2000 -> exit(timeout) end, + receive + {summary, Summary} -> + ?assertEqual(undefined, Summary) + after 2000 -> exit(summary_timeout) + end, + ?assert(meck:validate(Mod)), + ok. + +flush_mailbox_on_terminate_enabled(Config) -> + %% With {flush_mailbox_on_terminate, {true, 2}}, the summary must contain + %% the first 2 pending messages and the correct total count. + Mod = ?config(mod, Config), + Self = self(), + process_flag(trap_exit, true), + meck:new(Mod, [non_strict]), + meck:expect(Mod, init, fun(_) -> {ok, #{}} end), + meck:expect(Mod, handle_batch, + fun([{cast, trigger_stop}], _State) -> + %% Inject 5 messages so total > SummaryCount (2). + self() ! pending_msg_1, + self() ! pending_msg_2, + self() ! pending_msg_3, + self() ! pending_msg_4, + self() ! pending_msg_5, + {stop, because} + end), + meck:expect(Mod, terminate, + fun(because, _State) -> + %% flush ran before us; summary must be present + Self ! {summary, erlang:get(mailbox_summary)}, + ok + end), + Opts = [{flush_mailbox_on_terminate, {true, 2}}], + {ok, Pid} = gen_batch_server:start_link(undefined, Mod, [], Opts), + ok = gen_batch_server:cast(Pid, trigger_stop), + receive {'EXIT', Pid, because} -> ok after 2000 -> exit(timeout) end, + receive + {summary, Summary} -> + ?assertMatch(#{total := 5, messages := [pending_msg_1, pending_msg_2]}, + Summary) + after 2000 -> exit(summary_timeout) + end, + ?assert(meck:validate(Mod)), + ok. + %% Utility wait_batch() -> wait_batch([]).