Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ doc
logs
*.d
ebin/
bench/ebin/
_build
rebar.lock
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ some resource (such as a `dets` table) that doesn't handle casts.
Opt = {debug, Dbgs} |
{min_batch_size | max_batch_size, non_neg_integer()} |
{reversed_batch, boolean()} |
{flush_mailbox_on_terminate, false | {true, SummaryCount}}
{flush_mailbox_on_terminate, false | {true, SummaryCount}} |
{batch_size_growth, exponential | {aimd, Step}}
SummaryCount = non_neg_integer()
Step = pos_integer()
Opts = [Opt]
Opts = [term()]
Result = {ok,Pid} | ignore | {error,Error}
Expand All @@ -64,6 +66,14 @@ so the callback can inspect or forward the summary. Draining the mailbox prevent
`proc_lib` crash reports from pretty-printing potentially large messages (e.g. binary
payloads), which can cause unbounded memory growth.

The `batch_size_growth` option controls how the batch size grows when the server
is under load. The default, `exponential`, doubles the batch size each time a full
batch is completed (the original behaviour). Setting it to `{aimd, Step}` switches
to Additive Increase / Multiplicative Decrease: the batch size grows by `Step` on
each full batch and halves whenever the mailbox is found empty. AIMD produces a
smoother, more gradual ramp-up and is useful when large sudden jumps in batch size
are undesirable.


#### cast(ServerRef, Request) -> ok

Expand Down
126 changes: 126 additions & 0 deletions bench/benchmark
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#!/usr/bin/env escript
%% -*- erlang -*-
%%! -pa _build/default/lib/gen_batch_server/ebin -pa bench/ebin

%% Compares exponential (default) vs AIMD batch size growth strategies.
%%
%% The simulated workload models an fsync-backed log writer: each batch
%% incurs a fixed base cost (the fsync itself) plus a variable cost that
%% scales with the total bytes in the batch (the write). The worker
%% process uses off_heap message_queue_data so large binaries are not
%% copied into the process heap on receive.
%%
%% Prerequisites (from the project root):
%%
%% rebar3 compile
%% erlc -o bench/ebin bench/gbs_bench_worker.erl
%% escript bench/benchmark

-define(NUM_MESSAGES, 100_000).
-define(PAYLOAD_SIZE, 4096). %% 4 KB per message
-define(BASE_DELAY_US, 3000). %% 3 ms fixed fsync overhead per batch
-define(US_PER_MB, 3000). %% 3 ms per MB of batch data (~330 MB/s disk)
-define(MIN_BATCH, 32).
-define(MAX_BATCH, 8192).

main(_Args) ->
Payload = binary:copy(<<$x>>, ?PAYLOAD_SIZE),

Sep = string:copies("-", 68),
io:format("~n~s~n", [Sep]),
io:format("gen_batch_server benchmark~n"),
io:format(" messages : ~b~n", [?NUM_MESSAGES]),
io:format(" payload : ~b B~n", [?PAYLOAD_SIZE]),
io:format(" base delay : ~b us + ~b us/MB of batch data~n",
[?BASE_DELAY_US, ?US_PER_MB]),
io:format(" min_batch : ~b | max_batch : ~b~n",
[?MIN_BATCH, ?MAX_BATCH]),
io:format("~s~n~n", [Sep]),

run_scenario("exponential (default)", exponential, Payload),
run_scenario("AIMD step=32", {aimd, 32}, Payload),
run_scenario("AIMD step=64", {aimd, 64}, Payload),
run_scenario("AIMD step=128", {aimd, 128}, Payload),
io:format("~s~n", [Sep]).

%% -------------------------------------------------------------------------

run_scenario(Label, Growth, Payload) ->
Self = self(),
Opts = [{min_batch_size, ?MIN_BATCH},
{max_batch_size, ?MAX_BATCH},
{batch_size_growth, Growth}],
{ok, Pid} = gen_batch_server:start_link(
undefined, gbs_bench_worker,
#{base_delay_us => ?BASE_DELAY_US,
us_per_mb => ?US_PER_MB,
payload_size => ?PAYLOAD_SIZE,
collector => Self},
Opts),

T0 = erlang:monotonic_time(microsecond),

[gen_batch_server:cast(Pid, Payload) || _ <- lists:seq(1, ?NUM_MESSAGES)],

%% Synchronous barrier: returns only after every preceding cast has
%% been handled. The reply carries final GC stats from the worker.
{ok, GcStats} = gen_batch_server:call(Pid, sync, 300_000),

T1 = erlang:monotonic_time(microsecond),

gen_batch_server:stop(Pid),

{Sizes, HeapPerBatch} = drain_batch_msgs([], []),
ElapsedMs = (T1 - T0) / 1000,

print_results(Label, ElapsedMs, Sizes, GcStats, HeapPerBatch),

timer:sleep(500).

drain_batch_msgs(SizeAcc, HeapAcc) ->
receive
{batch, N, HeapWords} ->
drain_batch_msgs([N | SizeAcc], [HeapWords | HeapAcc])
after 0 ->
{lists:reverse(SizeAcc), lists:reverse(HeapAcc)}
end.

print_results(Label, ElapsedMs, Sizes,
#{minor_gcs := MinorGcs, major_gcs := MajorGcs,
max_heap := MaxHeap},
HeapPerBatch) ->
NumBatches = length(Sizes),
TotalMsgs = lists:sum(Sizes),
Sorted = lists:sort(Sizes),
BatchMin = hd(Sorted),
BatchMax = lists:last(Sorted),
BatchMean = TotalMsgs / max(1, NumBatches),
P50 = percentile(Sorted, 0.50),
P95 = percentile(Sorted, 0.95),
Throughput = round(?NUM_MESSAGES / (ElapsedMs / 1000)),

HeapSorted = lists:sort(HeapPerBatch),
HeapMax = lists:last(HeapSorted),
HeapMean = lists:sum(HeapPerBatch) / max(1, length(HeapPerBatch)),
HeapP50 = percentile(HeapSorted, 0.50),

WordSize = erlang:system_info(wordsize),

io:format(" ~s~n", [Label]),
io:format(" elapsed : ~.1f ms~n", [ElapsedMs]),
io:format(" throughput : ~b msg/s~n", [Throughput]),
io:format(" batches : ~b (total msgs accounted: ~b)~n",
[NumBatches, TotalMsgs]),
io:format(" batch size : min=~b mean=~.1f p50=~b p95=~b max=~b~n",
[BatchMin, BatchMean, P50, P95, BatchMax]),
io:format(" GC : minor=~b major=~b~n",
[MinorGcs, MajorGcs]),
io:format(" heap peak : ~b words (~.1f KB)~n",
[MaxHeap, MaxHeap * WordSize / 1024]),
io:format(" heap/batch : mean=~b p50=~b max=~b words~n~n",
[round(HeapMean), HeapP50, HeapMax]).

percentile(Sorted, P) ->
N = length(Sorted),
Idx = max(1, round(N * P)),
lists:nth(Idx, Sorted).
41 changes: 41 additions & 0 deletions bench/gbs_bench_worker.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
-module(gbs_bench_worker).
-behaviour(gen_batch_server).
-export([init/1, handle_batch/2]).

init(Cfg) ->
process_flag(message_queue_data, off_heap),
{ok, Cfg}.

handle_batch(Batch, #{base_delay_us := Base,
us_per_mb := UsMb,
payload_size := PSize,
collector := C} = St) ->
{heap_size, HeapBefore} = erlang:process_info(self(), heap_size),
BatchBytes = length(Batch) * PSize,
DelayUs = Base + round(BatchBytes * UsMb / (1024 * 1024)),
busy_wait_us(DelayUs),
{heap_size, HeapAfter} = erlang:process_info(self(), heap_size),
HeapWords = max(HeapBefore, HeapAfter),
{garbage_collection_info, GcProps} =
erlang:process_info(self(), garbage_collection_info),
MinorGcs = proplists:get_value(minor_gcs, GcProps, 0),
MajorGcs = proplists:get_value(major_gcs, GcProps, 0),
GcStats = #{minor_gcs => MinorGcs,
major_gcs => MajorGcs,
max_heap => HeapWords},
C ! {batch, length(Batch), HeapWords},
Actions = [{reply, From, {ok, GcStats}} || {call, From, _} <- Batch],
{ok, Actions, St}.

%% Microsecond-precision busy-wait. timer:sleep/1 only accepts integer
%% milliseconds, which truncates sub-ms variable costs to zero for small
%% batches.
busy_wait_us(Us) ->
Target = erlang:monotonic_time(microsecond) + Us,
busy_wait_until(Target).

busy_wait_until(Target) ->
case erlang:monotonic_time(microsecond) >= Target of
true -> ok;
false -> busy_wait_until(Target)
end.
28 changes: 19 additions & 9 deletions src/gen_batch_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
module :: module(),
hibernate_after = infinity :: non_neg_integer() | infinity,
reversed_batch = false :: boolean(),
flush_mailbox_on_terminate = false :: false | {true, non_neg_integer()}}).
flush_mailbox_on_terminate = false :: false | {true, non_neg_integer()},
batch_size_growth = exponential :: exponential | {aimd, pos_integer()}}).

-record(state, {batch = [] :: [op()],
batch_count = 0 :: non_neg_integer(),
Expand All @@ -56,8 +57,6 @@
needs_gc = false :: boolean(),
debug :: list()}).

% -type state() :: #state{}.

-export_type([from/0, op/0,
action/0, server_ref/0]).

Expand Down Expand Up @@ -148,6 +147,8 @@ init_it(Starter, Parent, Name0, Mod, {GBOpts, Args}, Options) ->
?MIN_MAX_BATCH_SIZE),
ReverseBatch = proplists:get_value(reversed_batch, GBOpts, false),
FlushMailbox = proplists:get_value(flush_mailbox_on_terminate, GBOpts, false),
BatchSizeGrowth = proplists:get_value(batch_size_growth, GBOpts,
exponential),
Conf = #config{module = Mod,
parent = Parent,
name = Name,
Expand All @@ -156,7 +157,8 @@ init_it(Starter, Parent, Name0, Mod, {GBOpts, Args}, Options) ->
max_batch_size = MaxBatchSize,
hibernate_after = HibernateAfter,
reversed_batch = ReverseBatch,
flush_mailbox_on_terminate = FlushMailbox},
flush_mailbox_on_terminate = FlushMailbox,
batch_size_growth = BatchSizeGrowth},
case init_it(Mod, Args) of
{ok, {ok, Inner0}} ->
proc_lib:init_ack(Starter, {ok, self()}),
Expand Down Expand Up @@ -329,13 +331,19 @@ enter_loop_batched(Msg, Parent, State0) ->
loop_batched(append_msg(Msg, State), Parent).

loop_batched(#state{config = #config{batch_size = BatchSize,
max_batch_size = Max} = Config,
max_batch_size = Max,
batch_size_growth = Growth} = Config,
batch_count = BatchCount} = State0,
Parent) when BatchCount >= BatchSize ->
% complete batch after seeing batch_size writes
State = complete_batch(State0),
% grow max batch size
NewBatchSize = min(Max, BatchSize * 2),
% grow batch size according to the configured strategy
NewBatchSize = case Growth of
exponential ->
min(Max, BatchSize * 2);
{aimd, Step} ->
min(Max, BatchSize + Step)
end,
loop_wait(State#state{config = Config#config{batch_size = NewBatchSize}},
Parent);
loop_batched(#state{debug = Debug} = State0, Parent) ->
Expand Down Expand Up @@ -523,7 +531,8 @@ gen_start(undefined, Mod, Args, Opts0) ->
Key == max_batch_size orelse
Key == min_batch_size orelse
Key == reversed_batch orelse
Key == flush_mailbox_on_terminate;
Key == flush_mailbox_on_terminate orelse
Key == batch_size_growth;
(_) -> false
end, Opts0),
gen:start(?MODULE, link, Mod, {GBOpts, Args}, Opts);
Expand All @@ -534,7 +543,8 @@ gen_start(Name, Mod, Args, Opts0) ->
Key == max_batch_size orelse
Key == min_batch_size orelse
Key == reversed_batch orelse
Key == flush_mailbox_on_terminate;
Key == flush_mailbox_on_terminate orelse
Key == batch_size_growth;
(_) -> false
end, Opts0),
gen:start(?MODULE, link, Name, Mod, {GBOpts, Args}, Opts).
Expand Down
31 changes: 30 additions & 1 deletion test/gen_batch_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ all_tests() ->
handle_continue_chained,
opts_not_at_front,
flush_mailbox_on_terminate_disabled,
flush_mailbox_on_terminate_enabled
flush_mailbox_on_terminate_enabled,
aimd_batch_size_growth
].

groups() ->
Expand Down Expand Up @@ -603,6 +604,34 @@ opts_not_at_front(Config) ->
?assert(meck:validate(Mod)),
ok.

aimd_batch_size_growth(Config) ->
Mod = ?config(mod, Config),
Self = self(),
meck:new(Mod, [non_strict]),
meck:expect(Mod, init, fun(_) -> {ok, #{}} end),
meck:expect(Mod, handle_batch,
fun(Ops, State) ->
Self ! {batch_size, length(Ops)},
{ok, State}
end),
%% With AIMD(step=32), growth from min=32 to max=256 takes 7 full batches;
%% exponential doubling covers the same range in only 3. Verify additive
%% growth is used by counting distinct sub-max batch sizes observed.
Opts = [{min_batch_size, 32},
{max_batch_size, 256},
{batch_size_growth, {aimd, 32}}],
{ok, Pid} = gen_batch_server:start_link(undefined, Mod, [], Opts),
[gen_batch_server:cast(Pid, I) || I <- lists:seq(1, 2000)],
timer:sleep(200),
Sizes = collect_batch_sizes([]),
?assert(lists:all(fun(S) -> S =< 256 end, Sizes)),
%% AIMD yields 7 distinct sub-max steps (32,64,96,128,160,192,224);
%% exponential yields only 3 (32,64,128). Require at least 5.
SubMaxSizes = lists:usort([S || S <- Sizes, S < 256]),
?assert(length(SubMaxSizes) >= 5),
?assert(meck:validate(Mod)),
ok.

collect_batch_sizes(Acc) ->
receive
{batch_size, N} -> collect_batch_sizes([N | Acc])
Expand Down
Loading