Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,94 @@ Makes a synchronous call to the `gen_batch_server` and waits for the response pr
by `Module:handle_batch/2`.
The timeout is optional and defaults to 5000ms.

#### send_request(ServerRef, Request) -> ReqId

Types:
ServerRef = pid() | {Name :: atom(), node()} | Name :: atom()
Request = term()
ReqId = request_id()

Sends an asynchronous call request to the `gen_batch_server`. Returns a request
identifier `ReqId` that can be used with `wait_response/2`, `receive_response/2`,
or `check_response/2` to collect the reply later. The request appears as a
`{call, From, Request}` operation in `Module:handle_batch/2`, where `From` is the
same opaque alias-based tag as described by `reply_tag()` / `from()` in the public API.

#### send_request(ServerRef, Request, Label, ReqIdCollection) -> NewReqIdCollection

Types:
ServerRef = pid() | {Name :: atom(), node()} | Name :: atom()
Request = term()
Label = term()
ReqIdCollection = request_id_collection()
NewReqIdCollection = request_id_collection()

Like `send_request/2`, but stores the request identifier in a collection
associated with `Label`. The collection can later be passed to
`receive_response/3`, `wait_response/3`, or `check_response/3`.

#### wait_response(ReqId, WaitTime) -> Result
#### receive_response(ReqId, Timeout) -> Result

Types:
ReqId = request_id()
WaitTime = Timeout = timeout() | {abs, integer()}
Result = {reply, Reply} | {error, {Reason, ServerRef}} | timeout

Wait for a response to an async request made with `send_request/2`.
`receive_response` abandons the request on timeout (late replies are dropped),
while `wait_response` keeps the monitor so you can retry.

#### wait_response(ReqIdCollection, WaitTime, Delete) -> Result
#### receive_response(ReqIdCollection, Timeout, Delete) -> Result

Types:
ReqIdCollection = request_id_collection()
WaitTime = Timeout = timeout() | {abs, integer()}
Delete = boolean()
Result = {Response, Label, NewReqIdCollection} | no_request | timeout
Response = {reply, Reply} | {error, {Reason, ServerRef}}

Collection variants. Returns the response along with the `Label` associated
with the request and an updated collection.

#### check_response(Msg, ReqId) -> Result

Types:
Msg = term()
ReqId = request_id()
Result = {reply, Reply} | {error, {Reason, ServerRef}} | no_reply

Check whether a received message `Msg` is a response to the request `ReqId`.
Returns `no_reply` if the message does not correspond to this request.

#### check_response(Msg, ReqIdCollection, Delete) -> Result

Types:
Msg = term()
ReqIdCollection = request_id_collection()
Delete = boolean()
Result = {Response, Label, NewReqIdCollection} | no_request | no_reply
Response = {reply, Reply} | {error, {Reason, ServerRef}}

Collection variant of `check_response/2`.

#### reqids_new() -> NewReqIdCollection

Creates a new empty request identifier collection.

#### reqids_add(ReqId, Label, ReqIdCollection) -> NewReqIdCollection

Stores `ReqId` with an associated `Label` in the collection.

#### reqids_size(ReqIdCollection) -> non_neg_integer()

Returns the number of request identifiers in the collection.

#### reqids_to_list(ReqIdCollection) -> [{ReqId, Label}]

Converts the collection to a list of `{ReqId, Label}` pairs.

#### Module:init(Args) -> Result

Types:
Expand Down
138 changes: 133 additions & 5 deletions src/gen_batch_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@
cast_batch/2,
call/2,
call/3,
send_request/2,
send_request/4,
wait_response/2,
wait_response/3,
receive_response/2,
receive_response/3,
check_response/2,
check_response/3,
reqids_new/0,
reqids_add/3,
reqids_size/1,
reqids_to_list/1,
system_continue/3,
system_terminate/4,
system_get_state/1,
Expand All @@ -33,7 +45,16 @@
{'global', term()} |
{'via', Module :: module(), Name :: term()}.

-type from() :: {Pid :: pid(), Tag :: reference()}.
-type reply_tag() :: reference()
| nonempty_improper_list('alias', reference()).

-type from() :: {Pid :: pid(), Tag :: reply_tag()}.

-opaque request_id() :: gen:request_id().

-opaque request_id_collection() :: gen:request_id_collection().

-type response_timeout() :: timeout() | {abs, integer()}.

-type op() :: {cast, UserOp :: term()} |
{call, from(), UserOp :: term()} |
Expand All @@ -57,8 +78,9 @@
needs_gc = false :: boolean(),
debug :: list()}).

-export_type([from/0, op/0,
action/0, server_ref/0]).
-export_type([from/0, reply_tag/0, op/0,
action/0, server_ref/0,
request_id/0, request_id_collection/0]).

%%% Behaviour

Expand Down Expand Up @@ -273,6 +295,111 @@ call(Name, Request, Timeout) ->
exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
end.

-spec send_request(ServerRef, Request) -> ReqId when
ServerRef :: server_ref(),
Request :: term(),
ReqId :: request_id().
send_request(ServerRef, Request) ->
gen:send_request(ServerRef, '$gen_call', Request).

-spec send_request(ServerRef, Request, Label, ReqIdCollection) ->
NewReqIdCollection when
ServerRef :: server_ref(),
Request :: term(),
Label :: term(),
ReqIdCollection :: request_id_collection(),
NewReqIdCollection :: request_id_collection().
send_request(ServerRef, Request, Label, ReqIdCollection) ->
gen:send_request(ServerRef, '$gen_call', Request, Label, ReqIdCollection).

-spec wait_response(ReqId, WaitTime) -> Result when
ReqId :: request_id(),
WaitTime :: response_timeout(),
Response :: {reply, Reply :: term()}
| {error, {Reason :: term(), server_ref()}},
Result :: Response | 'timeout'.
wait_response(ReqId, WaitTime) ->
gen:wait_response(ReqId, WaitTime).

-spec wait_response(ReqIdCollection, WaitTime, Delete) -> Result when
ReqIdCollection :: request_id_collection(),
WaitTime :: response_timeout(),
Delete :: boolean(),
Response :: {reply, Reply :: term()}
| {error, {Reason :: term(), server_ref()}},
Result :: {Response, Label :: term(),
NewReqIdCollection :: request_id_collection()}
| 'no_request' | 'timeout'.
wait_response(ReqIdCollection, WaitTime, Delete) ->
gen:wait_response(ReqIdCollection, WaitTime, Delete).

-spec receive_response(ReqId, Timeout) -> Result when
ReqId :: request_id(),
Timeout :: response_timeout(),
Response :: {reply, Reply :: term()}
| {error, {Reason :: term(), server_ref()}},
Result :: Response | 'timeout'.
receive_response(ReqId, Timeout) ->
gen:receive_response(ReqId, Timeout).

-spec receive_response(ReqIdCollection, Timeout, Delete) -> Result when
ReqIdCollection :: request_id_collection(),
Timeout :: response_timeout(),
Delete :: boolean(),
Response :: {reply, Reply :: term()}
| {error, {Reason :: term(), server_ref()}},
Result :: {Response, Label :: term(),
NewReqIdCollection :: request_id_collection()}
| 'no_request' | 'timeout'.
receive_response(ReqIdCollection, Timeout, Delete) ->
gen:receive_response(ReqIdCollection, Timeout, Delete).

-spec check_response(Msg, ReqId) -> Result when
Msg :: term(),
ReqId :: request_id(),
Response :: {reply, Reply :: term()}
| {error, {Reason :: term(), server_ref()}},
Result :: Response | 'no_reply'.
check_response(Msg, ReqId) ->
gen:check_response(Msg, ReqId).

-spec check_response(Msg, ReqIdCollection, Delete) -> Result when
Msg :: term(),
ReqIdCollection :: request_id_collection(),
Delete :: boolean(),
Response :: {reply, Reply :: term()}
| {error, {Reason :: term(), server_ref()}},
Result :: {Response, Label :: term(),
NewReqIdCollection :: request_id_collection()}
| 'no_request' | 'no_reply'.
check_response(Msg, ReqIdCollection, Delete) ->
gen:check_response(Msg, ReqIdCollection, Delete).

-spec reqids_new() -> NewReqIdCollection when
NewReqIdCollection :: request_id_collection().
reqids_new() ->
gen:reqids_new().

-spec reqids_add(ReqId, Label, ReqIdCollection) ->
NewReqIdCollection when
ReqId :: request_id(),
Label :: term(),
ReqIdCollection :: request_id_collection(),
NewReqIdCollection :: request_id_collection().
reqids_add(ReqId, Label, ReqIdCollection) ->
gen:reqids_add(ReqId, Label, ReqIdCollection).

-spec reqids_size(ReqIdCollection) -> non_neg_integer() when
ReqIdCollection :: request_id_collection().
reqids_size(ReqIdCollection) ->
gen:reqids_size(ReqIdCollection).

-spec reqids_to_list(ReqIdCollection) -> [{ReqId, Label}] when
ReqIdCollection :: request_id_collection(),
ReqId :: request_id(),
Label :: term().
reqids_to_list(ReqIdCollection) ->
gen:reqids_to_list(ReqIdCollection).

%% Internal

Expand Down Expand Up @@ -433,9 +560,10 @@ handle_batch_result({stop, Reason}, State0, _Debug0) ->
exit(Reason).

handle_actions(Actions, Debug0) ->
lists:foldl(fun ({reply, {Pid, Tag}, Msg},
lists:foldl(fun ({reply, From, Msg},
{ShouldGc, Dbg}) ->
Pid ! {Tag, Msg},
gen:reply(From, Msg),
{Pid, _Tag} = From,
{ShouldGc,
handle_debug_out(Pid, Msg, Dbg)};
(garbage_collect, {_, Dbg}) ->
Expand Down
89 changes: 88 additions & 1 deletion test/gen_batch_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ all_tests() ->
opts_not_at_front,
flush_mailbox_on_terminate_disabled,
flush_mailbox_on_terminate_enabled,
aimd_batch_size_growth
aimd_batch_size_growth,
send_request_receive_response,
send_request_wait_response,
send_request_check_response,
send_request_collection
].

groups() ->
Expand Down Expand Up @@ -710,6 +714,89 @@ flush_mailbox_on_terminate_enabled(Config) ->
?assert(meck:validate(Mod)),
ok.

send_request_receive_response(Config) ->
Mod = ?config(mod, Config),
meck:new(Mod, [non_strict]),
meck:expect(Mod, init, fun(Init) -> {ok, Init} end),
Args = #{},
{ok, Pid} = gen_batch_server:start_link({local, Mod}, Mod, Args, []),
meck:expect(Mod, handle_batch,
fun([{call, From, {put, k, v}}], State) ->
{ok, [{reply, From, {ok, k}}],
maps:put(k, v, State)}
end),
ReqId = gen_batch_server:send_request(Pid, {put, k, v}),
?assertMatch({reply, {ok, k}},
gen_batch_server:receive_response(ReqId, 5000)),
?assert(meck:validate(Mod)),
ok.

send_request_wait_response(Config) ->
Mod = ?config(mod, Config),
meck:new(Mod, [non_strict]),
meck:expect(Mod, init, fun(Init) -> {ok, Init} end),
Args = #{},
{ok, Pid} = gen_batch_server:start_link({local, Mod}, Mod, Args, []),
meck:expect(Mod, handle_batch,
fun([{call, From, ping}], State) ->
{ok, [{reply, From, pong}], State}
end),
ReqId = gen_batch_server:send_request(Pid, ping),
?assertMatch({reply, pong},
gen_batch_server:wait_response(ReqId, 5000)),
?assert(meck:validate(Mod)),
ok.

send_request_check_response(Config) ->
Mod = ?config(mod, Config),
meck:new(Mod, [non_strict]),
meck:expect(Mod, init, fun(Init) -> {ok, Init} end),
Args = #{},
{ok, Pid} = gen_batch_server:start_link({local, Mod}, Mod, Args, []),
meck:expect(Mod, handle_batch,
fun([{call, From, hello}], State) ->
{ok, [{reply, From, world}], State}
end),
ReqId = gen_batch_server:send_request(Pid, hello),
receive
Msg ->
?assertMatch({reply, world},
gen_batch_server:check_response(Msg, ReqId))
after 5000 ->
exit(timeout)
end,
?assert(meck:validate(Mod)),
ok.

send_request_collection(Config) ->
Mod = ?config(mod, Config),
meck:new(Mod, [non_strict]),
meck:expect(Mod, init, fun(Init) -> {ok, Init} end),
Args = #{},
{ok, Pid} = gen_batch_server:start_link({local, Mod}, Mod, Args, []),
meck:expect(Mod, handle_batch,
fun(Ops, State) ->
Actions = [{reply, From, {ok, N}}
|| {call, From, {echo, N}} <- Ops],
{ok, Actions, State}
end),
Col0 = gen_batch_server:reqids_new(),
?assertEqual(0, gen_batch_server:reqids_size(Col0)),
Col1 = gen_batch_server:send_request(Pid, {echo, 1}, first, Col0),
Col2 = gen_batch_server:send_request(Pid, {echo, 2}, second, Col1),
?assertEqual(2, gen_batch_server:reqids_size(Col2)),
?assertEqual(2, length(gen_batch_server:reqids_to_list(Col2))),
{Result1, Label1, Col3} =
gen_batch_server:receive_response(Col2, 5000, true),
{Result2, Label2, Col4} =
gen_batch_server:receive_response(Col3, 5000, true),
?assertEqual(0, gen_batch_server:reqids_size(Col4)),
Collected = lists:sort([{Label1, Result1}, {Label2, Result2}]),
?assertEqual([{first, {reply, {ok, 1}}}, {second, {reply, {ok, 2}}}],
Collected),
?assert(meck:validate(Mod)),
ok.

%% Utility
wait_batch() ->
wait_batch([]).
Expand Down
Loading