diff --git a/README.md b/README.md index 011be80..f8f4fad 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,9 @@ some resource (such as a `dets` table) that doesn't handle casts. Args = term() Opt = {debug, Dbgs} | {min_batch_size | max_batch_size, non_neg_integer()} | - {reversed_batch, boolean()} + {reversed_batch, boolean()} | + {flush_mailbox_on_terminate, false | {true, SummaryCount}} + SummaryCount = non_neg_integer() Opts = [Opt] Opts = [term()] Result = {ok,Pid} | ignore | {error,Error} @@ -52,6 +54,16 @@ passed to `handle_batch/2` is in reversed order to the one the messages were received in. This avoids a `list:reverse/1` all before the batch handling and is somewhat more performant. +The `flush_mailbox_on_terminate` option controls whether pending mailbox messages +are drained when the server terminates. By default it is `false` (disabled). When +set to `{true, SummaryCount}`, up to `SummaryCount` of the pending messages are +captured along with the total mailbox count and written to the process dictionary +under the key `mailbox_summary` (as `#{total => Count, messages => [...]}`). The +mailbox is then fully drained. This happens *before* `Module:terminate/2` is called, +so the callback can inspect or forward the summary. Draining the mailbox prevents +`proc_lib` crash reports from pretty-printing potentially large messages (e.g. binary +payloads), which can cause unbounded memory growth. + #### cast(ServerRef, Request) -> ok diff --git a/src/gen_batch_server.erl b/src/gen_batch_server.erl index 36d627f..cfd2cf5 100644 --- a/src/gen_batch_server.erl +++ b/src/gen_batch_server.erl @@ -46,7 +46,8 @@ name :: atom(), module :: module(), hibernate_after = infinity :: non_neg_integer() | infinity, - reversed_batch = false :: boolean()}). + reversed_batch = false :: boolean(), + flush_mailbox_on_terminate = false :: false | {true, non_neg_integer()}}). -record(state, {batch = [] :: [op()], batch_count = 0 :: non_neg_integer(), @@ -146,6 +147,7 @@ init_it(Starter, Parent, Name0, Mod, {GBOpts, Args}, Options) -> MinBatchSize = proplists:get_value(min_batch_size, GBOpts, ?MIN_MAX_BATCH_SIZE), ReverseBatch = proplists:get_value(reversed_batch, GBOpts, false), + FlushMailbox = proplists:get_value(flush_mailbox_on_terminate, GBOpts, false), Conf = #config{module = Mod, parent = Parent, name = Name, @@ -153,7 +155,8 @@ init_it(Starter, Parent, Name0, Mod, {GBOpts, Args}, Options) -> min_batch_size = MinBatchSize, max_batch_size = MaxBatchSize, hibernate_after = HibernateAfter, - reversed_batch = ReverseBatch}, + reversed_batch = ReverseBatch, + flush_mailbox_on_terminate = FlushMailbox}, case init_it(Mod, Args) of {ok, {ok, Inner0}} -> proc_lib:init_ack(Starter, {ok, self()}), @@ -288,8 +291,8 @@ loop_wait(#state{config = #config{hibernate_after = Hib}} = State00, Parent) -> sys:handle_system_msg(Request, From, Parent, ?MODULE, State0#state.debug, State0); {'EXIT', Parent, Reason} -> + flush_mailbox(State0#state.config), terminate(Reason, State0), - flush_mailbox(), exit(Reason); _ -> enter_loop_batched(Msg, Parent, State0) @@ -343,8 +346,8 @@ loop_batched(#state{debug = Debug} = State0, Parent) -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Debug, State0); {'EXIT', Parent, Reason} -> + flush_mailbox(State0#state.config), terminate(Reason, State0), - flush_mailbox(), exit(Reason); _ -> enter_loop_batched(Msg, Parent, State0) @@ -387,8 +390,8 @@ complete_batch(#state{batch = Batch0, throw:Result -> handle_batch_result(Result, State0, Debug0); Class:Reason:Stacktrace -> + flush_mailbox(State0#state.config), terminate(Reason, State0), - flush_mailbox(), erlang:raise(Class, Reason, safe_stacktrace(Stacktrace)) end. @@ -417,8 +420,8 @@ handle_batch_result({ok, Actions, Inner, {continue, Continue}}, State0, Debug0) needs_gc = ShouldGc, debug = Debug}); handle_batch_result({stop, Reason}, State0, _Debug0) -> + flush_mailbox(State0#state.config), terminate(Reason, State0), - flush_mailbox(), exit(Reason). handle_actions(Actions, Debug0) -> @@ -440,8 +443,8 @@ handle_continue(Continue, #state{config = #config{module = Mod}, throw:Result -> handle_continue_result(Result, State0); Class:Reason:Stacktrace -> + flush_mailbox(State0#state.config), terminate(Reason, State0), - flush_mailbox(), erlang:raise(Class, Reason, safe_stacktrace(Stacktrace)) end. @@ -450,8 +453,8 @@ handle_continue_result({ok, Inner}, State0) -> handle_continue_result({ok, Inner, {continue, NextContinue}}, State0) -> handle_continue(NextContinue, State0#state{state = Inner}); handle_continue_result({stop, Reason}, State0) -> + flush_mailbox(State0#state.config), terminate(Reason, State0), - flush_mailbox(), exit(Reason). handle_debug_in(#state{debug = Dbg0} = State, Msg) -> @@ -472,8 +475,8 @@ system_continue(Parent, Debug, State) -> -spec system_terminate(term(), pid(), list(), term()) -> no_return(). system_terminate(Reason, _Parent, _Debug, State) -> + flush_mailbox(State#state.config), terminate(Reason, State), - flush_mailbox(), exit(Reason). system_get_state(State) -> @@ -519,7 +522,8 @@ gen_start(undefined, Mod, Args, Opts0) -> {GBOpts, Opts} = lists:partition(fun ({Key, _}) -> Key == max_batch_size orelse Key == min_batch_size orelse - Key == reversed_batch; + Key == reversed_batch orelse + Key == flush_mailbox_on_terminate; (_) -> false end, Opts0), gen:start(?MODULE, link, Mod, {GBOpts, Args}, Opts); @@ -529,7 +533,8 @@ gen_start(Name, Mod, Args, Opts0) -> {GBOpts, Opts} = lists:partition(fun ({Key, _}) -> Key == max_batch_size orelse Key == min_batch_size orelse - Key == reversed_batch; + Key == reversed_batch orelse + Key == flush_mailbox_on_terminate; (_) -> false end, Opts0), gen:start(?MODULE, link, Name, Mod, {GBOpts, Args}, Opts). @@ -544,10 +549,34 @@ safe_stacktrace(Stacktrace) -> Frame end, Stacktrace). -%% Flush all messages from the mailbox. This prevents proc_lib crash reports -%% from pretty-printing potentially large messages (e.g. WAL write commands -%% containing binary payloads), which can cause unbounded memory growth. -flush_mailbox() -> - receive _ -> flush_mailbox() - after 0 -> ok +%% When disabled (default) do nothing. When enabled, collect a sample of up to +%% N messages and the total mailbox count, write them to the process dictionary +%% under the key 'mailbox_summary', then drain the remaining messages. +%% This prevents proc_lib crash reports from pretty-printing potentially large +%% messages (e.g. WAL write commands containing binary payloads), which can +%% cause unbounded memory growth. The summary is written before terminate/2 is +%% called so that the callback can inspect or forward it. +flush_mailbox(#config{flush_mailbox_on_terminate = false}) -> + ok; +flush_mailbox(#config{flush_mailbox_on_terminate = {true, N}}) -> + {Total, Sample} = drain_mailbox_sample(N, 0, []), + erlang:put(mailbox_summary, #{total => Total, messages => Sample}), + ok. + +drain_mailbox_sample(0, Count, Acc) -> + {Count + drain_mailbox_count(0), lists:reverse(Acc)}; +drain_mailbox_sample(N, Count, Acc) -> + receive + Msg -> + drain_mailbox_sample(N - 1, Count + 1, [Msg | Acc]) + after 0 -> + {Count, lists:reverse(Acc)} + end. + +drain_mailbox_count(Count) -> + receive + _ -> + drain_mailbox_count(Count + 1) + after 0 -> + Count end. diff --git a/test/gen_batch_server_SUITE.erl b/test/gen_batch_server_SUITE.erl index b516dd5..58f5946 100644 --- a/test/gen_batch_server_SUITE.erl +++ b/test/gen_batch_server_SUITE.erl @@ -47,7 +47,9 @@ all_tests() -> init_exception, handle_continue_stop, handle_continue_chained, - opts_not_at_front + opts_not_at_front, + flush_mailbox_on_terminate_disabled, + flush_mailbox_on_terminate_enabled ]. groups() -> @@ -608,6 +610,77 @@ collect_batch_sizes(Acc) -> lists:reverse(Acc) end. +flush_mailbox_on_terminate_disabled(Config) -> + %% Default behaviour: flush_mailbox_on_terminate is false, so no + %% mailbox_summary entry should appear in the process dictionary. + Mod = ?config(mod, Config), + Self = self(), + process_flag(trap_exit, true), + meck:new(Mod, [non_strict]), + meck:expect(Mod, init, fun(_) -> {ok, #{}} end), + meck:expect(Mod, handle_batch, + fun([{cast, trigger_stop}], _State) -> + %% Inject 3 messages into our own mailbox so there is + %% something pending when we stop. + self() ! pending_msg_1, + self() ! pending_msg_2, + self() ! pending_msg_3, + {stop, because} + end), + meck:expect(Mod, terminate, + fun(because, _State) -> + %% flush disabled: key must be absent + Self ! {summary, erlang:get(mailbox_summary)}, + ok + end), + {ok, Pid} = gen_batch_server:start_link(Mod, []), + ok = gen_batch_server:cast(Pid, trigger_stop), + receive {'EXIT', Pid, because} -> ok after 2000 -> exit(timeout) end, + receive + {summary, Summary} -> + ?assertEqual(undefined, Summary) + after 2000 -> exit(summary_timeout) + end, + ?assert(meck:validate(Mod)), + ok. + +flush_mailbox_on_terminate_enabled(Config) -> + %% With {flush_mailbox_on_terminate, {true, 2}}, the summary must contain + %% the first 2 pending messages and the correct total count. + Mod = ?config(mod, Config), + Self = self(), + process_flag(trap_exit, true), + meck:new(Mod, [non_strict]), + meck:expect(Mod, init, fun(_) -> {ok, #{}} end), + meck:expect(Mod, handle_batch, + fun([{cast, trigger_stop}], _State) -> + %% Inject 5 messages so total > SummaryCount (2). + self() ! pending_msg_1, + self() ! pending_msg_2, + self() ! pending_msg_3, + self() ! pending_msg_4, + self() ! pending_msg_5, + {stop, because} + end), + meck:expect(Mod, terminate, + fun(because, _State) -> + %% flush ran before us; summary must be present + Self ! {summary, erlang:get(mailbox_summary)}, + ok + end), + Opts = [{flush_mailbox_on_terminate, {true, 2}}], + {ok, Pid} = gen_batch_server:start_link(undefined, Mod, [], Opts), + ok = gen_batch_server:cast(Pid, trigger_stop), + receive {'EXIT', Pid, because} -> ok after 2000 -> exit(timeout) end, + receive + {summary, Summary} -> + ?assertMatch(#{total := 5, messages := [pending_msg_1, pending_msg_2]}, + Summary) + after 2000 -> exit(summary_timeout) + end, + ?assert(meck:validate(Mod)), + ok. + %% Utility wait_batch() -> wait_batch([]).