From 65551d9f61956eadbfee786a45e9198cd0321535 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 31 Mar 2026 16:08:46 +0100 Subject: [PATCH 1/2] Add gen_server async request APIs (send_request, wait/receive/check_response, reqids_*) Expose the gen_server "request" family of functions so callers can make asynchronous calls to a gen_batch_server and collect replies later. All twelve new functions delegate to the gen module, matching what gen_server itself does internally. Also fix the reply path in handle_actions/2 to use gen:reply/2 instead of raw Pid ! {Tag, Msg}. This is required for alias-based From tags produced by send_request to route replies through the process alias, ensuring late replies after timeout are silently dropped. The change is backward-compatible with legacy gen:call-style From values. --- README.md | 87 ++++++++++++++++++++ src/gen_batch_server.erl | 138 ++++++++++++++++++++++++++++++-- test/gen_batch_server_SUITE.erl | 89 +++++++++++++++++++- 3 files changed, 308 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 37d539c..f0ca3fa 100644 --- a/README.md +++ b/README.md @@ -107,6 +107,93 @@ Makes a synchronous call to the `gen_batch_server` and waits for the response pr by `Module:handle_batch/2`. The timeout is optional and defaults to 5000ms. +#### send_request(ServerRef, Request) -> ReqId + + Types: + ServerRef = pid() | {Name :: atom(), node()} | Name :: atom() + Request = term() + ReqId = request_id() + +Sends an asynchronous call request to the `gen_batch_server`. Returns a request +identifier `ReqId` that can be used with `wait_response/2`, `receive_response/2`, +or `check_response/2` to collect the reply later. The request appears as a +`{call, From, Request}` operation in `Module:handle_batch/2`. + +#### send_request(ServerRef, Request, Label, ReqIdCollection) -> NewReqIdCollection + + Types: + ServerRef = pid() | {Name :: atom(), node()} | Name :: atom() + Request = term() + Label = term() + ReqIdCollection = request_id_collection() + NewReqIdCollection = request_id_collection() + +Like `send_request/2`, but stores the request identifier in a collection +associated with `Label`. The collection can later be passed to +`receive_response/3`, `wait_response/3`, or `check_response/3`. + +#### wait_response(ReqId, WaitTime) -> Result +#### receive_response(ReqId, Timeout) -> Result + + Types: + ReqId = request_id() + WaitTime = Timeout = timeout() | {abs, integer()} + Result = {reply, Reply} | {error, {Reason, ServerRef}} | timeout + +Wait for a response to an async request made with `send_request/2`. +`receive_response` abandons the request on timeout (late replies are dropped), +while `wait_response` keeps the monitor so you can retry. + +#### wait_response(ReqIdCollection, WaitTime, Delete) -> Result +#### receive_response(ReqIdCollection, Timeout, Delete) -> Result + + Types: + ReqIdCollection = request_id_collection() + WaitTime = Timeout = timeout() | {abs, integer()} + Delete = boolean() + Result = {Response, Label, NewReqIdCollection} | no_request | timeout + Response = {reply, Reply} | {error, {Reason, ServerRef}} + +Collection variants. Returns the response along with the `Label` associated +with the request and an updated collection. + +#### check_response(Msg, ReqId) -> Result + + Types: + Msg = term() + ReqId = request_id() + Result = {reply, Reply} | {error, {Reason, ServerRef}} | no_reply + +Check whether a received message `Msg` is a response to the request `ReqId`. +Returns `no_reply` if the message does not correspond to this request. + +#### check_response(Msg, ReqIdCollection, Delete) -> Result + + Types: + Msg = term() + ReqIdCollection = request_id_collection() + Delete = boolean() + Result = {Response, Label, NewReqIdCollection} | no_request | no_reply + Response = {reply, Reply} | {error, {Reason, ServerRef}} + +Collection variant of `check_response/2`. + +#### reqids_new() -> NewReqIdCollection + +Creates a new empty request identifier collection. + +#### reqids_add(ReqId, Label, ReqIdCollection) -> NewReqIdCollection + +Stores `ReqId` with an associated `Label` in the collection. + +#### reqids_size(ReqIdCollection) -> non_neg_integer() + +Returns the number of request identifiers in the collection. + +#### reqids_to_list(ReqIdCollection) -> [{ReqId, Label}] + +Converts the collection to a list of `{ReqId, Label}` pairs. + #### Module:init(Args) -> Result Types: diff --git a/src/gen_batch_server.erl b/src/gen_batch_server.erl index 34541e2..ee078b6 100644 --- a/src/gen_batch_server.erl +++ b/src/gen_batch_server.erl @@ -15,6 +15,18 @@ cast_batch/2, call/2, call/3, + send_request/2, + send_request/4, + wait_response/2, + wait_response/3, + receive_response/2, + receive_response/3, + check_response/2, + check_response/3, + reqids_new/0, + reqids_add/3, + reqids_size/1, + reqids_to_list/1, system_continue/3, system_terminate/4, system_get_state/1, @@ -33,7 +45,16 @@ {'global', term()} | {'via', Module :: module(), Name :: term()}. --type from() :: {Pid :: pid(), Tag :: reference()}. +-type reply_tag() :: reference() + | nonempty_improper_list('alias', reference()). + +-type from() :: {Pid :: pid(), Tag :: reply_tag()}. + +-opaque request_id() :: gen:request_id(). + +-opaque request_id_collection() :: gen:request_id_collection(). + +-type response_timeout() :: timeout() | {abs, integer()}. -type op() :: {cast, UserOp :: term()} | {call, from(), UserOp :: term()} | @@ -57,8 +78,9 @@ needs_gc = false :: boolean(), debug :: list()}). --export_type([from/0, op/0, - action/0, server_ref/0]). +-export_type([from/0, reply_tag/0, op/0, + action/0, server_ref/0, + request_id/0, request_id_collection/0]). %%% Behaviour @@ -273,6 +295,111 @@ call(Name, Request, Timeout) -> exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) end. +-spec send_request(ServerRef, Request) -> ReqId when + ServerRef :: server_ref(), + Request :: term(), + ReqId :: request_id(). +send_request(ServerRef, Request) -> + gen:send_request(ServerRef, '$gen_call', Request). + +-spec send_request(ServerRef, Request, Label, ReqIdCollection) -> + NewReqIdCollection when + ServerRef :: server_ref(), + Request :: term(), + Label :: term(), + ReqIdCollection :: request_id_collection(), + NewReqIdCollection :: request_id_collection(). +send_request(ServerRef, Request, Label, ReqIdCollection) -> + gen:send_request(ServerRef, '$gen_call', Request, Label, ReqIdCollection). + +-spec wait_response(ReqId, WaitTime) -> Result when + ReqId :: request_id(), + WaitTime :: response_timeout(), + Response :: {reply, Reply :: term()} + | {error, {Reason :: term(), server_ref()}}, + Result :: Response | 'timeout'. +wait_response(ReqId, WaitTime) -> + gen:wait_response(ReqId, WaitTime). + +-spec wait_response(ReqIdCollection, WaitTime, Delete) -> Result when + ReqIdCollection :: request_id_collection(), + WaitTime :: response_timeout(), + Delete :: boolean(), + Response :: {reply, Reply :: term()} + | {error, {Reason :: term(), server_ref()}}, + Result :: {Response, Label :: term(), + NewReqIdCollection :: request_id_collection()} + | 'no_request' | 'timeout'. +wait_response(ReqIdCollection, WaitTime, Delete) -> + gen:wait_response(ReqIdCollection, WaitTime, Delete). + +-spec receive_response(ReqId, Timeout) -> Result when + ReqId :: request_id(), + Timeout :: response_timeout(), + Response :: {reply, Reply :: term()} + | {error, {Reason :: term(), server_ref()}}, + Result :: Response | 'timeout'. +receive_response(ReqId, Timeout) -> + gen:receive_response(ReqId, Timeout). + +-spec receive_response(ReqIdCollection, Timeout, Delete) -> Result when + ReqIdCollection :: request_id_collection(), + Timeout :: response_timeout(), + Delete :: boolean(), + Response :: {reply, Reply :: term()} + | {error, {Reason :: term(), server_ref()}}, + Result :: {Response, Label :: term(), + NewReqIdCollection :: request_id_collection()} + | 'no_request' | 'timeout'. +receive_response(ReqIdCollection, Timeout, Delete) -> + gen:receive_response(ReqIdCollection, Timeout, Delete). + +-spec check_response(Msg, ReqId) -> Result when + Msg :: term(), + ReqId :: request_id(), + Response :: {reply, Reply :: term()} + | {error, {Reason :: term(), server_ref()}}, + Result :: Response | 'no_reply'. +check_response(Msg, ReqId) -> + gen:check_response(Msg, ReqId). + +-spec check_response(Msg, ReqIdCollection, Delete) -> Result when + Msg :: term(), + ReqIdCollection :: request_id_collection(), + Delete :: boolean(), + Response :: {reply, Reply :: term()} + | {error, {Reason :: term(), server_ref()}}, + Result :: {Response, Label :: term(), + NewReqIdCollection :: request_id_collection()} + | 'no_request' | 'no_reply'. +check_response(Msg, ReqIdCollection, Delete) -> + gen:check_response(Msg, ReqIdCollection, Delete). + +-spec reqids_new() -> NewReqIdCollection when + NewReqIdCollection :: request_id_collection(). +reqids_new() -> + gen:reqids_new(). + +-spec reqids_add(ReqId, Label, ReqIdCollection) -> + NewReqIdCollection when + ReqId :: request_id(), + Label :: term(), + ReqIdCollection :: request_id_collection(), + NewReqIdCollection :: request_id_collection(). +reqids_add(ReqId, Label, ReqIdCollection) -> + gen:reqids_add(ReqId, Label, ReqIdCollection). + +-spec reqids_size(ReqIdCollection) -> non_neg_integer() when + ReqIdCollection :: request_id_collection(). +reqids_size(ReqIdCollection) -> + gen:reqids_size(ReqIdCollection). + +-spec reqids_to_list(ReqIdCollection) -> [{ReqId, Label}] when + ReqIdCollection :: request_id_collection(), + ReqId :: request_id(), + Label :: term(). +reqids_to_list(ReqIdCollection) -> + gen:reqids_to_list(ReqIdCollection). %% Internal @@ -433,9 +560,10 @@ handle_batch_result({stop, Reason}, State0, _Debug0) -> exit(Reason). handle_actions(Actions, Debug0) -> - lists:foldl(fun ({reply, {Pid, Tag}, Msg}, + lists:foldl(fun ({reply, From, Msg}, {ShouldGc, Dbg}) -> - Pid ! {Tag, Msg}, + gen:reply(From, Msg), + {Pid, _Tag} = From, {ShouldGc, handle_debug_out(Pid, Msg, Dbg)}; (garbage_collect, {_, Dbg}) -> diff --git a/test/gen_batch_server_SUITE.erl b/test/gen_batch_server_SUITE.erl index 652617a..6cd66e5 100644 --- a/test/gen_batch_server_SUITE.erl +++ b/test/gen_batch_server_SUITE.erl @@ -50,7 +50,11 @@ all_tests() -> opts_not_at_front, flush_mailbox_on_terminate_disabled, flush_mailbox_on_terminate_enabled, - aimd_batch_size_growth + aimd_batch_size_growth, + send_request_receive_response, + send_request_wait_response, + send_request_check_response, + send_request_collection ]. groups() -> @@ -710,6 +714,89 @@ flush_mailbox_on_terminate_enabled(Config) -> ?assert(meck:validate(Mod)), ok. +send_request_receive_response(Config) -> + Mod = ?config(mod, Config), + meck:new(Mod, [non_strict]), + meck:expect(Mod, init, fun(Init) -> {ok, Init} end), + Args = #{}, + {ok, Pid} = gen_batch_server:start_link({local, Mod}, Mod, Args, []), + meck:expect(Mod, handle_batch, + fun([{call, From, {put, k, v}}], State) -> + {ok, [{reply, From, {ok, k}}], + maps:put(k, v, State)} + end), + ReqId = gen_batch_server:send_request(Pid, {put, k, v}), + ?assertMatch({reply, {ok, k}}, + gen_batch_server:receive_response(ReqId, 5000)), + ?assert(meck:validate(Mod)), + ok. + +send_request_wait_response(Config) -> + Mod = ?config(mod, Config), + meck:new(Mod, [non_strict]), + meck:expect(Mod, init, fun(Init) -> {ok, Init} end), + Args = #{}, + {ok, Pid} = gen_batch_server:start_link({local, Mod}, Mod, Args, []), + meck:expect(Mod, handle_batch, + fun([{call, From, ping}], State) -> + {ok, [{reply, From, pong}], State} + end), + ReqId = gen_batch_server:send_request(Pid, ping), + ?assertMatch({reply, pong}, + gen_batch_server:wait_response(ReqId, 5000)), + ?assert(meck:validate(Mod)), + ok. + +send_request_check_response(Config) -> + Mod = ?config(mod, Config), + meck:new(Mod, [non_strict]), + meck:expect(Mod, init, fun(Init) -> {ok, Init} end), + Args = #{}, + {ok, Pid} = gen_batch_server:start_link({local, Mod}, Mod, Args, []), + meck:expect(Mod, handle_batch, + fun([{call, From, hello}], State) -> + {ok, [{reply, From, world}], State} + end), + ReqId = gen_batch_server:send_request(Pid, hello), + receive + Msg -> + ?assertMatch({reply, world}, + gen_batch_server:check_response(Msg, ReqId)) + after 5000 -> + exit(timeout) + end, + ?assert(meck:validate(Mod)), + ok. + +send_request_collection(Config) -> + Mod = ?config(mod, Config), + meck:new(Mod, [non_strict]), + meck:expect(Mod, init, fun(Init) -> {ok, Init} end), + Args = #{}, + {ok, Pid} = gen_batch_server:start_link({local, Mod}, Mod, Args, []), + meck:expect(Mod, handle_batch, + fun(Ops, State) -> + Actions = [{reply, From, {ok, N}} + || {call, From, {echo, N}} <- Ops], + {ok, Actions, State} + end), + Col0 = gen_batch_server:reqids_new(), + ?assertEqual(0, gen_batch_server:reqids_size(Col0)), + Col1 = gen_batch_server:send_request(Pid, {echo, 1}, first, Col0), + Col2 = gen_batch_server:send_request(Pid, {echo, 2}, second, Col1), + ?assertEqual(2, gen_batch_server:reqids_size(Col2)), + ?assertEqual(2, length(gen_batch_server:reqids_to_list(Col2))), + {Result1, Label1, Col3} = + gen_batch_server:receive_response(Col2, 5000, true), + {Result2, Label2, Col4} = + gen_batch_server:receive_response(Col3, 5000, true), + ?assertEqual(0, gen_batch_server:reqids_size(Col4)), + Collected = lists:sort([{Label1, Result1}, {Label2, Result2}]), + ?assertEqual([{first, {reply, {ok, 1}}}, {second, {reply, {ok, 2}}}], + Collected), + ?assert(meck:validate(Mod)), + ok. + %% Utility wait_batch() -> wait_batch([]). From 6be3965e24fbca88e20e012fb3fa5cffc2878804 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 1 Apr 2026 08:38:53 +0100 Subject: [PATCH 2/2] Update README.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index f0ca3fa..1573620 100644 --- a/README.md +++ b/README.md @@ -117,7 +117,8 @@ The timeout is optional and defaults to 5000ms. Sends an asynchronous call request to the `gen_batch_server`. Returns a request identifier `ReqId` that can be used with `wait_response/2`, `receive_response/2`, or `check_response/2` to collect the reply later. The request appears as a -`{call, From, Request}` operation in `Module:handle_batch/2`. +`{call, From, Request}` operation in `Module:handle_batch/2`, where `From` is the +same opaque alias-based tag as described by `reply_tag()` / `from()` in the public API. #### send_request(ServerRef, Request, Label, ReqIdCollection) -> NewReqIdCollection