From 99188080772a5327cad417ded6b993723a7cf219 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 31 Mar 2026 14:03:54 +0100 Subject: [PATCH 1/2] Add optional AIMD batch size growth strategy Introduce a `batch_size_growth` start option (default: `exponential`, which preserves the existing doubling behaviour) that accepts `{aimd, Step}` to switch to Additive Increase / Multiplicative Decrease growth: the batch size grows by `Step` each time a full batch is completed, and halves whenever the mailbox is found empty. --- README.md | 12 +++++++++++- src/gen_batch_server.erl | 28 +++++++++++++++++++--------- test/gen_batch_server_SUITE.erl | 31 ++++++++++++++++++++++++++++++- 3 files changed, 60 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index f8f4fad..37d539c 100644 --- a/README.md +++ b/README.md @@ -38,8 +38,10 @@ some resource (such as a `dets` table) that doesn't handle casts. Opt = {debug, Dbgs} | {min_batch_size | max_batch_size, non_neg_integer()} | {reversed_batch, boolean()} | - {flush_mailbox_on_terminate, false | {true, SummaryCount}} + {flush_mailbox_on_terminate, false | {true, SummaryCount}} | + {batch_size_growth, exponential | {aimd, Step}} SummaryCount = non_neg_integer() + Step = pos_integer() Opts = [Opt] Opts = [term()] Result = {ok,Pid} | ignore | {error,Error} @@ -64,6 +66,14 @@ so the callback can inspect or forward the summary. Draining the mailbox prevent `proc_lib` crash reports from pretty-printing potentially large messages (e.g. binary payloads), which can cause unbounded memory growth. +The `batch_size_growth` option controls how the batch size grows when the server +is under load. The default, `exponential`, doubles the batch size each time a full +batch is completed (the original behaviour). Setting it to `{aimd, Step}` switches +to Additive Increase / Multiplicative Decrease: the batch size grows by `Step` on +each full batch and halves whenever the mailbox is found empty. 
AIMD produces a +smoother, more gradual ramp-up and is useful when large sudden jumps in batch size +are undesirable. + #### cast(ServerRef, Request) -> ok diff --git a/src/gen_batch_server.erl b/src/gen_batch_server.erl index cfd2cf5..34541e2 100644 --- a/src/gen_batch_server.erl +++ b/src/gen_batch_server.erl @@ -47,7 +47,8 @@ module :: module(), hibernate_after = infinity :: non_neg_integer() | infinity, reversed_batch = false :: boolean(), - flush_mailbox_on_terminate = false :: false | {true, non_neg_integer()}}). + flush_mailbox_on_terminate = false :: false | {true, non_neg_integer()}, + batch_size_growth = exponential :: exponential | {aimd, pos_integer()}}). -record(state, {batch = [] :: [op()], batch_count = 0 :: non_neg_integer(), @@ -56,8 +57,6 @@ needs_gc = false :: boolean(), debug :: list()}). -% -type state() :: #state{}. - -export_type([from/0, op/0, action/0, server_ref/0]). @@ -148,6 +147,8 @@ init_it(Starter, Parent, Name0, Mod, {GBOpts, Args}, Options) -> ?MIN_MAX_BATCH_SIZE), ReverseBatch = proplists:get_value(reversed_batch, GBOpts, false), FlushMailbox = proplists:get_value(flush_mailbox_on_terminate, GBOpts, false), + BatchSizeGrowth = proplists:get_value(batch_size_growth, GBOpts, + exponential), Conf = #config{module = Mod, parent = Parent, name = Name, @@ -156,7 +157,8 @@ init_it(Starter, Parent, Name0, Mod, {GBOpts, Args}, Options) -> max_batch_size = MaxBatchSize, hibernate_after = HibernateAfter, reversed_batch = ReverseBatch, - flush_mailbox_on_terminate = FlushMailbox}, + flush_mailbox_on_terminate = FlushMailbox, + batch_size_growth = BatchSizeGrowth}, case init_it(Mod, Args) of {ok, {ok, Inner0}} -> proc_lib:init_ack(Starter, {ok, self()}), @@ -329,13 +331,19 @@ enter_loop_batched(Msg, Parent, State0) -> loop_batched(append_msg(Msg, State), Parent). 
loop_batched(#state{config = #config{batch_size = BatchSize, - max_batch_size = Max} = Config, + max_batch_size = Max, + batch_size_growth = Growth} = Config, batch_count = BatchCount} = State0, Parent) when BatchCount >= BatchSize -> % complete batch after seeing batch_size writes State = complete_batch(State0), - % grow max batch size - NewBatchSize = min(Max, BatchSize * 2), + % grow batch size according to the configured strategy + NewBatchSize = case Growth of + exponential -> + min(Max, BatchSize * 2); + {aimd, Step} -> + min(Max, BatchSize + Step) + end, loop_wait(State#state{config = Config#config{batch_size = NewBatchSize}}, Parent); loop_batched(#state{debug = Debug} = State0, Parent) -> @@ -523,7 +531,8 @@ gen_start(undefined, Mod, Args, Opts0) -> Key == max_batch_size orelse Key == min_batch_size orelse Key == reversed_batch orelse - Key == flush_mailbox_on_terminate; + Key == flush_mailbox_on_terminate orelse + Key == batch_size_growth; (_) -> false end, Opts0), gen:start(?MODULE, link, Mod, {GBOpts, Args}, Opts); @@ -534,7 +543,8 @@ gen_start(Name, Mod, Args, Opts0) -> Key == max_batch_size orelse Key == min_batch_size orelse Key == reversed_batch orelse - Key == flush_mailbox_on_terminate; + Key == flush_mailbox_on_terminate orelse + Key == batch_size_growth; (_) -> false end, Opts0), gen:start(?MODULE, link, Name, Mod, {GBOpts, Args}, Opts). diff --git a/test/gen_batch_server_SUITE.erl b/test/gen_batch_server_SUITE.erl index 58f5946..652617a 100644 --- a/test/gen_batch_server_SUITE.erl +++ b/test/gen_batch_server_SUITE.erl @@ -49,7 +49,8 @@ all_tests() -> handle_continue_chained, opts_not_at_front, flush_mailbox_on_terminate_disabled, - flush_mailbox_on_terminate_enabled + flush_mailbox_on_terminate_enabled, + aimd_batch_size_growth ]. groups() -> @@ -603,6 +604,34 @@ opts_not_at_front(Config) -> ?assert(meck:validate(Mod)), ok. 
+aimd_batch_size_growth(Config) ->
+    Mod = ?config(mod, Config),
+    Self = self(),
+    meck:new(Mod, [non_strict]),
+    meck:expect(Mod, init, fun(_) -> {ok, #{}} end),
+    %% Report each batch size; reply to calls so a call can act as a barrier.
+    meck:expect(Mod, handle_batch, fun(Ops, State) ->
+        Self ! {batch_size, length(Ops)},
+        {ok, [{reply, F, ok} || {call, F, _} <- Ops], State}
+    end),
+    %% With AIMD(step=32), growth from min=32 to max=256 takes 7 full batches;
+    %% exponential doubling covers the same range in only 3. Verify additive
+    %% growth is used by counting distinct sub-max batch sizes observed.
+    Opts = [{min_batch_size, 32}, {max_batch_size, 256},
+            {batch_size_growth, {aimd, 32}}],
+    {ok, Pid} = gen_batch_server:start_link(undefined, Mod, [], Opts),
+    [gen_batch_server:cast(Pid, I) || I <- lists:seq(1, 2000)],
+    %% Barrier instead of a fixed sleep: the call is queued behind all casts.
+    ok = gen_batch_server:call(Pid, sync, 5000),
+    Sizes = collect_batch_sizes([]),
+    ?assert(lists:all(fun(S) -> S =< 256 end, Sizes)),
+    %% AIMD yields 7 distinct sub-max steps (32,64,96,128,160,192,224);
+    %% exponential yields only 3 (32,64,128). Require at least 5.
+    SubMaxSizes = lists:usort([S || S <- Sizes, S < 256]),
+    ?assert(length(SubMaxSizes) >= 5),
+    ?assert(meck:validate(Mod)),
+    ok.
+
 collect_batch_sizes(Acc) ->
     receive
         {batch_size, N} -> collect_batch_sizes([N | Acc])

From 4289a4bdb70d0d87f459d7d91f45c01e2022629f Mon Sep 17 00:00:00 2001
From: Karl Nilsson
Date: Tue, 31 Mar 2026 15:34:45 +0100
Subject: [PATCH 2/2] add benchmark code

---
 .gitignore                 |   1 +
 bench/benchmark            | 126 +++++++++++++++++++++++++++++++++++++
 bench/gbs_bench_worker.erl |  41 ++++++++++++
 3 files changed, 168 insertions(+)
 create mode 100755 bench/benchmark
 create mode 100644 bench/gbs_bench_worker.erl

diff --git a/.gitignore b/.gitignore
index a350d63..0d79682 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,5 +13,6 @@ doc
 logs
 *.d
 ebin/
+bench/ebin/
 _build
 rebar.lock
diff --git a/bench/benchmark b/bench/benchmark
new file mode 100755
index 0000000..53d761a
--- /dev/null
+++ b/bench/benchmark
@@ -0,0 +1,126 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+%%!
-pa _build/default/lib/gen_batch_server/ebin -pa bench/ebin
+
+%% Benchmark: exponential (default) versus AIMD batch size growth.
+%%
+%% The workload imitates a log writer that fsyncs once per batch: every
+%% batch pays a constant base cost (the fsync) and an additional cost
+%% proportional to the number of bytes it carries (the write). The
+%% worker sets message_queue_data to off_heap so that large binaries
+%% are not copied into the process heap on receive.
+%%
+%% How to run (from the project root):
+%%
+%%   rebar3 compile
+%%   erlc -o bench/ebin bench/gbs_bench_worker.erl
+%%   escript bench/benchmark
+
+-define(NUM_MESSAGES, 100_000).
+-define(PAYLOAD_SIZE, 4096).    %% 4 KB per message
+-define(BASE_DELAY_US, 3000).   %% 3 ms fixed fsync overhead per batch
+-define(US_PER_MB, 3000).       %% 3 ms per MB of batch data (~330 MB/s disk)
+-define(MIN_BATCH, 32).
+-define(MAX_BATCH, 8192).
+
+%% escript entry point: prints the parameters, then runs every scenario.
+main(_Args) ->
+    Bin = binary:copy(<<$x>>, ?PAYLOAD_SIZE),
+    Rule = string:copies("-", 68),
+    Header = [{"~n~s~n", [Rule]},
+              {"gen_batch_server benchmark~n", []},
+              {" messages : ~b~n", [?NUM_MESSAGES]},
+              {" payload : ~b B~n", [?PAYLOAD_SIZE]},
+              {" base delay : ~b us + ~b us/MB of batch data~n",
+               [?BASE_DELAY_US, ?US_PER_MB]},
+              {" min_batch : ~b | max_batch : ~b~n", [?MIN_BATCH, ?MAX_BATCH]},
+              {"~s~n~n", [Rule]}],
+    lists:foreach(fun({Fmt, Args}) -> io:format(Fmt, Args) end, Header),
+    Scenarios = [{"exponential (default)", exponential},
+                 {"AIMD step=32", {aimd, 32}},
+                 {"AIMD step=64", {aimd, 64}},
+                 {"AIMD step=128", {aimd, 128}}],
+    lists:foreach(fun({L, G}) -> run_scenario(L, G, Bin) end, Scenarios),
+    io:format("~s~n", [Rule]).
+
+%% -------------------------------------------------------------------------
+
+%% Runs one benchmark scenario: starts a worker with the given growth
+%% strategy, casts ?NUM_MESSAGES payloads at it, waits for them all to be
+%% handled, then prints timing, batch-size and GC/heap figures.
+run_scenario(Label, Growth, Payload) ->
+    WorkerArgs = #{base_delay_us => ?BASE_DELAY_US,
+                   us_per_mb => ?US_PER_MB,
+                   payload_size => ?PAYLOAD_SIZE,
+                   collector => self()},
+    ServerOpts = [{min_batch_size, ?MIN_BATCH},
+                  {max_batch_size, ?MAX_BATCH},
+                  {batch_size_growth, Growth}],
+    {ok, Server} = gen_batch_server:start_link(undefined, gbs_bench_worker,
+                                               WorkerArgs, ServerOpts),
+    Started = erlang:monotonic_time(microsecond),
+    lists:foreach(fun(_) -> gen_batch_server:cast(Server, Payload) end,
+                  lists:seq(1, ?NUM_MESSAGES)),
+    %% The call is queued behind every cast, so its reply marks the point
+    %% where all of them have been handled; it carries the worker's final
+    %% GC stats.
+    {ok, GcStats} = gen_batch_server:call(Server, sync, 300_000),
+    Finished = erlang:monotonic_time(microsecond),
+    gen_batch_server:stop(Server),
+    {Sizes, HeapPerBatch} = drain_batch_msgs([], []),
+    print_results(Label, (Finished - Started) / 1000, Sizes, GcStats,
+                  HeapPerBatch),
+    %% Short pause before the next scenario starts.
+    timer:sleep(500).
+
+%% Drains the {batch, Size, HeapWords} messages already in the mailbox
+%% without blocking. Called with two empty accumulators, it returns
+%% {Sizes, HeapWordsPerBatch}, both in arrival order.
+drain_batch_msgs(SizeAcc, HeapAcc) ->
+    receive
+        {batch, Size, Heap} ->
+            drain_batch_msgs([Size | SizeAcc], [Heap | HeapAcc])
+    after 0 ->
+        %% No further batch report is queued.
+        {lists:reverse(SizeAcc), lists:reverse(HeapAcc)}
+    end.
+
+%% Prints the report for one scenario: elapsed time and throughput, the
+%% batch-size distribution, the GC counters and heap figure taken from
+%% the worker's final reply, and per-batch heap statistics. Sizes and
+%% HeapPerBatch are expected to be non-empty.
+print_results(Label, ElapsedMs, Sizes,
+              #{minor_gcs := MinorGcs, major_gcs := MajorGcs,
+                max_heap := MaxHeap},
+              HeapPerBatch) ->
+    Batches = length(Sizes),
+    Msgs = lists:sum(Sizes),
+    ByRank = lists:sort(Sizes),
+    HeapByRank = lists:sort(HeapPerBatch),
+    HeapMean = lists:sum(HeapPerBatch) / max(1, length(HeapPerBatch)),
+    WordSize = erlang:system_info(wordsize),
+    %% Each entry pairs a format string with its arguments, in print order.
+    Report =
+        [{" ~s~n", [Label]},
+         {" elapsed : ~.1f ms~n", [ElapsedMs]},
+         {" throughput : ~b msg/s~n",
+          [round(?NUM_MESSAGES / (ElapsedMs / 1000))]},
+         {" batches : ~b (total msgs accounted: ~b)~n",
+          [Batches, Msgs]},
+         {" batch size : min=~b mean=~.1f p50=~b p95=~b max=~b~n",
+          [hd(ByRank), Msgs / max(1, Batches), percentile(ByRank, 0.50),
+           percentile(ByRank, 0.95), lists:last(ByRank)]},
+         {" GC : minor=~b major=~b~n", [MinorGcs, MajorGcs]},
+         {" heap peak : ~b words (~.1f KB)~n",
+          [MaxHeap, MaxHeap * WordSize / 1024]},
+         {" heap/batch : mean=~b p50=~b max=~b words~n~n",
+          [round(HeapMean), percentile(HeapByRank, 0.50),
+           lists:last(HeapByRank)]}],
+    lists:foreach(fun({Fmt, Args}) -> io:format(Fmt, Args) end, Report).
+
+%% Picks the value at rank round(N * P) (but never below rank 1) from
+%% the ascending list Sorted of length N; P is a fraction such as 0.95.
+%% Fails on an empty list.
+percentile(Sorted, P) ->
+    Rank = round(length(Sorted) * P),
+    lists:nth(max(1, Rank), Sorted).
diff --git a/bench/gbs_bench_worker.erl b/bench/gbs_bench_worker.erl
new file mode 100644
index 0000000..c35c2b1
--- /dev/null
+++ b/bench/gbs_bench_worker.erl
@@ -0,0 +1,41 @@
+-module(gbs_bench_worker).
+-behaviour(gen_batch_server).
+-export([init/1, handle_batch/2]).
+%% init/1 sets message_queue_data to off_heap; Cfg is used as the state.
+init(Cfg) ->
+    process_flag(message_queue_data, off_heap),
+    {ok, Cfg}.
+
+%% Simulates one fsync-backed write: busy-waits a fixed cost plus a cost
+%% proportional to the batch's bytes, reports {batch, Size, HeapWords} to
+%% the collector and answers each call in the batch with {ok, GcStats}.
+handle_batch(Batch, #{base_delay_us := Base, us_per_mb := UsMb,
+                      payload_size := PSize, collector := C} = St) ->
+    Size = length(Batch),
+    {heap_size, HeapBefore} = erlang:process_info(self(), heap_size),
+    busy_wait_us(Base + round(Size * PSize * UsMb / (1024 * 1024))),
+    {heap_size, HeapAfter} = erlang:process_info(self(), heap_size),
+    HeapWords = max(HeapBefore, HeapAfter),
+    %% Keep the largest heap size sampled so far so max_heap is a peak.
+    Peak = max(HeapWords, maps:get(peak_heap, St, 0)),
+    %% minor_gcs is listed under garbage_collection, not under
+    %% garbage_collection_info; neither has major_gcs, so that stays 0.
+    {garbage_collection, Gc} = erlang:process_info(self(), garbage_collection),
+    GcStats = #{minor_gcs => proplists:get_value(minor_gcs, Gc, 0),
+                major_gcs => proplists:get_value(major_gcs, Gc, 0),
+                max_heap => Peak},
+    C ! {batch, Size, HeapWords},
+    {ok, [{reply, From, {ok, GcStats}} || {call, From, _} <- Batch],
+     St#{peak_heap => Peak}}.
+
+%% Spins until Us microseconds have passed. timer:sleep/1 only takes whole
+%% milliseconds, which would truncate sub-ms costs of small batches to zero.
+busy_wait_us(Us) ->
+    busy_wait_until(erlang:monotonic_time(microsecond) + Us).
+
+%% Polls the monotonic clock until it reaches Target (in microseconds).
+busy_wait_until(Target) ->
+    case erlang:monotonic_time(microsecond) >= Target of
+        true -> ok;
+        false -> busy_wait_until(Target)
+    end.