Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ some resource (such as a `dets` table) that doesn't handle casts.
Args = term()
Opt = {debug, Dbgs} |
{min_batch_size | max_batch_size, non_neg_integer()} |
{reversed_batch, boolean()}
{reversed_batch, boolean()} |
{flush_mailbox_on_terminate, false | {true, SummaryCount}}
SummaryCount = non_neg_integer()
Opts = [Opt]
Opts = [term()]
Result = {ok,Pid} | ignore | {error,Error}
Expand All @@ -52,6 +54,16 @@ passed to `handle_batch/2` is in reversed order to the one the messages were
received in. This avoids a `lists:reverse/1` call before the batch handling and is
somewhat more performant.

The `flush_mailbox_on_terminate` option controls whether pending mailbox messages
are drained when the server terminates. By default it is `false` (disabled). When
set to `{true, SummaryCount}`, up to `SummaryCount` of the pending messages are
captured along with the total mailbox count and written to the process dictionary
under the key `mailbox_summary` (as `#{total => Count, messages => [...]}`). The
mailbox is then fully drained. This happens *before* `Module:terminate/2` is called,
so the callback can inspect or forward the summary. Draining the mailbox prevents
`proc_lib` crash reports from pretty-printing potentially large messages (e.g. binary
payloads), which can cause unbounded memory growth.


#### cast(ServerRef, Request) -> ok

Expand Down
63 changes: 46 additions & 17 deletions src/gen_batch_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@
name :: atom(),
module :: module(),
hibernate_after = infinity :: non_neg_integer() | infinity,
reversed_batch = false :: boolean()}).
reversed_batch = false :: boolean(),
flush_mailbox_on_terminate = false :: false | {true, non_neg_integer()}}).

-record(state, {batch = [] :: [op()],
batch_count = 0 :: non_neg_integer(),
Expand Down Expand Up @@ -146,14 +147,16 @@ init_it(Starter, Parent, Name0, Mod, {GBOpts, Args}, Options) ->
MinBatchSize = proplists:get_value(min_batch_size, GBOpts,
?MIN_MAX_BATCH_SIZE),
ReverseBatch = proplists:get_value(reversed_batch, GBOpts, false),
FlushMailbox = proplists:get_value(flush_mailbox_on_terminate, GBOpts, false),
Conf = #config{module = Mod,
parent = Parent,
name = Name,
batch_size = MinBatchSize,
min_batch_size = MinBatchSize,
max_batch_size = MaxBatchSize,
hibernate_after = HibernateAfter,
reversed_batch = ReverseBatch},
reversed_batch = ReverseBatch,
flush_mailbox_on_terminate = FlushMailbox},
case init_it(Mod, Args) of
{ok, {ok, Inner0}} ->
proc_lib:init_ack(Starter, {ok, self()}),
Expand Down Expand Up @@ -288,8 +291,8 @@ loop_wait(#state{config = #config{hibernate_after = Hib}} = State00, Parent) ->
sys:handle_system_msg(Request, From, Parent,
?MODULE, State0#state.debug, State0);
{'EXIT', Parent, Reason} ->
flush_mailbox(State0#state.config),
terminate(Reason, State0),
flush_mailbox(),
exit(Reason);
_ ->
enter_loop_batched(Msg, Parent, State0)
Expand Down Expand Up @@ -343,8 +346,8 @@ loop_batched(#state{debug = Debug} = State0, Parent) ->
sys:handle_system_msg(Request, From, Parent,
?MODULE, Debug, State0);
{'EXIT', Parent, Reason} ->
flush_mailbox(State0#state.config),
terminate(Reason, State0),
flush_mailbox(),
exit(Reason);
_ ->
enter_loop_batched(Msg, Parent, State0)
Expand Down Expand Up @@ -387,8 +390,8 @@ complete_batch(#state{batch = Batch0,
throw:Result ->
handle_batch_result(Result, State0, Debug0);
Class:Reason:Stacktrace ->
flush_mailbox(State0#state.config),
terminate(Reason, State0),
flush_mailbox(),
erlang:raise(Class, Reason, safe_stacktrace(Stacktrace))
end.

Expand Down Expand Up @@ -417,8 +420,8 @@ handle_batch_result({ok, Actions, Inner, {continue, Continue}}, State0, Debug0)
needs_gc = ShouldGc,
debug = Debug});
handle_batch_result({stop, Reason}, State0, _Debug0) ->
flush_mailbox(State0#state.config),
terminate(Reason, State0),
flush_mailbox(),
exit(Reason).

handle_actions(Actions, Debug0) ->
Expand All @@ -440,8 +443,8 @@ handle_continue(Continue, #state{config = #config{module = Mod},
throw:Result ->
handle_continue_result(Result, State0);
Class:Reason:Stacktrace ->
flush_mailbox(State0#state.config),
terminate(Reason, State0),
flush_mailbox(),
erlang:raise(Class, Reason, safe_stacktrace(Stacktrace))
end.

Expand All @@ -450,8 +453,8 @@ handle_continue_result({ok, Inner}, State0) ->
handle_continue_result({ok, Inner, {continue, NextContinue}}, State0) ->
handle_continue(NextContinue, State0#state{state = Inner});
handle_continue_result({stop, Reason}, State0) ->
flush_mailbox(State0#state.config),
terminate(Reason, State0),
flush_mailbox(),
exit(Reason).

handle_debug_in(#state{debug = Dbg0} = State, Msg) ->
Expand All @@ -472,8 +475,8 @@ system_continue(Parent, Debug, State) ->

-spec system_terminate(term(), pid(), list(), term()) -> no_return().
system_terminate(Reason, _Parent, _Debug, State) ->
flush_mailbox(State#state.config),
terminate(Reason, State),
flush_mailbox(),
exit(Reason).

system_get_state(State) ->
Expand Down Expand Up @@ -519,7 +522,8 @@ gen_start(undefined, Mod, Args, Opts0) ->
{GBOpts, Opts} = lists:partition(fun ({Key, _}) ->
Key == max_batch_size orelse
Key == min_batch_size orelse
Key == reversed_batch;
Key == reversed_batch orelse
Key == flush_mailbox_on_terminate;
(_) -> false
end, Opts0),
gen:start(?MODULE, link, Mod, {GBOpts, Args}, Opts);
Expand All @@ -529,7 +533,8 @@ gen_start(Name, Mod, Args, Opts0) ->
{GBOpts, Opts} = lists:partition(fun ({Key, _}) ->
Key == max_batch_size orelse
Key == min_batch_size orelse
Key == reversed_batch;
Key == reversed_batch orelse
Key == flush_mailbox_on_terminate;
(_) -> false
end, Opts0),
gen:start(?MODULE, link, Name, Mod, {GBOpts, Args}, Opts).
Expand All @@ -544,10 +549,34 @@ safe_stacktrace(Stacktrace) ->
Frame
end, Stacktrace).

%% Flush all messages from the mailbox. This prevents proc_lib crash reports
%% from pretty-printing potentially large messages (e.g. WAL write commands
%% containing binary payloads), which can cause unbounded memory growth.
flush_mailbox() ->
receive _ -> flush_mailbox()
after 0 -> ok
%% Optionally empty the mailbox, as selected by the flush_mailbox_on_terminate
%% field of the config record:
%%   false     - leave the mailbox untouched.
%%   {true, N} - remove every pending message, keep at most the first N of
%%               them as a sample, and store
%%               #{total => Count, messages => Sample} in the process
%%               dictionary under the key 'mailbox_summary'.
%% Emptying the mailbox keeps proc_lib crash reports from pretty-printing
%% potentially large messages (e.g. WAL write commands containing binary
%% payloads), which can cause unbounded memory growth. Callers invoke this
%% ahead of terminate/2 so that the callback can inspect or forward the
%% summary. Always returns ok.
flush_mailbox(#config{flush_mailbox_on_terminate = {true, SampleSize}}) ->
    {Seen, Kept} = drain_mailbox_sample(SampleSize, 0, []),
    Summary = #{total => Seen, messages => Kept},
    erlang:put(mailbox_summary, Summary),
    ok;
flush_mailbox(#config{flush_mailbox_on_terminate = false}) ->
    ok.

%% Remove messages from the mailbox, keeping up to the requested number of
%% them (in the order they were received) and counting every message removed.
%% Returns {TotalRemoved, KeptMessages}. Once the quota reaches zero the rest
%% of the mailbox is discarded and only counted.
drain_mailbox_sample(0, Taken, Kept) ->
    Discarded = drain_mailbox_count(0),
    {Taken + Discarded, lists:reverse(Kept)};
drain_mailbox_sample(Quota, Taken, Kept) ->
    receive
        Next ->
            drain_mailbox_sample(Quota - 1, Taken + 1, [Next | Kept])
    after 0 ->
        %% The mailbox ran dry before the quota was used up.
        {Taken, lists:reverse(Kept)}
    end.

%% Discard every message currently in the mailbox and return the starting
%% count plus the number of messages discarded. The `after 0` timeout makes
%% the receive non-blocking, so the loop ends once the mailbox is empty.
drain_mailbox_count(Seen) ->
    receive
        _Ignored ->
            drain_mailbox_count(Seen + 1)
    after 0 ->
        Seen
    end.
75 changes: 74 additions & 1 deletion test/gen_batch_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ all_tests() ->
init_exception,
handle_continue_stop,
handle_continue_chained,
opts_not_at_front
opts_not_at_front,
flush_mailbox_on_terminate_disabled,
flush_mailbox_on_terminate_enabled
].

groups() ->
Expand Down Expand Up @@ -608,6 +610,77 @@ collect_batch_sizes(Acc) ->
lists:reverse(Acc)
end.

flush_mailbox_on_terminate_disabled(Config) ->
    %% No flush_mailbox_on_terminate option is passed, so the default (false)
    %% applies and terminate/2 must find no mailbox_summary entry in the
    %% process dictionary.
    Mod = ?config(mod, Config),
    TestPid = self(),
    process_flag(trap_exit, true),
    StopWithBacklog =
        fun([{cast, trigger_stop}], _State) ->
                %% Queue three messages to the server itself so that its
                %% mailbox is non-empty at the moment it stops.
                self() ! pending_msg_1,
                self() ! pending_msg_2,
                self() ! pending_msg_3,
                {stop, because}
        end,
    ReportSummary =
        fun(because, _State) ->
                %% Flushing is disabled: the reported value must be undefined.
                TestPid ! {summary, erlang:get(mailbox_summary)},
                ok
        end,
    meck:new(Mod, [non_strict]),
    meck:expect(Mod, init, fun(_) -> {ok, #{}} end),
    meck:expect(Mod, handle_batch, StopWithBacklog),
    meck:expect(Mod, terminate, ReportSummary),
    {ok, Pid} = gen_batch_server:start_link(Mod, []),
    ok = gen_batch_server:cast(Pid, trigger_stop),
    receive
        {'EXIT', Pid, because} ->
            ok
    after 2000 ->
        exit(timeout)
    end,
    receive
        {summary, Summary} ->
            ?assertEqual(undefined, Summary)
    after 2000 ->
        exit(summary_timeout)
    end,
    ?assert(meck:validate(Mod)),
    ok.

flush_mailbox_on_terminate_enabled(Config) ->
    %% Started with {flush_mailbox_on_terminate, {true, 2}}: terminate/2 must
    %% see a summary holding the first 2 pending messages and a total of 5.
    Mod = ?config(mod, Config),
    TestPid = self(),
    process_flag(trap_exit, true),
    Backlog = [pending_msg_1,
               pending_msg_2,
               pending_msg_3,
               pending_msg_4,
               pending_msg_5],
    StopWithBacklog =
        fun([{cast, trigger_stop}], _State) ->
                %% Queue five messages to the server itself, in order, so the
                %% total exceeds the SummaryCount of 2.
                lists:foreach(fun(Pending) -> self() ! Pending end, Backlog),
                {stop, because}
        end,
    ReportSummary =
        fun(because, _State) ->
                %% The flush has already run, so the summary must be present.
                TestPid ! {summary, erlang:get(mailbox_summary)},
                ok
        end,
    meck:new(Mod, [non_strict]),
    meck:expect(Mod, init, fun(_) -> {ok, #{}} end),
    meck:expect(Mod, handle_batch, StopWithBacklog),
    meck:expect(Mod, terminate, ReportSummary),
    Opts = [{flush_mailbox_on_terminate, {true, 2}}],
    {ok, Pid} = gen_batch_server:start_link(undefined, Mod, [], Opts),
    ok = gen_batch_server:cast(Pid, trigger_stop),
    receive
        {'EXIT', Pid, because} ->
            ok
    after 2000 ->
        exit(timeout)
    end,
    receive
        {summary, Summary} ->
            ?assertMatch(#{total := 5, messages := [pending_msg_1, pending_msg_2]},
                         Summary)
    after 2000 ->
        exit(summary_timeout)
    end,
    ?assert(meck:validate(Mod)),
    ok.

%% Utility
wait_batch() ->
wait_batch([]).
Expand Down
Loading