From b7fc3c0a06a03d8b43d3dfca98ed43223a54ce76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Fri, 27 Feb 2026 01:41:54 +0100 Subject: [PATCH 1/4] Merge ered_client and ered_connection (#157) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Get rid of the two processes in ered_connection and duplicated logic. * Use active mode. * Response timeout handling is existing but incomplete. * Batching is not fixed. Only sends one command at a time. * Sending may block. The client process is unresponsive when it happens. Batching and non-blocking send are to be fixed in follow-up PRs. --------- Signed-off-by: Viktor Söderqvist --- src/ered_client.erl | 570 +++++++++++++++++++++++++------- src/ered_connection.erl | 384 --------------------- src/ered_parser.erl | 42 ++- test/ered_client_tests.erl | 62 ++-- test/ered_cluster_SUITE.erl | 19 +- test/ered_cluster_tls_SUITE.erl | 8 +- test/ered_connection_tests.erl | 86 ----- test/ered_parser_tests.erl | 11 +- 8 files changed, 533 insertions(+), 649 deletions(-) delete mode 100644 src/ered_connection.erl delete mode 100644 test/ered_connection_tests.erl diff --git a/src/ered_client.erl b/src/ered_client.erl index cfdce31..903deeb 100644 --- a/src/ered_client.erl +++ b/src/ered_client.erl @@ -34,7 +34,15 @@ { host :: host(), port :: inet:port_number(), - connection_opts = [] :: [ered_connection:opt()], + + %% From "connection opts" + batch_size = 16 :: non_neg_integer(), + transport = gen_tcp :: gen_tcp | ssl, + transport_opts = [] :: list(), + connect_timeout = infinity :: timeout(), + push_cb = fun(_) -> ok end :: push_cb(), + timeout = 10000 :: timeout(), % Response timeout + resp_version = 3 :: 2..3, use_cluster_id = false :: boolean(), auth = none :: {binary(), binary()} | none, @@ -51,10 +59,11 @@ -record(st, { - connect_loop_pid = none, - connection_pid = none, + connection_loop_pid = none, + socket = none, controlling_process :: pid(), last_status = none, 
+ parser_state :: ered_parser:state(), waiting = q_new() :: command_queue(), pending = q_new() :: command_queue(), @@ -62,19 +71,20 @@ cluster_id = undefined :: undefined | binary(), queue_full_event_sent = false :: boolean(), % set to true when full, false when reaching queue_ok_level - node_status = up :: up | node_down | node_deactivated, - + status :: init | up | node_down | node_deactivated, node_down_timer = none :: none | reference(), + connected_at = none :: none | integer(), % erlang:monotonic_time(millisecond) opts = #opts{} }). - -type command_error() :: queue_overflow | node_down | node_deactivated | {client_stopped, reason()}. -type command_item() :: {command, ered_command:redis_command(), reply_fun()}. -type command_queue() :: {Size :: non_neg_integer(), queue:queue(command_item())}. --type reply() :: {ok, ered_connection:result()} | {error, command_error()}. +-type result() :: ered_parser:parse_result(). +-type push_cb() :: fun((result()) -> any()). +-type reply() :: {ok, result()} | {error, command_error()}. -type reply_fun() :: fun((reply()) -> any()). -type host() :: ered:host(). @@ -106,7 +116,7 @@ -type opt() :: %% Options passed to the connection module - {connection_opts, [ered_connection:opt()]} | + {connection_opts, [connection_opt()]} | %% Max number of commands allowed to wait in queue. {max_waiting, non_neg_integer()} | %% Max number of commands to be pending, i.e. sent to client @@ -135,6 +145,45 @@ %% The SELECT command is only sent when non-zero. {select_db, non_neg_integer()}. +-type connection_opt() :: + %% If commands are queued up in the process message queue this is the max + %% amount of messages that will be received and sent in one call + {batch_size, non_neg_integer()} | + %% Timeout passed to gen_tcp:connect/4 or ssl:connect/4. + {connect_timeout, timeout()} | + %% Options passed to gen_tcp:connect/4. + {tcp_options, [gen_tcp:connect_option()]} | + %% Timeout passed to gen_tcp:connect/4. DEPRECATED. 
+ {tcp_connect_timeout, timeout()} | + %% Options passed to ssl:connect/4. If this config parameter is present, + %% TLS is used. + {tls_options, [ssl:tls_client_option()]} | + %% Timeout passed to ssl:connect/4. DEPRECATED. + {tls_connect_timeout, timeout()} | + %% Callback for push notifications + {push_cb, push_cb()} | + %% Timeout when waiting for a response from Redis. milliseconds + {response_timeout, non_neg_integer()}. + +%% Command in the waiting queue. +-record(command, {data, replyto}). + +%% Pending request, in flight, sent to server, waiting for reply/ies. +-record(pending_req, + { + command :: #command{}, + response_class :: ered_command:response_class() | + [ered_command:response_class()], + reply_acc = [] + }). + +%% Queue macro, can be used in guards. +-define(q_is_empty(Q), (element(1, Q) =:= 0)). + +%% Commands like SUBSCRIBE and UNSUBSCRIBE don't return anything, so we use this +%% return value. +-define(pubsub_reply, undefined). + %%%=================================================================== %%% API %%%=================================================================== @@ -220,14 +269,15 @@ command(ServerRef, Command, Timeout) -> %% client process and should not hang or perform any lengthy task. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - command_async(ServerRef, Command, CallbackFun) -> - gen_server:cast(ServerRef, {command, ered_command:convert_to(Command), CallbackFun}). + gen_server:cast(ServerRef, #command{data = ered_command:convert_to(Command), + replyto = CallbackFun}). 
%%%=================================================================== %%% gen_server callbacks %%%=================================================================== init({Host, Port, OptsList, User}) -> Opts = lists:foldl( - fun({connection_opts, Val}, S) -> S#opts{connection_opts = Val}; + fun({connection_opts, Val}, S) -> handle_connection_opts(S, Val); ({max_waiting, Val}, S) -> S#opts{max_waiting = Val}; ({max_pending, Val}, S) -> S#opts{max_pending = Val}; ({queue_ok_level, Val}, S) -> S#opts{queue_ok_level = Val}; @@ -244,21 +294,53 @@ init({Host, Port, OptsList, User}) -> OptsList), monitor(process, User), process_flag(trap_exit, true), - Pid = self(), - ConnectPid = spawn_link(fun() -> connect(Pid, Opts) end), - {ok, start_node_down_timer(#st{opts = Opts, - controlling_process = User, - connect_loop_pid = ConnectPid})}. + State0 = #st{opts = Opts, + controlling_process = User, + parser_state = ered_parser:init(), + status = init}, + State1 = start_connect_loop(now, State0), + {ok, start_node_down_timer(State1)}. + +%% "Connection opts" is second layer of options. +%% +%% TODO: Remove this layering and put them directly as client options. It's an +%% API change so we'll do it in an appropriate version. 
+handle_connection_opts(OptsRecord, Opts) -> + Valid = [batch_size, tcp_options, tls_options, push_cb, response_timeout, + tcp_connect_timeout, tls_connect_timeout, connect_timeout], + [error({badarg, BadOpt}) + || BadOpt <- proplists:get_keys(Opts) -- Valid], + BatchSize = proplists:get_value(batch_size, Opts, 16), + ResponseTimeout = proplists:get_value(response_timeout, Opts, 10000), + PushCb = proplists:get_value(push_cb, Opts, fun(_) -> ok end), + TcpOptions = proplists:get_value(tcp_options, Opts, []), + TlsOptions = proplists:get_value(tls_options, Opts, []), + TcpTimeout = proplists:get_value(tcp_connect_timeout, Opts, infinity), + TlsTimeout = proplists:get_value(tls_connect_timeout, Opts, infinity), + {Transport, Options, Timeout0} = case TlsOptions of + [] -> + {gen_tcp, TcpOptions, TcpTimeout}; + _ -> + {ssl, TlsOptions, TlsTimeout} + end, + ConnectTimeout = proplists:get_value(connect_timeout, Opts, Timeout0), + OptsRecord#opts{batch_size = BatchSize, + transport = Transport, transport_opts = Options, + connect_timeout = ConnectTimeout, + timeout = ResponseTimeout, + push_cb = PushCb}. handle_call({command, Command}, From, State) -> Fun = fun(Reply) -> gen_server:reply(From, Reply) end, - handle_cast({command, Command, Fun}, State). + handle_cast(#command{data = Command, replyto = Fun}, State). 
-handle_cast(Command = {command, _, _}, State) -> - case State#st.node_status of - up -> - {noreply, process_commands(State#st{waiting = q_in(Command, State#st.waiting)})}; +handle_cast(Command = #command{}, State) -> + case State#st.status of + Up when Up =:= up; Up =:= init -> + State1 = State#st{waiting = q_in(Command, State#st.waiting)}, + State2 = process_commands(State1), + {noreply, State2, response_timeout(State2)}; NodeProblem when NodeProblem =:= node_down; NodeProblem =:= node_deactivated -> reply_command(Command, {error, NodeProblem}), {noreply, State} @@ -268,66 +350,123 @@ handle_cast(deactivate, State) -> State1 = cancel_node_down_timer(State), State2 = report_connection_status(node_deactivated, State1), State3 = reply_all({error, node_deactivated}, State2), - {noreply, process_commands(State3#st{node_status = node_deactivated})}; + {noreply, process_commands(State3#st{status = node_deactivated})}; -handle_cast(reactivate, #st{connection_pid = none} = State) -> +handle_cast(reactivate, #st{socket = none} = State) -> {noreply, start_node_down_timer(State)}; handle_cast(reactivate, State) -> - {noreply, State#st{node_status = up}}. - + {noreply, State#st{status = up}}. -handle_info({{command_reply, Pid}, Reply}, State = #st{pending = Pending, connection_pid = Pid}) -> - case q_out(Pending) of - empty -> - {noreply, State}; - {Command, NewPending} -> - reply_command(Command, {ok, Reply}), - {noreply, process_commands(State#st{pending = NewPending})} +handle_info({Type, Socket, Data}, #st{socket = Socket} = State) + when Type =:= tcp; Type =:= ssl -> + %% Receive data from current socket. + State1 = handle_data(Data, State), + State2 = process_commands(State1), + {noreply, State2, response_timeout(State2)}; + +handle_info({Passive, Socket}, #st{socket = Socket} = State) + when Passive =:= tcp_passive; Passive =:= ssl_passive -> + %% Socket switched to passive mode due to {active, N}. + %% TODO: Add config for N. 
+ N = 100, + case setopts(State, [{active, N}]) of + ok -> + {noreply, State, response_timeout(State)}; + {error, Reason} -> + Transport = State#st.opts#opts.transport, + Transport:close(Socket), + {noreply, connection_down({socket_closed, Reason}, State#st{socket = undefined})} end; -handle_info({{command_reply, _Pid}, _Reply}, State) -> - %% Stray message from a defunct client? ignore! - {noreply, State}; - -handle_info(Reason = {connect_error, _ErrorReason}, State) -> - {noreply, connection_down(Reason, State)}; - -handle_info(Reason = {init_error, _Errors}, State) -> - {noreply, connection_down(Reason, State)}; - -handle_info(Reason = {socket_closed, _CloseReason}, State) -> - {noreply, connection_down(Reason, State)}; - -handle_info({connected, Pid, ClusterId}, State) -> - State1 = cancel_node_down_timer(State), - State2 = State1#st{connection_pid = Pid, cluster_id = ClusterId, - node_status = case State1#st.node_status of - node_down -> up; - OldStatus -> OldStatus - end}, - State3 = report_connection_status(connection_up, State2), - {noreply, process_commands(State3)}; +handle_info({Error, Socket, Reason}, #st{socket = Socket} = State) + when Error =:= tcp_error; Error =:= ssl_error -> + %% Socket errors. If the network or peer is down, the error is not + %% always followed by a tcp_closed. + %% + %% TLS 1.3: Called after a connect when the client certificate has expired + Transport = State#st.opts#opts.transport, + Transport:close(Socket), + {noreply, connection_down({socket_closed, Reason}, State#st{socket = none})}; + +handle_info({Closed, Socket}, #st{socket = Socket} = State) + when Closed =:= tcp_closed; Closed =:= ssl_closed -> + %% Socket got closed by the server. + {noreply, connection_down({socket_closed, Closed}, State#st{socket = none})}; + +handle_info(ConnectError = {connect_error, _Reason}, State) -> + %% Message from the connect loop process. It will retry. 
+ {noreply, connection_down(ConnectError, State)}; + +handle_info({connected, Socket}, State) -> + %% Sent from connect loop process just before it exits. + State1 = abort_pending_commands(State), + State2 = State1#st{socket = Socket, + connected_at = erlang:monotonic_time(millisecond), + status = init}, + State3 = init_connection(State2), + {noreply, State3, response_timeout(State3)}; + +handle_info({init_command_reply, {ok, Replies}}, State) -> + case [Reason || {error, Reason} <- Replies] of + [] -> + %% No errors + ClusterId = case State#st.opts#opts.use_cluster_id of + true -> + hd(Replies); + false -> + undefined + end, + State1 = cancel_node_down_timer(State), + NodeStatus = case State1#st.status of + node_down -> up; + init -> up; + OldStatus -> OldStatus + end, + State2 = State1#st{status = NodeStatus, cluster_id = ClusterId}, + State3 = report_connection_status(connection_up, State2), + {noreply, process_commands(State3), response_timeout(State3)}; + Errors -> + {noreply, connection_down({init_error, Errors}, State)} + end; +handle_info({init_command_reply, {error, Reason}}, State) -> + {noreply, connection_down({init_error, Reason}, State)}; handle_info({timeout, TimerRef, node_down}, State) when TimerRef == State#st.node_down_timer -> + %% Node down timeout State1 = report_connection_status({connection_down, node_down_timeout}, State), State2 = reply_all({error, node_down}, State1), - {noreply, process_commands(State2#st{node_status = node_down})}; + {noreply, process_commands(State2#st{status = node_down})}; -handle_info({timeout, _TimerRef, _Msg}, State) -> - {noreply, State}; +handle_info(timeout, #st{socket = Socket} = State) when Socket =/= none -> + %% Request timeout + Transport = State#st.opts#opts.transport, + Transport:close(Socket), + {noreply, connection_down({socket_closed, timeout}, State#st{socket = none})}; handle_info({'DOWN', _Mon, process, Pid, ExitReason}, State = #st{controlling_process = Pid}) -> {stop, ExitReason, State}; 
+handle_info({'EXIT', Pid, normal}, #st{connection_loop_pid = Pid} = State) -> + State1 = State#st{connection_loop_pid = none}, + State2 = case State1#st.socket of + none -> + %% Corner case. The new connection was lost before this + %% exit signal arrived. Start reconnect loop again. + start_connect_loop(now, State1); + _Socket -> + State1 + end, + {noreply, State2, response_timeout(State2)}; + handle_info({'EXIT', _From, Reason}, State) -> + %% Supervisor exited. {stop, Reason, State}; handle_info(_Ignore, State) -> - {noreply, State}. + {noreply, State, response_timeout(State)}. terminate(Reason, State) -> - exit(State#st.connect_loop_pid, kill), reply_all({error, {client_stopped, Reason}}, State), report_connection_status({connection_down, {client_stopped, Reason}}, State), ok. @@ -338,8 +477,128 @@ code_change(_OldVsn, State = #st{opts = #opts{}}, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== + +setopts(#st{opts = #opts{transport = gen_tcp}, socket = Socket}, Opts) -> + inet:setopts(Socket, Opts); +setopts(#st{opts = #opts{transport = ssl}, socket = Socket}, Opts) -> + ssl:setopts(Socket, Opts). + +%% Data received from the server +handle_data(Data, #st{parser_state = ParserState} = State) -> + handle_parser_result(ered_parser:continue(Data, ParserState), State). + +handle_parser_result({need_more, _BytesNeeded, ParserState}, State) -> + State#st{parser_state = ParserState}; +handle_parser_result({done, Value, ParserState}, State0) -> + State1 = handle_result(Value, State0), + handle_parser_result(ered_parser:next(ParserState), State1); +handle_parser_result({parse_error, Reason}, State) -> + Transport = State#st.opts#opts.transport, + Transport:close(State#st.socket), + connection_down({socket_closed, {parse_error, Reason}}, State#st{socket = none}). 
+ +handle_result({push, Value = [Type|_]}, State) -> + %% Pub/sub in RESP3 is a bit quirky. The push is supposed to be out of band + %% data not connected to any request but for subscribe and unsubscribe + %% requests, a successful command is signalled as one or more push messages. + PushCB = State#st.opts#opts.push_cb, + PushCB(Value), + State1 = case is_subscribe_push(Type) of + true -> + handle_subscribe_push(Value, State); + false -> + State + end, + State1; +handle_result(Value, #st{pending = PendingQueue} = State) + when not ?q_is_empty(PendingQueue) -> + {PendingReq, PendingQueue1} = q_out(PendingQueue), + #pending_req{command = Command, + response_class = RespClass, + reply_acc = Acc} = PendingReq, + %% Check how many replies expected (list = pipeline) + case RespClass of + Single when not is_list(Single) -> + reply_command(Command, {ok, Value}), + State#st{pending = PendingQueue1}; + [_] -> + %% Last one, send the reply + reply_command(Command, {ok, lists:reverse([Value | Acc])}), + State#st{pending = PendingQueue1}; + [_ | TailClasses] -> + %% Need more replies. Save the reply and keep going. + PendingReq1 = PendingReq#pending_req{response_class = TailClasses, + reply_acc = [Value | Acc]}, + PendingQueue2 = q_in_r(PendingReq1, PendingQueue1), + State#st{pending = PendingQueue2} + end; +handle_result(_Value, #st{pending = PendingQueue}) + when ?q_is_empty(PendingQueue) -> + error(unexpected_reply). + +is_subscribe_push(<<"subscribe">>) -> + true; +is_subscribe_push(<<X, "subscribe">>) when X >= $a, X =< $z -> + true; +is_subscribe_push(<<"unsubscribe">>) -> + true; +is_subscribe_push(<<X, "unsubscribe">>) when X >= $a, X =< $z -> + true; +is_subscribe_push(_) -> + false. + +handle_subscribe_push(PushMessage, #st{pending = PendingQueue} = State) -> + case q_out(PendingQueue) of + {PendingReq, PendingQueue1} -> + State1 = State#st{pending = PendingQueue1}, + handle_subscribed_popped_pending(PushMessage, PendingReq, State1); + empty -> + %% No commands pending. 
It may be a server initiated unsubscribe. + State + end. + +handle_subscribed_popped_pending(Push, + #pending_req{command = Command, + response_class = ExpectClass, + reply_acc = Acc} = Req, + State) -> + case {ExpectClass, hd(Push)} of + {{Type, N}, Type} % simple command + when N =:= 0; % unsubscribing from all channels + N =:= 1 -> % or subscribed to all channels + reply_command(Command, {ok, ?pubsub_reply}), + State; + {{Type, N}, Type} % simple command + when N > 1 -> % not yet subscribed all channels + Req1 = Req#pending_req{response_class = {Type, N - 1}}, + pending_in_r(Req1, State); + {[{Type, N}], Type} % last command in pipeline + when N =:= 0; % unsubscribing from all channels + N =:= 1 -> % or subscribed to all channels + reply_command(Command, {ok, lists:reverse([?pubsub_reply | Acc])}), + State; + {[{Type, N} | Classes], Type} % pipeline, not the last command + when N =:= 0; % unsubscribing from all channels + N =:= 1 -> % or subscribed to all channels + Req1 = Req#pending_req{response_class = Classes, + reply_acc = [?pubsub_reply | Acc]}, + pending_in_r(Req1, State); + {[{Type, N} | Classes], Type} % pipeline + when N > 1 -> % not yet subscribed all channels + Req1 = Req#pending_req{response_class = [{Type, N - 1} | Classes]}, + pending_in_r(Req1, State); + _Otherwise -> + %% Not expecting this particular push message for Req. Put it back in queue. + pending_in_r(Req, State) + end. + +%% Add in the front of the pending queue (like queue:in_r). +pending_in_r(ReplyInfo, #st{pending = Pending0} = State) -> + Pending1 = q_in_r(ReplyInfo, Pending0), + State#st{pending = Pending1}. + reply_all(Reply, State) -> - [reply_command(Command, Reply) || Command <- q_to_list(State#st.pending)], + [reply_command(Req#pending_req.command, Reply) || Req <- q_to_list(State#st.pending)], + [reply_command(Command, Reply) || Command <- q_to_list(State#st.waiting)], + State#st{waiting = q_new(), pending = q_new()}. 
@@ -355,41 +614,83 @@ cancel_node_down_timer(#st{node_down_timer = TimerRef} = State) -> erlang:cancel_timer(TimerRef), State#st{node_down_timer = none}. +%% Move pending commands back to the waiting queue. Discard partial replies. +%% +%% This is risky behavior. If some of the commands sent are not idempotent, we +%% can't just reconnect and send them again. We may just want to return an error +%% instead. +abort_pending_commands(State) -> + PendingReqs = [Req#pending_req.command || Req <- q_to_list(State#st.pending)], + State#st{waiting = q_join(q_from_list(PendingReqs), State#st.waiting), + pending = q_new(), + parser_state = ered_parser:init()}. + connection_down(Reason, State) -> - State1 = State#st{waiting = q_join(State#st.pending, State#st.waiting), - pending = q_new(), - connection_pid = none}, + State1 = abort_pending_commands(State), State2 = process_commands(State1), State3 = report_connection_status({connection_down, Reason}, State2), - start_node_down_timer(State3). - + State4 = start_connect_loop(now, State3), + start_node_down_timer(State4). -%%%%%% +-spec process_commands(#st{}) -> #st{}. process_commands(State) -> NumWaiting = q_len(State#st.waiting), NumPending = q_len(State#st.pending), if - (NumWaiting > 0) and (NumPending < State#st.opts#opts.max_pending) and (State#st.connection_pid /= none) -> + State#st.status =:= up, State#st.socket =/= none, + NumWaiting > 0, NumPending < State#st.opts#opts.max_pending -> + %% TODO: Pop multiple from queue and send them in a batch. Use the batch_size option. + %% Use q_split, q_join and q_to_list. + %% TODO: Add request timeout timestamp to PendingReq. 
{Command, NewWaiting} = q_out(State#st.waiting), - Data = get_command_payload(Command), - ered_connection:command_async(State#st.connection_pid, Data, {command_reply, State#st.connection_pid}), - process_commands(State#st{pending = q_in(Command, State#st.pending), - waiting = NewWaiting}); - - (NumWaiting > State#st.opts#opts.max_waiting) and (State#st.queue_full_event_sent) -> + RespCommand = Command#command.data, + Data = ered_command:get_data(RespCommand), + Class = ered_command:get_response_class(RespCommand), + Transport = State#st.opts#opts.transport, + case Transport:send(State#st.socket, Data) of + ok -> + PendingReq = #pending_req{command = Command, + response_class = Class}, + State1 = State#st{pending = q_in(PendingReq, State#st.pending), + waiting = NewWaiting}, + process_commands(State1); + {error, _Reason} -> + %% Send FIN and handle replies in flight before reconnecting. + Transport:shutdown(State#st.socket, read_write), + start_connect_loop(now, State#st{status = init}) + end; + + NumWaiting > State#st.opts#opts.max_waiting, State#st.queue_full_event_sent -> drop_commands(State); NumWaiting > State#st.opts#opts.max_waiting -> drop_commands( report_connection_status(queue_full, State#st{queue_full_event_sent = true})); - (NumWaiting < State#st.opts#opts.queue_ok_level) and (State#st.queue_full_event_sent) -> + NumWaiting < State#st.opts#opts.queue_ok_level, State#st.queue_full_event_sent -> report_connection_status(queue_ok, State#st{queue_full_event_sent = false}); true -> State end. +start_connect_loop(_When, State) when is_pid(State#st.connection_loop_pid) -> + State; +start_connect_loop(When0, State) -> + Self = self(), + Now = erlang:monotonic_time(millisecond), + ConnectedAt = State#st.connected_at, + %% Don't reconnect immediately if the last connect was too recently. 
+ When = if + is_integer(ConnectedAt), + Now - ConnectedAt < State#st.opts#opts.reconnect_wait -> + wait; + true -> + When0 + end, + ConnectPid = spawn_link(fun () -> connect_loop(When, Self, State#st.opts) end), + State#st{connection_loop_pid = ConnectPid}. + drop_commands(State) -> case q_len(State#st.waiting) > State#st.opts#opts.max_waiting of true -> @@ -407,6 +708,9 @@ q_new() -> q_in(Item, {Size, Q}) -> {Size+1, queue:in(Item, Q)}. +q_in_r(Item, {Size, Q}) -> + {Size + 1, queue:in_r(Item, Q)}. + q_join({Size1, Q1}, {Size2, Q2}) -> {Size1 + Size2, queue:join(Q1, Q2)}. @@ -416,22 +720,41 @@ q_out({Size, Q}) -> {{value, Val}, NewQ} -> {Val, {Size-1, NewQ}} end. +%% q_split(N, {Size, Q}) when N =< Size -> +%% {A, B} = queue:split(N, Q), +%% {{N, A}, {Size - N, B}}. + q_to_list({_Size, Q}) -> queue:to_list(Q). +q_from_list(List) -> + {length(List), queue:from_list(List)}. + q_len({Size, _Q}) -> Size. +response_timeout(State) when not ?q_is_empty(State#st.pending) -> + %% FIXME: Store req timeout in each pending item + State#st.opts#opts.timeout; +response_timeout(_State) -> + infinity. -reply_command({command, _, Fun}, Reply) -> +reply_command(#command{replyto = Fun} = _Command, Reply) -> Fun(Reply). -get_command_payload({command, Command, _Fun}) -> - Command. - -spec report_connection_status(status(), #st{}) -> #st{}. report_connection_status(Status, State = #st{last_status = Status}) -> State; +report_connection_status({connection_down, {init_error, node_down}}, + #st{last_status = {connection_down, _}} = State) -> + %% Silence additional init error caused by connection down. The lost + %% connection was already reported in another status message. + State; +report_connection_status({connection_down, {init_error, InitReason}}, + #st{last_status = node_deactivated} = State) + when InitReason =:= node_deactivated; InitReason =:= node_down -> + %% Silence additional init error when node is deactivated. 
+ State; report_connection_status(Status, State) -> send_info(Status, State), case Status of @@ -474,32 +797,36 @@ send_info(Status, #st{opts = #opts{info_pid = Pid, send_info(_Msg, _State) -> ok. - -connect(Pid, Opts) -> - Result = ered_connection:connect(Opts#opts.host, Opts#opts.port, Opts#opts.connection_opts), - case Result of +%% Connect-wait-retry loop, to run in a separate spawned process. When +%% connected, transfers the socket to the OwnerPid, sends a message `{connected, +%% Socket}` and exits. On connect error, a message `{connect_error, Reason}` is +%% sent and connecting is retried periodically. +connect_loop(now, OwnerPid, + #opts{host = Host, port = Port, transport = Transport, + transport_opts = TransportOpts0, + connect_timeout = Timeout} = Opts) -> + TransportOpts = [{active, 100}, binary] ++ TransportOpts0, + case Transport:connect(Host, Port, TransportOpts, Timeout) of + {ok, Socket} -> + case Transport:controlling_process(Socket, OwnerPid) of + ok -> + OwnerPid ! {connected, Socket}; + {error, Reason} -> + OwnerPid ! {connect_error, Reason}, + Transport:close(Socket), + connect_loop(wait, OwnerPid, Opts) + end; {error, Reason} -> - Pid ! {connect_error, Reason}, - timer:sleep(Opts#opts.reconnect_wait); - - {ok, ConnectionPid} -> - case init(Pid, ConnectionPid, Opts) of - {socket_closed, ConnectionPid, Reason} -> - Pid ! {socket_closed, Reason}, - timer:sleep(Opts#opts.reconnect_wait); - {ok, ClusterId} -> - Pid ! {connected, ConnectionPid, ClusterId}, - receive - {socket_closed, ConnectionPid, Reason} -> - Pid ! {socket_closed, Reason} - end - end - - end, - connect(Pid, Opts). - + OwnerPid ! {connect_error, Reason}, + connect_loop(wait, OwnerPid, Opts) + end; +connect_loop(wait, OwnerPid, Opts) -> + timer:sleep(Opts#opts.reconnect_wait), + connect_loop(now, OwnerPid, Opts). 
-init(MainPid, ConnectionPid, Opts) -> +init_connection(State) -> + #st{opts = #opts{transport = Transport} = Opts, + socket = Socket} = State, Cmd1 = [[<<"CLUSTER">>, <<"MYID">>] || Opts#opts.use_cluster_id], Cmd2 = case {Opts#opts.resp_version, Opts#opts.auth} of {3, {Username, Password}} -> @@ -515,22 +842,29 @@ init(MainPid, ConnectionPid, Opts) -> Opts#opts.select_db > 0], case Cmd1 ++ Cmd2 ++ Cmd3 of [] -> - {ok, undefined}; - Commands -> - ered_connection:command_async(ConnectionPid, Commands, init_command_reply), - receive - {init_command_reply, Reply} -> - case [Reason || {error, Reason} <- Reply] of - [] when Opts#opts.use_cluster_id -> - {ok, hd(Reply)}; - [] -> - {ok, undefined}; - Errors -> - MainPid ! {init_error, Errors}, - timer:sleep(Opts#opts.reconnect_wait), - init(MainPid, ConnectionPid, Opts) - end; - Other -> - Other + self() ! {init_command_reply, {ok, []}}, + State; + Pipeline -> + %% Add to pending queue and send like any other commands. + ReplyFun = fun (Reply) -> + self() ! {init_command_reply, Reply} + end, + RespCommand = ered_command:convert_to(Pipeline), + Data = ered_command:get_data(RespCommand), + Command = #command{data = RespCommand, replyto = ReplyFun}, + Class = ered_command:get_response_class(RespCommand), + PendingReq = #pending_req{command = Command, response_class = Class}, + Transport = State#st.opts#opts.transport, + case Transport:send(State#st.socket, Data) of + ok -> + State1 = State#st{pending = q_in(PendingReq, State#st.pending), + status = init}, + %% Send commands immediately or wait for init reply first? + %% process_commands(State1); + State1; + {error, _Reason} -> + %% Send FIN and handle replies in flight before reconnecting. + Transport:shutdown(Socket, read_write), + start_connect_loop(wait, State) end end. diff --git a/src/ered_connection.erl b/src/ered_connection.erl deleted file mode 100644 index 13ebba1..0000000 --- a/src/ered_connection.erl +++ /dev/null @@ -1,384 +0,0 @@ --module(ered_connection). 
- -%% Managing the socket, sending commands and receiving replies. -%% Batches messages from the process queue. One process handles -%% writing to the socket and one handles the reading and decoding. -%% After a command is sent in the sending process a message is sent to -%% the reading process informing it about how many replies to expect -%% and who expects the result. No reconnection handling, if there is -%% an error the processes will exit. - --export([connect/2, - connect/3, - connect_async/3, - command/2, command/3, - command_async/3]). - --export_type([opt/0, - result/0, - host/0]). - - -%%%=================================================================== -%%% Definitions -%%%=================================================================== --record(recv_st, {transport :: gen_tcp | ssl, - socket :: gen_tcp:socket() | ssl:sslsocket(), - push_cb :: push_cb(), - timeout :: non_neg_integer(), % milliseconds - waiting = [] :: [wait_info()], - waiting_since :: undefined | integer() % erlang:monotonic_time(millisecond) - }). - --type opt() :: - %% If commands are queued up in the process message queue this is the max - %% amount of messages that will be received and sent in one call - {batch_size, non_neg_integer()} | - %% Timeout passed to gen_tcp:connect/4 or ssl:connect/4. - {connect_timeout, timeout()} | - %% Options passed to gen_tcp:connect/4. - {tcp_options, [gen_tcp:connect_option()]} | - %% Timeout passed to gen_tcp:connect/4. DEPRECATED. - {tcp_connect_timeout, timeout()} | - %% Options passed to ssl:connect/4. If this config parameter is present, - %% TLS is used. - {tls_options, [ssl:tls_client_option()]} | - %% Timeout passed to ssl:connect/4. DEPRECATED. - {tls_connect_timeout, timeout()} | - %% Callback for push notifications - {push_cb, push_cb()} | - %% Timeout when waiting for a response from Redis. milliseconds - {response_timeout, non_neg_integer()}. - --type result() :: ered_parser:parse_result(). 
--type push_cb() :: fun((result()) -> any()). --type wait_info() :: - {ered_command:response_class() | [ered_command:response_class()], - pid(), - Ref :: any(), - Acc :: [result()]}. % Acc used to store partial pipeline results --type host() :: ered:host(). --type connect_result() :: {ok, connection_ref()} | {error, timeout | inet:posix()}. --type connection_ref() :: pid(). - -%% Commands like SUBSCRIBE and UNSUBSCRIBE don't return anything, so we use this -%% return value. --define(pubsub_reply, undefined). - -%%%=================================================================== -%%% API -%%%=================================================================== - -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec connect(host(), inet:port_number()) -> connect_result(). --spec connect(host(), inet:port_number(), [opt()]) -> connect_result(). -%% -%% Connect to Redis node. Start send and receive process. -%% When the connection is closed a socket_closed message will be sent. -%% to the calling process. -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -connect(Host, Port) -> - connect(Host, Port, []). - -connect(Host, Port, Opts) -> - Pid = connect_async(Host, Port, Opts), - receive - {connected, Pid} -> - {ok, Pid}; - {connect_error, Pid, Reason} -> - {error, Reason} - end. - -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec connect_async(host(), inet:port_number(), [opt()]) -> connection_ref(). -%% -%% Connect to Redis node. Start send and receive process. -%% The function will return before connect is completed and a connected or -%% connect_error message will be sent to the calling process. -%% When the connection is closed a socket_closed message will be sent. -%% to the calling process. -%% -%% Deprecated options: -%% tcp_connect_timeout - replaced by connect_timeout. -%% tls_connect_timeout - replaced by connect_timeout. 
-%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -connect_async(Addr, Port, Opts) -> - [error({badarg, BadOpt}) - || BadOpt <- proplists:get_keys(Opts) -- [batch_size, tcp_options, tls_options, push_cb, response_timeout, - tcp_connect_timeout, tls_connect_timeout, connect_timeout]], - BatchSize = proplists:get_value(batch_size, Opts, 16), - ResponseTimeout = proplists:get_value(response_timeout, Opts, 10000), - PushCb = proplists:get_value(push_cb, Opts, fun(_) -> ok end), - TcpOptions = proplists:get_value(tcp_options, Opts, []), - TlsOptions = proplists:get_value(tls_options, Opts, []), - TcpTimeout = proplists:get_value(tcp_connect_timeout, Opts, infinity), - TlsTimeout = proplists:get_value(tls_connect_timeout, Opts, infinity), - {Transport, Options, Timeout0} = case TlsOptions of - [] -> - {gen_tcp, TcpOptions, TcpTimeout}; - _ -> - {ssl, TlsOptions, TlsTimeout} - end, - Timeout = proplists:get_value(connect_timeout, Opts, Timeout0), - Master = self(), - spawn_link( - fun() -> - SendPid = self(), - case catch Transport:connect(Addr, Port, [{active, false}, binary] ++ Options, Timeout) of - {ok, Socket} -> - Master ! {connected, SendPid}, - Pid = spawn_link(fun() -> - ExitReason = recv_loop(Transport, Socket, PushCb, ResponseTimeout), - %% Inform sending process about exit - SendPid ! ExitReason - end), - ExitReason = send_loop(Transport, Socket, Pid, BatchSize), - Master ! {socket_closed, SendPid, ExitReason}; - {error, Reason} -> - Master ! {connect_error, SendPid, Reason}; - Other -> % {'EXIT',_} - Master ! {connect_error, SendPid, Other} - end - end). - -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec command(connection_ref(), ered_command:command()) -> result(). --spec command(connection_ref(), ered_command:command(), timeout()) -> result(). -%% -%% Send a command to the connected Redis node. 
The argument can be a -%% single command as a list of binaries, a pipeline of command as a -%% list of commands or a formatted redis_command. -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -command(Connection, Command) -> - command(Connection, Command, 10000). - -command(Connection, Command, Timeout) -> - link(Connection), - Ref = make_ref(), - Connection ! {send, self(), Ref, ered_command:convert_to(Command)}, - receive {Ref, Value} -> - unlink(Connection), - Value - after Timeout -> - unlink(Connection), - {error, timeout} - end. -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec command_async(connection_ref(), ered_command:command(), any()) -> ok. -%% -%% Send a command to the connected Redis node in asynchronous -%% fashion. The provided callback function will be called with the -%% reply. Note that the callback function will executing in the redis -%% client process and should not hang or perform any lengthy task. -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -command_async(Connection, Data, Ref) -> - Connection ! {send, self(), Ref, ered_command:convert_to(Data)}, - ok. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - -%% -%% Receive logic -%% - -recv_loop(Transport, Socket, PushCB, Timeout) -> - ParseInit = ered_parser:next(ered_parser:init()), - State = #recv_st{transport = Transport, socket = Socket, push_cb = PushCB, timeout = Timeout}, - try - recv_loop({ParseInit, State}) - catch - %% handle done, parse error, recv error - throw:Reason -> - {recv_exit, Reason} - end. - -recv_loop({ParseResult, State}) -> - Next = case ParseResult of - {need_more, BytesNeeded, ParserState} -> - read_socket(BytesNeeded, ParserState, State); - {done, Value, ParserState} -> - handle_result(Value, ParserState, State) - end, - recv_loop(Next). 
- -read_socket(BytesNeeded, ParserState, State) -> - State1 = update_waiting(0, State), - WaitTime = get_timeout(State1), - Transport = State1#recv_st.transport, - case Transport:recv(State1#recv_st.socket, BytesNeeded, WaitTime) of - {ok, Data} -> - {ered_parser:continue(Data, ParserState), State1}; - {error, timeout} when State1#recv_st.waiting == [] -> - %% no command pending, try again - read_socket(BytesNeeded, ParserState, State1); - {error, Reason} -> - throw(Reason) - end. - -handle_result({push, Value = [Type|_]}, ParserState, State) -> - %% Pub/sub in RESP3 is a bit quirky. The push is supposed to be out of bound - %% data not connected to any request but for subscribe and unsubscribe - %% requests, a successful command is signalled as one or more push messages. - PushCB = State#recv_st.push_cb, - PushCB(Value), - State1 = case is_subscribe_push(Type) of - true -> - handle_subscribe_push(Value, State); - false -> - State - end, - {ered_parser:next(ParserState), State1}; -handle_result(Value, ParserState, State) -> - {{RespClass, Pid, Ref, Acc}, State1} = pop_waiting(State), - %% Check how many replies expected (list = pipeline) - case RespClass of - Single when not is_list(Single) -> - Pid ! {Ref, Value}, - {ered_parser:next(ParserState), State1}; - [_] -> - %% Last one, send the reply - Pid ! {Ref, lists:reverse([Value | Acc])}, - {ered_parser:next(ParserState), State1}; - [_ | RespClasses] -> - %% More left, save the reply and keep going - State2 = push_waiting({RespClasses, Pid, Ref, [Value | Acc]}, State1), - {ered_parser:next(ParserState), State2} - end. - -is_subscribe_push(<<"subscribe">>) -> - true; -is_subscribe_push(<>) when X >= $a, X =< $z -> - true; -is_subscribe_push(<<"unsubscribe">>) -> - true; -is_subscribe_push(<>) when X >= $a, X =< $z -> - true; -is_subscribe_push(_) -> - false. 
- -handle_subscribe_push(PushMessage, State) -> - case try_pop_waiting(State) of - {PoppedWaiting, State1} -> - handle_subscribed_popped_waiting(PushMessage, PoppedWaiting, State1); - none -> - %% No commands pending. - State - end. - -handle_subscribed_popped_waiting(Push, Waiting = {ExpectClass, Pid, Ref, Acc}, State) -> - case {ExpectClass, hd(Push)} of - {{Type, N}, Type} % simple command - when N =:= 0; % unsubscribing from all channels - N =:= 1 -> % or subscribed to all channels - Pid ! {Ref, ?pubsub_reply}, - State; - {{Type, N}, Type} % simple command - when N > 1 -> % not yet subscribed all channels - push_waiting({{Type, N - 1}, Pid, Ref, Acc}, State); - {[{Type, N}], Type} % last command in pipeline - when N =:= 0; % unsubscribing from all channels - N =:= 1 -> % or subscribed to all channels - Pid ! {Ref, lists:reverse([?pubsub_reply | Acc])}, - State; - {[{Type, N} | Classes], Type} % pipeline, not the last command - when N =:= 0; % unsubscribing from all channels - N =:= 1 -> % or subscribed to all channels - push_waiting({Classes, Pid, Ref, [?pubsub_reply | Acc]}, State); - {[{Type, N} | Classes], Type} % pipeline - when N > 1 -> % not yet subscribed all channels - push_waiting({[{Type, N - 1} | Classes], Pid, Ref, Acc}, State); - _Otherwise -> - %% Not waiting for this particular push message. - push_waiting(Waiting, State) - end. - -get_timeout(State) -> - case State#recv_st.waiting_since of - undefined -> - State#recv_st.timeout; - Since -> - case State#recv_st.timeout - (erlang:monotonic_time(millisecond) - Since) of - T when T < 0 -> 0; - T -> T - end - end. - -pop_waiting(State) -> - State1 = update_waiting(infinity, State), - [WaitInfo | Rest] = State1#recv_st.waiting, - {WaitInfo, State1#recv_st{waiting = Rest}}. - -try_pop_waiting(State) -> - State1 = update_waiting(0, State), - case State1#recv_st.waiting of - [WaitInfo | Rest] -> - {WaitInfo, State1#recv_st{waiting = Rest}}; - [] -> - none - end. 
- -push_waiting(WaitInfo,State) -> - State#recv_st{waiting = [WaitInfo | State#recv_st.waiting]}. - -update_waiting(Timeout, State) when State#recv_st.waiting == [] -> - case receive Msg -> Msg after Timeout -> timeout end of - {requests, Req, Time} -> - State#recv_st{waiting = Req, waiting_since = Time}; - timeout -> - State#recv_st{waiting_since = undefined}; - close_down -> - throw(done) - end; -update_waiting(_Timeout, State) -> - State. - -%% -%% Send logic -%% - -send_loop(Transport, Socket, RecvPid, BatchSize) -> - case receive_data(BatchSize) of - {recv_exit, Reason} -> - {recv_exit, Reason}; - {data, {Refs, Data}} -> - Time = erlang:monotonic_time(millisecond), - case Transport:send(Socket, Data) of - ok -> - %% send to recv proc to fetch the response - RecvPid ! {requests, Refs, Time}, - send_loop(Transport, Socket, RecvPid, BatchSize); - {error, Reason} -> - %% Give recv_loop time to finish processing - %% This will shut down recv_loop if it is waiting on socket - Transport:shutdown(Socket, read_write), - %% This will shut down recv_loop if it is waiting for a reference - RecvPid ! close_down, - %% Ok, recv done, time to die - receive {recv_exit, _Reason} -> ok end, - {send_exit, Reason} - end - end. - -receive_data(N) -> - receive_data(N, infinity, []). - -receive_data(0, _Time, Acc) -> - {data, lists:unzip(lists:reverse(Acc))}; -receive_data(N, Time, Acc) -> - receive - {recv_exit, Reason} -> - {recv_exit, Reason}; - {send, Pid, Ref, Commands} -> - Data = ered_command:get_data(Commands), - Class = ered_command:get_response_class(Commands), - RefInfo = {Class, Pid, Ref, []}, - Acc1 = [{RefInfo, Data} | Acc], - receive_data(N - 1, 0, Acc1); - _Ignore -> - %% Mitigate OTP TLS 1.3 bug #10273 leaking a message {Ref, ok}. - receive_data(N, 0, Acc) - after Time -> - receive_data(0, 0, Acc) - end. 
diff --git a/src/ered_parser.erl b/src/ered_parser.erl index 629a5b5..11146a3 100644 --- a/src/ered_parser.erl +++ b/src/ered_parser.erl @@ -7,7 +7,8 @@ continue/2]). -export_type([parse_return/0, - parse_result/0 + parse_result/0, + state/0 ]). %%%=================================================================== @@ -25,10 +26,12 @@ -type parse_result() :: binary() | {error, binary()} | integer() | undefined | [parse_result()] | inf | neg_inf | nan | float() | true | false | #{parse_result() => parse_result()} | sets:set(parse_result()) | - {attribute, parse_result(), parse_result()} | {push | parse_result()}. + {attribute, parse_result(), parse_result()} | {push, parse_result()}. - --type parse_return() :: {done, parse_result(), #parser_state{}} | {need_more, bytes_needed(), #parser_state{}}. +-opaque state() :: #parser_state{}. +-type parse_return() :: {done, parse_result(), state()} | + {need_more, bytes_needed(), state()} | + {parse_error, any()}. -if(?OTP_RELEASE >= 24). -define(sets_new, sets:new([{version, 2}])). @@ -41,7 +44,7 @@ %%%=================================================================== %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec init() -> #parser_state{}. +-spec init() -> state(). %% %% Init empty parser continuation %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -49,7 +52,7 @@ init() -> #parser_state{}. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec next(#parser_state{}) -> parse_return(). +-spec next(state()) -> parse_return(). %% %% Get next result or continuation. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -57,7 +60,7 @@ next(State) -> parse(State). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec continue(binary(), #parser_state{}) -> parse_return(). +-spec continue(binary(), state()) -> parse_return(). %% %% Feed more data to the parser. Get next result or continuation. 
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -70,16 +73,21 @@ continue(NewData, State) -> %%%=================================================================== parse(State=#parser_state{data=Data, next=Fun, bytes_needed=Bytes}) -> - case token(Data, Bytes) of - {need_more, LackingBytes} -> - {need_more, LackingBytes, State}; - {Token, Rest} -> - case Fun(Token) of - {done, Result} -> - {done, Result, #parser_state{data=Rest}}; - {cont, NextFun, NextBytes} -> - parse(#parser_state{data=Rest, next=NextFun, bytes_needed=NextBytes}) - end + try + case token(Data, Bytes) of + {need_more, LackingBytes} -> + {need_more, LackingBytes, State}; + {Token, Rest} -> + case Fun(Token) of + {done, Result} -> + {done, Result, #parser_state{data=Rest}}; + {cont, NextFun, NextBytes} -> + parse(#parser_state{data=Rest, next=NextFun, bytes_needed=NextBytes}) + end + end + catch + {parse_error, _Reason} = ParseError -> + ParseError end. token(Data, 0) -> diff --git a/test/ered_client_tests.erl b/test/ered_client_tests.erl index 51ceab6..ffe6fdc 100644 --- a/test/ered_client_tests.erl +++ b/test/ered_client_tests.erl @@ -40,7 +40,8 @@ request_t() -> fail_connect_t() -> {ok,Pid} = ered_client:start_link("127.0.0.1", 0, [{info_pid, self()}]), - {connect_error,econnrefused} = expect_connection_down(Pid), + {connect_error, Reason} = expect_connection_down(Pid), + true = Reason =:= econnrefused orelse Reason =:= eaddrnotavail, %% make sure there are no more connection down messages timeout = receive M -> M after 500 -> timeout end. @@ -66,7 +67,7 @@ fail_parse_t() -> Pid ! ered_client:command(Client, [<<"ping">>]) end), expect_connection_up(Client), - Reason = {recv_exit, {parse_error,{invalid_data,<<"&pong">>}}}, + Reason = {parse_error, {invalid_data, <<"&pong">>}}, receive #{msg_type := socket_closed, reason := Reason} -> ok end, expect_connection_up(Client), {ok, <<"pong">>} = get_msg(). 
@@ -75,18 +76,22 @@ fail_parse_t() -> server_close_socket_t() -> {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), {ok, Port} = inet:port(ListenSock), - spawn_link(fun() -> - {ok, Sock} = gen_tcp:accept(ListenSock), - gen_tcp:close(Sock), - - %% resend from client - {ok, _Sock2} = gen_tcp:accept(ListenSock), - receive ok -> ok end - end), + ServerPid = + spawn_link(fun() -> + {ok, Sock} = gen_tcp:accept(ListenSock), + receive continue -> ok end, + gen_tcp:close(Sock), + + %% resend from client + {ok, _Sock2} = gen_tcp:accept(ListenSock), + receive done -> ok end + end), Client = start_client(Port), expect_connection_up(Client), - receive #{msg_type := socket_closed, reason := {recv_exit, closed}} -> ok end, - expect_connection_up(Client). + ServerPid ! continue, + receive #{msg_type := socket_closed, reason := tcp_closed} -> ok end, + expect_connection_up(Client), + ServerPid ! done. %% Suppress warning from command 'bad_request' @@ -172,7 +177,7 @@ server_buffer_full_reconnect_t() -> receive #{msg_type := queue_full} -> ok end, %% 1 message over the limit, first one in queue gets kicked out {6, {error, queue_overflow}} = get_msg(), - receive #{msg_type := socket_closed, reason := {recv_exit, closed}} -> ok end, + receive #{msg_type := socket_closed, reason := tcp_closed} -> ok end, %% when connection goes down the pending messages will be put in the queue and the queue %% will overflow kicking out the oldest first [{N, {error, queue_overflow}} = get_msg() || N <- [1,2,3,4,5]], @@ -202,7 +207,7 @@ server_buffer_full_node_goes_down_t() -> [ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! 
{N, Reply} end) || N <- lists:seq(1,11)], receive #{msg_type := queue_full} -> ok end, {6, {error, queue_overflow}} = get_msg(), - receive #{msg_type := socket_closed, reason := {recv_exit, closed}} -> ok end, + receive #{msg_type := socket_closed, reason := tcp_closed} -> ok end, [{N, {error, queue_overflow}} = get_msg() || N <- [1,2,3,4,5]], receive #{msg_type := queue_ok} -> ok end, receive #{msg_type := connect_error, reason := econnrefused} -> ok end, @@ -242,7 +247,7 @@ send_timeout_t() -> Pid = self(), ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {reply, Reply} end), %% this should come after max 1000ms - receive #{msg_type := socket_closed, reason := {recv_exit, timeout}} -> ok after 2000 -> timeout_error() end, + receive #{msg_type := socket_closed, reason := timeout} -> ok after 2000 -> timeout_error() end, expect_connection_up(Client), {reply, {ok, <<"pong">>}} = get_msg(), no_more_msgs(). @@ -256,15 +261,12 @@ fail_hello_t() -> {ok, <<"*2\r\n$5\r\nHELLO\r\n$1\r\n3\r\n">>} = gen_tcp:recv(Sock, 0), ok = gen_tcp:send(Sock, <<"-NOPROTO unsupported protocol version\r\n">>), - %% test resend - {ok, <<"*2\r\n$5\r\nHELLO\r\n$1\r\n3\r\n">>} = gen_tcp:recv(Sock, 0), - ok = gen_tcp:send(Sock, <<"-NOPROTO unsupported protocol version\r\n">>), - - Pid ! done + Pid ! done, + {error, closed} = gen_tcp:recv(Sock, 0) end), {ok,Client} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()}]), - {init_error, [<<"NOPROTO unsupported protocol version">>]} = expect_connection_down(Client), receive done -> ok end, + {init_error, [<<"NOPROTO unsupported protocol version">>]} = expect_connection_down(Client), no_more_msgs(). hello_with_auth_t() -> @@ -294,11 +296,13 @@ hello_with_auth_t() -> "$6\r\nmaster\r\n" "$7\r\nmodules\r\n" "*0\r\n">>), - Pid ! done + Pid ! 
done, + receive ok -> ok end end), - {ok, _Client} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()}, - {auth, {<<"ali">>, <<"sesame">>}}]), + {ok, Client} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()}, + {auth, {<<"ali">>, <<"sesame">>}}]), receive done -> ok end, + expect_connection_up(Client), no_more_msgs(). hello_with_auth_fail_t() -> @@ -333,12 +337,14 @@ auth_t() -> "$6\r\nsesame\r\n">>} = gen_tcp:recv(Sock, 0), ok = gen_tcp:send(Sock, <<"+OK\r\n">>), - Pid ! done + Pid ! done, + receive ok -> ok end end), - {ok, _Client} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()}, - {resp_version, 2}, - {auth, {<<"ali">>, <<"sesame">>}}]), + {ok, Client} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()}, + {resp_version, 2}, + {auth, {<<"ali">>, <<"sesame">>}}]), receive done -> ok end, + expect_connection_up(Client), no_more_msgs(). auth_fail_t() -> diff --git a/test/ered_cluster_SUITE.erl b/test/ered_cluster_SUITE.erl index 66666a7..0c4f966 100644 --- a/test/ered_cluster_SUITE.erl +++ b/test/ered_cluster_SUITE.erl @@ -272,7 +272,7 @@ t_hard_failover(_) -> ct:pal("~p\n", [ered_cluster:command_all(R, [<<"CLUSTER">>, <<"SLOTS">>])]), ct:pal(os:cmd("docker stop " ++ Pod)), - ?MSG(#{msg_type := socket_closed, addr := {"127.0.0.1", Port}, reason := {recv_exit, closed}}), + ?MSG(#{msg_type := socket_closed, addr := {"127.0.0.1", Port}, reason := tcp_closed}), ?MSG(#{msg_type := connect_error, addr := {"127.0.0.1", Port}, reason := econnrefused}), ?MSG(#{msg_type := node_down_timeout, addr := {"127.0.0.1", Port}}, 2500), @@ -363,7 +363,7 @@ t_manual_failover_then_old_master_down(_) -> ?MSG(#{addr := {"127.0.0.1", Port}, master := true, msg_type := socket_closed, - reason := {recv_exit, closed}}), + reason := tcp_closed}), %% Ered prefers the replica of the disconnected node for slot map update, %% since it is likely to know about a failover first; it is the new master. 
@@ -420,7 +420,7 @@ t_blackhole(_) -> ered:command_async(ClientRef, [<<"PING">>], fun(Reply) -> TestPid ! {ping_reply, Reply} end), - ?MSG(#{msg_type := socket_closed, reason := {recv_exit, timeout}, master := true}, + ?MSG(#{msg_type := socket_closed, reason := timeout, master := true}, ResponseTimeout + 1000), ?MSG({ping_reply, {error, _Reason}}, % node_down or node_deactivated NodeDownTimeout + 1000), @@ -432,6 +432,8 @@ t_blackhole(_) -> ?OPTIONAL_MSG(#{msg_type := cluster_ok}), ?MSG(#{msg_type := client_stopped, reason := shutdown, master := false}, CloseWait + 1000), + ?OPTIONAL_MSG(#{msg_type := node_down_timeout, addr := {"127.0.0.1", Port}}), + no_more_msgs(), ct:pal("Unpausing container: " ++ os:cmd("docker unpause " ++ Pod)), timer:sleep(500), @@ -477,7 +479,7 @@ t_blackhole_all_nodes(_) -> fun(Reply) -> TestPid ! {ping_reply, Reply} end) end, AddrToClient), - [?MSG(#{msg_type := socket_closed, reason := {recv_exit, timeout}, addr := {"127.0.0.1", Port}}, + [?MSG(#{msg_type := socket_closed, reason := timeout, addr := {"127.0.0.1", Port}}, ResponseTimeout + 1000) || Port <- ?PORTS], ?MSG({ping_reply, {error, _Reason1}}, NodeDownTimeout + 1000), ?MSG({ping_reply, {error, _Reason2}}, NodeDownTimeout + 1000), @@ -526,11 +528,14 @@ t_init_timeout(_) -> ct:pal("~p\n", [os:cmd("redis-cli -p 30001 CLIENT PAUSE 10000")]), {ok, _P} = ered_cluster:connect([{localhost, 30001}], [{info_pid, [self()]}] ++ Opts), - ?MSG(#{msg_type := socket_closed, reason := {recv_exit, timeout}}, 3500), + ?MSG(#{msg_type := socket_closed, reason := timeout}, 3500), ?MSG(#{msg_type := node_down_timeout, addr := {localhost, 30001}}, 2500), %% Does not work on Redis before 6.2.0. ct:pal("~p\n", [os:cmd("redis-cli -p 30001 CLIENT UNPAUSE")]), + %% Maybe we were waiting for init commands when the node down timeout fired. 
+ ?OPTIONAL_MSG(#{msg_type := init_error, reason := node_down}), + ?MSG(#{msg_type := connected, addr := {localhost, 30001}}), ?MSG(#{msg_type := slot_map_updated}), @@ -760,8 +765,8 @@ t_kill_client(_) -> ct:pal("~p\n",[os:cmd("redis-cli -p " ++ integer_to_list(Port) ++ " CLIENT KILL TYPE NORMAL")]), ?MSG(#{msg_type := socket_closed, addr := {_, Port}}), - %% connection reestablished - ?MSG(#{msg_type := connected, addr := {_, Port}}), + %% Waits until 1000ms after the first connect before reconnecting. + ?MSG(#{msg_type := connected, addr := {_, Port}}, 2000), no_more_msgs(). t_new_cluster_master(_) -> diff --git a/test/ered_cluster_tls_SUITE.erl b/test/ered_cluster_tls_SUITE.erl index 475aa5e..e0ab5e1 100644 --- a/test/ered_cluster_tls_SUITE.erl +++ b/test/ered_cluster_tls_SUITE.erl @@ -190,8 +190,10 @@ t_expired_cert_tls_1_3(_) -> [{info_pid, [self()]}, {client_opts, ClientOpts}]), ?MSG(#{msg_type := socket_closed, addr := {"127.0.0.1", 31001}, - reason := {recv_exit, - {tls_alert, - {certificate_expired, _}}}}), + reason := {tls_alert, {certificate_expired, _}}}), ?MSG(#{msg_type := node_down_timeout, addr := {"127.0.0.1", 31001}}, 2500), + timer:sleep(10), % Wait for the optional messages. + ?OPTIONAL_MSG(#{msg_type := init_error, reason := node_down}), + ?OPTIONAL_MSG(#{msg_type := socket_closed, + reason := {tls_alert, {certificate_expired, _}}}), no_more_msgs(). diff --git a/test/ered_connection_tests.erl b/test/ered_connection_tests.erl deleted file mode 100644 index 1505c95..0000000 --- a/test/ered_connection_tests.erl +++ /dev/null @@ -1,86 +0,0 @@ --module(ered_connection_tests). - --include_lib("eunit/include/eunit.hrl"). 
- -split_data_test() -> - Data = << <<"A">> || _ <- lists:seq(1, 3000) >>, - {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), - {ok, Port} = inet:port(ListenSock), - spawn_link(fun() -> - {ok, Sock} = gen_tcp:accept(ListenSock), - {ok, <<"*2\r\n$5\r\nhello\r\n$1\r\n3\r\n">>} = gen_tcp:recv(Sock, 0), - HelloReply = <<"%7\r\n", - "$6\r\nserver\r\n", "$6\r\nvalkey\r\n", - "$7\r\nversion\r\n", "$5\r\n9.0.0\r\n", - "$5\r\nproto\r\n", ":3\r\n" - "$2\r\nid\r\n", ":2\r\n", - "$4\r\nmode\r\n", "$10\r\nstandalone\r\n" - "$4\r\nrole\r\n", "$6\r\nmaster\r\n" - "$7\r\nmodules\r\n", "*0\r\n">>, - ok = gen_tcp:send(Sock, HelloReply), - SetCommand = <<"*3\r\n$3\r\nset\r\n$4\r\nkey1\r\n$3000\r\n", Data/binary, "\r\n">>, - {ok, SetCommand} = gen_tcp:recv(Sock, size(SetCommand)), - ok = gen_tcp:send(Sock, <<"+OK\r\n">>), - {ok, <<"*2\r\n$3\r\nget\r\n$4\r\nkey1\r\n">>} = gen_tcp:recv(Sock, 0), - ok = gen_tcp:send(Sock, <<"$3000\r\n", Data/binary, "\r\n">>), - {error, closed} = gen_tcp:recv(Sock, 0), - %% ok = gen_tcp:shutdown(Sock, write), - ok - end), - {ok, Conn1} = ered_connection:connect("127.0.0.1", Port), - ered_connection:command(Conn1, [<<"hello">>, <<"3">>]), - <<"OK">> = ered_connection:command(Conn1, [<<"set">>, <<"key1">>, Data]), - Data = ered_connection:command(Conn1, [<<"get">>, <<"key1">>]). - -%% Suppress warnings due to expected failures from MalformedCommand. --dialyzer({[no_fail_call, no_return], trailing_reply_test/0}). -trailing_reply_test() -> - Pid = self(), - %% 277124 byte nested array, it takes a non-trivial time to parse - BigNastyData = iolist_to_binary(nested_list(8)), - ?debugFmt("~w", [size(BigNastyData)]), - - spawn_link(fun() -> - {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), - {ok, Port} = inet:port(ListenSock), - Pid ! 
{port, Port}, - {ok, Sock} = gen_tcp:accept(ListenSock), - {ok, <<"*1\r\n$4\r\nping\r\n">>} = gen_tcp:recv(Sock, 0), - ok = gen_tcp:send(Sock, BigNastyData), - ok = gen_tcp:shutdown(Sock, write), - Pid ! sent_big_nasty, - receive ok -> ok end - end), - {port, Port} = receive_msg(), - %% increase receive buffer to fit the whole nasty data package - {ok, Conn1} = ered_connection:connect("127.0.0.1", Port, [{batch_size, 1}, - {tcp_options, [{recbuf, 524288}]}]), - ?debugFmt("~w", [Conn1]), - ered_connection:command_async(Conn1, [<<"ping">>], ping1), - receive sent_big_nasty -> ok end, - MalformedCommand = {redis_command, {pipeline, [normal]}, undefined}, - ered_connection:command_async(Conn1, MalformedCommand, no_ref), - - %% make sure the ping is received before the connection is shut down - - ?debugMsg("waiting for ping"), - - receive {ping1, _} -> ok after 2000 -> exit(waiting_for_ping) end, - ?debugMsg("got ping"), - {socket_closed, Conn1, {send_exit, einval}} = receive Msg -> Msg end, - ensure_empty(). - - -receive_msg() -> - receive Msg -> Msg end. - -%% This function is used from trailing_reply_test() --dialyzer({no_unused, ensure_empty/0}). -ensure_empty() -> - empty = receive Msg -> Msg after 0 -> empty end. - - -nested_list(1) -> - <<"+A\r\n">>; -nested_list(N) -> - ["*", integer_to_list(N), "\r\n", [nested_list(N-1) || _ <- lists:seq(1, N)]]. diff --git a/test/ered_parser_tests.erl b/test/ered_parser_tests.erl index 3b063b9..2dc318e 100644 --- a/test/ered_parser_tests.erl +++ b/test/ered_parser_tests.erl @@ -85,12 +85,11 @@ parse_fail_test_() -> decode_err(In, Expected) -> fun() -> - try - A = ered_parser:continue(In, ered_parser:init()), - exit({unexpected_success, A}) - catch - throw:{parse_error, Err} -> - ?assertEqual(Expected, Err) + case ered_parser:continue(In, ered_parser:init()) of + {parse_error, Err} -> + ?assertEqual(Expected, Err); + A -> + exit({unexpected_success, A}) end end. 
From e09ba0f429859249c853feef3dd3121310d188dc Mon Sep 17 00:00:00 2001 From: Hippo <44473866+WilliamVoong@users.noreply.github.com> Date: Tue, 17 Mar 2026 11:00:24 +0100 Subject: [PATCH 2/4] Add support for sending commands in batches (#158) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the pending queue is full, don't send more commands until the pending queue can fit another batch of commands, as configured with the existing config `batch_size`. The purpose is to improve performance by sending fewer and larger TCP packets. The config for `batch_size` has existed before but the batching functionality has not worked properly. Bonus: Use assert macros instead of direct pattern matching in eunit tests to verify output/input, to get better error messages on failure. --------- Co-authored-by: William Voong Co-authored-by: Viktor Söderqvist --- README.md | 8 ++-- src/ered_client.erl | 77 ++++++++++++++++++++++++++------------ test/ered_client_tests.erl | 73 ++++++++++++++++++++++++++++++------ 3 files changed, 118 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 90740c8..53ef8f9 100644 --- a/README.md +++ b/README.md @@ -271,16 +271,16 @@ options are wrapped in `{client_opts, [...]}` and included in cluster options. Options passed to the connection module. See [Connection options](#connection-options) below. -* `{max_waiting, non_neg_integer()}` +* `{max_waiting, pos_integer()}` Max number of commands allowed to wait in queue. Default 5000. -* `{max_pending, non_neg_integer()}` +* `{max_pending, pos_integer()}` Max number of commands to be pending, i.e. sent to client and waiting for a response. Default 128. -* `{queue_ok_level, non_neg_integer()}` +* `{queue_ok_level, pos_integer()}` If the queue has been full then it is considered ok again when it reaches this level. Default 2000. @@ -331,7 +331,7 @@ wrapped in `{connection_opts, [ered_connection:opt()]}`. 
For `ered_cluster:connect/2`, the connection options are included under client options, as `{client_opts, [{connection_opts, [...]}]}`. -* `{batch_size, non_neg_integer()}` +* `{batch_size, pos_integer()}` If commands are queued up in the process message queue, this is the maximum number of messages that will be received and sent in one call. Default 16. diff --git a/src/ered_client.erl b/src/ered_client.erl index 903deeb..251840b 100644 --- a/src/ered_client.erl +++ b/src/ered_client.erl @@ -15,6 +15,9 @@ command/2, command/3, command_async/3]). +%% testing/debugging +-export([state_to_map/1]). + %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -36,7 +39,7 @@ port :: inet:port_number(), %% From "connection opts" - batch_size = 16 :: non_neg_integer(), + batch_size = 16 :: pos_integer(), transport = gen_tcp :: gen_tcp | ssl, transport_opts = [] :: list(), connect_timeout = infinity :: timeout(), @@ -53,8 +56,8 @@ info_pid = none :: none | pid(), queue_ok_level = 2000 :: non_neg_integer(), - max_waiting = 5000 :: non_neg_integer(), - max_pending = 128 :: non_neg_integer() + max_waiting = 5000 :: pos_integer(), + max_pending = 128 :: pos_integer() }). -record(st, @@ -68,6 +71,11 @@ waiting = q_new() :: command_queue(), pending = q_new() :: command_queue(), + %% Batching. When pending queue is full, + %% set we don't send more until another + %% complete batch can fit in the pending queue. + filling_batch = true :: boolean(), + cluster_id = undefined :: undefined | binary(), queue_full_event_sent = false :: boolean(), % set to true when full, false when reaching queue_ok_level @@ -118,10 +126,10 @@ %% Options passed to the connection module {connection_opts, [connection_opt()]} | %% Max number of commands allowed to wait in queue. - {max_waiting, non_neg_integer()} | + {max_waiting, pos_integer()} | %% Max number of commands to be pending, i.e. sent to client %% and waiting for a response. 
- {max_pending, non_neg_integer()} | + {max_pending, pos_integer()} | %% If the queue has been full then it is considered ok %% again when it reaches this level {queue_ok_level, non_neg_integer()} | @@ -148,7 +156,7 @@ -type connection_opt() :: %% If commands are queued up in the process message queue this is the max %% amount of messages that will be received and sent in one call - {batch_size, non_neg_integer()} | + {batch_size, pos_integer()} | %% Timeout passed to gen_tcp:connect/4 or ssl:connect/4. {connect_timeout, timeout()} | %% Options passed to gen_tcp:connect/4. @@ -272,6 +280,13 @@ command_async(ServerRef, Command, CallbackFun) -> gen_server:cast(ServerRef, #command{data = ered_command:convert_to(Command), replyto = CallbackFun}). +%% Converts a state record to a map, for easier testing. +%% Used in tests, after calling sys:get_state(EredClientPid). +state_to_map(#st{} = State) -> + Fields = record_info(fields, st), + [st | Values] = tuple_to_list(State), + maps:from_list(lists:zip(Fields, Values)). + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -623,7 +638,8 @@ abort_pending_commands(State) -> PendingReqs = [Req#pending_req.command || Req <- q_to_list(State#st.pending)], State#st{waiting = q_join(q_from_list(PendingReqs), State#st.waiting), pending = q_new(), - parser_state = ered_parser:init()}. + parser_state = ered_parser:init(), + filling_batch = true}. 
connection_down(Reason, State) -> State1 = abort_pending_commands(State), @@ -636,30 +652,43 @@ connection_down(Reason, State) -> process_commands(State) -> NumWaiting = q_len(State#st.waiting), NumPending = q_len(State#st.pending), + BatchSize = State#st.opts#opts.batch_size, + LowWaterMark = max(State#st.opts#opts.max_pending - BatchSize, 1), + if State#st.status =:= up, State#st.socket =/= none, - NumWaiting > 0, NumPending < State#st.opts#opts.max_pending -> - %% TODO: Pop multiple from queue and send them in a batch. Use the batch_size option. - %% Use q_split, q_join and q_to_list. + NumWaiting > 0, State#st.filling_batch -> %% TODO: Add request timeout timestamp to PendingReq. - {Command, NewWaiting} = q_out(State#st.waiting), - RespCommand = Command#command.data, - Data = ered_command:get_data(RespCommand), - Class = ered_command:get_response_class(RespCommand), + {CommandQueue, NewWaiting} = q_split(min(BatchSize, NumWaiting), State#st.waiting), + {BatchedData, PendingRequests} = + lists:foldr(fun(Command, {DataAcc, PendingAcc}) -> + RespCommand = Command#command.data, + ResponseClass = ered_command:get_response_class(RespCommand), + + NewBatchedData = ered_command:get_data(RespCommand), + NewPendingRequest = #pending_req{command = Command, + response_class = ResponseClass}, + {[NewBatchedData | DataAcc] , q_in_r(NewPendingRequest, PendingAcc)} + end, + {[], q_new()}, + q_to_list(CommandQueue)), Transport = State#st.opts#opts.transport, - case Transport:send(State#st.socket, Data) of + case Transport:send(State#st.socket, BatchedData) of ok -> - PendingReq = #pending_req{command = Command, - response_class = Class}, - State1 = State#st{pending = q_in(PendingReq, State#st.pending), - waiting = NewWaiting}, - process_commands(State1); + NewPending = q_join(State#st.pending, PendingRequests), + NewState = State#st{waiting = NewWaiting, + pending = NewPending, + filling_batch = q_len(NewPending) < State#st.opts#opts.max_pending}, + process_commands(NewState); 
{error, _Reason} -> %% Send FIN and handle replies in fligh before reconnecting. Transport:shutdown(State#st.socket, read_write), start_connect_loop(now, State#st{status = init}) end; + not State#st.filling_batch, NumPending < LowWaterMark -> + process_commands(State#st{filling_batch = true}); + NumWaiting > State#st.opts#opts.max_waiting, State#st.queue_full_event_sent -> drop_commands(State); @@ -689,7 +718,7 @@ start_connect_loop(When0, State) -> When0 end, ConnectPid = spawn_link(fun () -> connect_loop(When, Self, State#st.opts) end), - State#st{connection_loop_pid = ConnectPid}. + State#st{connection_loop_pid = ConnectPid}. drop_commands(State) -> case q_len(State#st.waiting) > State#st.opts#opts.max_waiting of @@ -720,9 +749,9 @@ q_out({Size, Q}) -> {{value, Val}, NewQ} -> {Val, {Size-1, NewQ}} end. -%% q_split(N, {Size, Q}) when N =< Size -> -%% {A, B} = queue:split(N, Q), -%% {{N, A}, {Size - N, B}}. +q_split(N, {Size, Q}) when N =< Size -> + {A, B} = queue:split(N, Q), + {{N, A}, {Size - N, B}}. q_to_list({_Size, Q}) -> queue:to_list(Q). diff --git a/test/ered_client_tests.erl b/test/ered_client_tests.erl index ffe6fdc..fbed66c 100644 --- a/test/ered_client_tests.erl +++ b/test/ered_client_tests.erl @@ -11,6 +11,7 @@ run_test_() -> {spawn, fun server_close_socket_t/0}, {spawn, fun bad_request_t/0}, {spawn, fun server_buffer_full_t/0}, + {spawn, fun low_high_watermark_t/0}, {spawn, fun bad_option_t/0}, {spawn, fun bad_connection_option_t/0}, {spawn, fun server_buffer_full_reconnect_t/0}, @@ -118,7 +119,7 @@ server_buffer_full_t() -> Expected = iolist_to_binary(lists:duplicate(5, Ping)), {ok, Expected} = gen_tcp:recv(Sock, size(Expected)), %% should be nothing more since only 5 pending - {error, timeout} = gen_tcp:recv(Sock, 0, 0), + ?assertEqual({error, timeout}, gen_tcp:recv(Sock, 0, 0)), timer:sleep(500), @@ -138,11 +139,61 @@ server_buffer_full_t() -> Pid = self(), [ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! 
{N, Reply} end) || N <- lists:seq(1,11)], receive #{msg_type := queue_full} -> ok end, - {6, {error, queue_overflow}} = get_msg(), + ?assertMatch({6, {error, queue_overflow}}, get_msg()), receive #{msg_type := queue_ok} -> ok end, - [{N, {ok, <<"pong">>}} = get_msg()|| N <- [1,2,3,4,5,7,8,9,10,11]], + [?assertMatch({N, {ok, <<"pong">>}}, get_msg()) || N <- [1,2,3,4,5,7,8,9,10,11]], no_more_msgs(). +low_high_watermark_t() -> + {ok, ListenSock} = gen_tcp:listen(0, [binary, {active, false}]), + {ok, Port} = inet:port(ListenSock), + ServerPid = spawn_link(fun() -> + {ok, Sock} = gen_tcp:accept(ListenSock), + Ping = <<"*1\r\n$4\r\nping\r\n">>, + FivePing = iolist_to_binary(lists:duplicate(5, Ping)), + {ok, FivePing} = gen_tcp:recv(Sock, size(FivePing)), + + %% should be nothing more since only 5 pending + ?assertEqual({error, timeout}, gen_tcp:recv(Sock, 0, 0)), + + gen_tcp:send(Sock, lists:duplicate(4, <<"+pong\r\n">>)), + receive send_one_more_pong -> ok end, + gen_tcp:send(Sock, lists:duplicate(1, <<"+pong\r\n">>)), + + {ok, FivePing} = gen_tcp:recv(Sock, 0), + gen_tcp:send(Sock, lists:duplicate(5, <<"+pong\r\n">>)), + receive ok -> ok end + end), + Client = start_client(Port, [{connection_opts, [{batch_size,5}]}, {max_waiting, 10}, {max_pending, 5}, {queue_ok_level,1}]), + expect_connection_up(Client), + + Pid = self(), + [ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {N, Reply} end) || N <- lists:seq(1,10)], + [?assertEqual({N, {ok, <<"pong">>}}, get_msg()) || N <- [1,2,3,4]], + + %% high water mark is hit, and we can not fill more until we reach low water-mark. + ?assertMatch(#{pending := {1, _}, + waiting := {5, _}, + filling_batch := false}, + ered_client:state_to_map(sys:get_state(Client))), + + ServerPid ! send_one_more_pong, + [?assertEqual({N, {ok, <<"pong">>}}, get_msg()) || N <- [5]], + + %% low water mark reached, pending should now be filled. 
+ ?assertMatch(#{pending := {5, _}, + waiting := {0, _}, + filling_batch := false}, + ered_client:state_to_map(sys:get_state(Client))), + + [?assertEqual({N, {ok, <<"pong">>}}, get_msg()) || N <- [6,7,8,9,10]], + + ?assertMatch(#{pending := {0, _}, + waiting := {0, _}, + filling_batch := true}, + ered_client:state_to_map(sys:get_state(Client))), + + no_more_msgs(). server_buffer_full_reconnect_t() -> @@ -150,17 +201,16 @@ server_buffer_full_reconnect_t() -> {ok, Port} = inet:port(ListenSock), spawn_link(fun() -> {ok, Sock} = gen_tcp:accept(ListenSock), - %% expect 5 ping Ping = <<"*1\r\n$4\r\nping\r\n">>, - Expected = iolist_to_binary(lists:duplicate(5, Ping)), - {ok, Expected} = gen_tcp:recv(Sock, size(Expected)), + FivePing = iolist_to_binary(lists:duplicate(5, Ping)), + {ok, FivePing} = gen_tcp:recv(Sock, size(FivePing)), %% should be nothing more since only 5 pending {error, timeout} = gen_tcp:recv(Sock, 0, 0), gen_tcp:close(Sock), {ok, Sock2} = gen_tcp:accept(ListenSock), - {ok, Expected} = gen_tcp:recv(Sock2, size(Expected)), + {ok, FivePing} = gen_tcp:recv(Sock2, size(FivePing)), gen_tcp:send(Sock2, lists:duplicate(5, <<"+pong\r\n">>)), %% should be nothing more since only 5 pending @@ -192,10 +242,9 @@ server_buffer_full_node_goes_down_t() -> {ok, Port} = inet:port(ListenSock), spawn_link(fun() -> {ok, Sock} = gen_tcp:accept(ListenSock), - %% expect 5 ping Ping = <<"*1\r\n$4\r\nping\r\n">>, - Expected = iolist_to_binary(lists:duplicate(5, Ping)), - {ok, Expected} = gen_tcp:recv(Sock, size(Expected)), + FivePing = iolist_to_binary(lists:duplicate(5, Ping)), + {ok, FivePing} = gen_tcp:recv(Sock, size(FivePing)), %% should be nothing more since only 5 pending {error, timeout} = gen_tcp:recv(Sock, 0, 0), gen_tcp:close(ListenSock) @@ -206,9 +255,9 @@ server_buffer_full_node_goes_down_t() -> Pid = self(), [ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! 
{N, Reply} end) || N <- lists:seq(1,11)], receive #{msg_type := queue_full} -> ok end, - {6, {error, queue_overflow}} = get_msg(), + ?assertEqual({6, {error, queue_overflow}}, get_msg()), receive #{msg_type := socket_closed, reason := tcp_closed} -> ok end, - [{N, {error, queue_overflow}} = get_msg() || N <- [1,2,3,4,5]], + [?assertEqual({N, {error, queue_overflow}}, get_msg()) || N <- [1,2,3,4,5]], receive #{msg_type := queue_ok} -> ok end, receive #{msg_type := connect_error, reason := econnrefused} -> ok end, receive #{msg_type := node_down_timeout} -> ok end, From e359980c955ce50e30570beaa0b4b6d20d353ab5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Wed, 1 Apr 2026 17:55:52 +0200 Subject: [PATCH 3/4] Non-blocking send (#160) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use send_timeout 0. If send returns timeout, it has still queued all the data for sending later. When this happens, back off and don't send more to the socket until the pending commands have got replies from the server. This requires that gen_tcp is used with the inet backend (the default) and not the socket backend. With TLS, this approach works in older OTP versions, but it was broken for TLS in OTP 28.0 - 28.4, but fixed again in 28.4.1 (OTP-20018). If we need to support these OTP versions, or if we want to support gen_tcp with the socket backend or the socket module directly, we can implement more variants later. 
--------- Signed-off-by: Viktor Söderqvist Co-authored-by: Hippo <44473866+WilliamVoong@users.noreply.github.com> --- src/ered_client.erl | 53 +++++++++++++++++---- test/ered_client_tests.erl | 95 +++++++++++++++++++++++++++++++++++++- 2 files changed, 136 insertions(+), 12 deletions(-) diff --git a/src/ered_client.erl b/src/ered_client.erl index 251840b..3f17372 100644 --- a/src/ered_client.erl +++ b/src/ered_client.erl @@ -71,6 +71,7 @@ waiting = q_new() :: command_queue(), pending = q_new() :: command_queue(), + backoff_send = false :: boolean(), % true after a send timeout %% Batching. When pending queue is full, %% set we don't send more until another %% complete batch can fit in the pending queue. @@ -391,7 +392,7 @@ handle_info({Passive, Socket}, #st{socket = Socket} = State) {error, Reason} -> Transport = State#st.opts#opts.transport, Transport:close(Socket), - {noreply, connection_down({socket_closed, Reason}, State#st{socket = undefined})} + {noreply, connection_down({socket_closed, Reason}, State#st{socket = none})} end; handle_info({Error, Socket, Reason}, #st{socket = Socket} = State) @@ -418,6 +419,7 @@ handle_info({connected, Socket}, State) -> State1 = abort_pending_commands(State), State2 = State1#st{socket = Socket, connected_at = erlang:monotonic_time(millisecond), + backoff_send = false, status = init}, State3 = init_connection(State2), {noreply, State3, response_timeout(State3)}; @@ -498,6 +500,31 @@ setopts(#st{opts = #opts{transport = gen_tcp}, socket = Socket}, Opts) -> setopts(#st{opts = #opts{transport = ssl}, socket = Socket}, Opts) -> ssl:setopts(Socket, Opts). +getstat(#st{opts = #opts{transport = gen_tcp}, socket = Socket}, Opts) -> + inet:getstat(Socket, Opts); +getstat(#st{opts = #opts{transport = ssl}, socket = Socket}, Opts) -> + ssl:getstat(Socket, Opts). + +update_backoff_send(State) when State#st.backoff_send -> + case q_len(State#st.pending) of + 0 -> + %% No pending commands means nothing is waiting in the send buffer. 
+ State#st{backoff_send = false}; + 1 -> + %% Pending queue almost empty, but maybe it was a huge command. + case getstat(State, [send_pend]) of + {ok, [{send_pend, SendPend}]} when SendPend < 1000 -> + State#st{backoff_send = false}; + _Otherwise -> + State + end; + _ -> + %% There are still multiple pending commands. Don't send more yet. + State + end; +update_backoff_send(State) -> + State. + %% Data received from the server handle_data(Data, #st{parser_state = ParserState} = State) -> handle_parser_result(ered_parser:continue(Data, ParserState), State). @@ -506,7 +533,8 @@ handle_parser_result({need_more, _BytesNeeded, ParserState}, State) -> State#st{parser_state = ParserState}; handle_parser_result({done, Value, ParserState}, State0) -> State1 = handle_result(Value, State0), - handle_parser_result(ered_parser:next(ParserState), State1); + State2 = update_backoff_send(State1), + handle_parser_result(ered_parser:next(ParserState), State2); handle_parser_result({parse_error, Reason}, State) -> Transport = State#st.opts#opts.transport, Transport:close(State#st.socket), @@ -639,7 +667,8 @@ abort_pending_commands(State) -> State#st{waiting = q_join(q_from_list(PendingReqs), State#st.waiting), pending = q_new(), parser_state = ered_parser:init(), - filling_batch = true}. + filling_batch = true, + backoff_send = false}. connection_down(Reason, State) -> State1 = abort_pending_commands(State), @@ -657,7 +686,7 @@ process_commands(State) -> if State#st.status =:= up, State#st.socket =/= none, - NumWaiting > 0, State#st.filling_batch -> + NumWaiting > 0, State#st.filling_batch, not State#st.backoff_send -> %% TODO: Add request timeout timestamp to PendingReq. 
{CommandQueue, NewWaiting} = q_split(min(BatchSize, NumWaiting), State#st.waiting), {BatchedData, PendingRequests} = @@ -673,13 +702,17 @@ process_commands(State) -> {[], q_new()}, q_to_list(CommandQueue)), Transport = State#st.opts#opts.transport, + NewPending = q_join(State#st.pending, PendingRequests), + StateAfterSend = State#st{waiting = NewWaiting, + pending = NewPending, + filling_batch = q_len(NewPending) < State#st.opts#opts.max_pending}, case Transport:send(State#st.socket, BatchedData) of ok -> - NewPending = q_join(State#st.pending, PendingRequests), - NewState = State#st{waiting = NewWaiting, - pending = NewPending, - filling_batch = q_len(NewPending) < State#st.opts#opts.max_pending}, - process_commands(NewState); + process_commands(StateAfterSend); + {error, timeout} -> + %% The send succeeded, but we should back off before sending + %% more. The data is queued in inet's send buffer. + process_commands(StateAfterSend#st{backoff_send = true}); {error, _Reason} -> %% Send FIN and handle replies in fligh before reconnecting. 
Transport:shutdown(State#st.socket, read_write), @@ -834,7 +867,7 @@ connect_loop(now, OwnerPid, #opts{host = Host, port = Port, transport = Transport, transport_opts = TransportOpts0, connect_timeout = Timeout} = Opts) -> - TransportOpts = [{active, 100}, binary] ++ TransportOpts0, + TransportOpts = [{send_timeout, 0}, {active, 100}, binary] ++ TransportOpts0, case Transport:connect(Host, Port, TransportOpts, Timeout) of {ok, Socket} -> case Transport:controlling_process(Socket, OwnerPid) of diff --git a/test/ered_client_tests.erl b/test/ered_client_tests.erl index fbed66c..6ff824b 100644 --- a/test/ered_client_tests.erl +++ b/test/ered_client_tests.erl @@ -16,7 +16,9 @@ run_test_() -> {spawn, fun bad_connection_option_t/0}, {spawn, fun server_buffer_full_reconnect_t/0}, {spawn, fun server_buffer_full_node_goes_down_t/0}, - {spawn, fun send_timeout_t/0}, + {spawn, fun response_timeout_t/0}, + {spawn, fun send_backoff_tcp_t/0}, + {spawn, fun send_backoff_tls_t/0}, {spawn, fun fail_hello_t/0}, {spawn, fun hello_with_auth_t/0}, {spawn, fun hello_with_auth_fail_t/0}, @@ -278,7 +280,7 @@ bad_connection_option_t() -> [{info_pid, self()}, {connection_opts, [bad_option]}])). -send_timeout_t() -> +response_timeout_t() -> {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), {ok, Port} = inet:port(ListenSock), spawn_link(fun() -> @@ -301,6 +303,68 @@ send_timeout_t() -> {reply, {ok, <<"pong">>}} = get_msg(), no_more_msgs(). +send_backoff_tcp_t() -> + send_backoff_t(gen_tcp). + +send_backoff_tls_t() -> + %% ssl with {send_timeout, 0} closes the socket on timeout in + %% OTP 28.0 .. 28.4 (ssl 11.2.0 .. 11.5.2). It was intentionally + %% broken and fixed in ssl-11.5.3 (OTP-20018). + %% It works in OTP 27 and earlier. + application:load(ssl), + {ok, SslVsn} = application:get_key(ssl, vsn), + case vsn_ge(SslVsn, "11.2.0") andalso not vsn_ge(SslVsn, "11.5.3") of + true -> ok; %% skip + false -> send_backoff_t(ssl) + end. 
+ +send_backoff_t(Transport) -> + %% Send a large command N times. + N = 10, + + %% Construct a large binary. + Size = 1000 * 1000, + LargeBinary = binary:copy(<<"a">>, Size), + LargeCommand = [<<"SET">>, <<"foo">>, LargeBinary], + Resp = ered_command:get_data( + ered_command:convert_to([<<"SET">>, <<"foo">>, LargeBinary])), + RespLen = byte_size(Resp), + + %% Start server + {ListenSock, Port, ConnOpts} = listen(Transport), + ServerPid = + spawn_link(fun() -> + Sock = accept(Transport, ListenSock), + receive continue -> ok end, + [begin + {ok, <<"*3\r\n$3\r\nSET\r", _/binary>>} = Transport:recv(Sock, RespLen), + ok = Transport:send(Sock, <<"+OK\r\n">>) + end || _ <- lists:seq(1, N)], + {ok, <<"*1\r\n$4\r\nping\r\n">>} = Transport:recv(Sock, 0), + ok = Transport:send(Sock, <<"+PONG\r\n">>), + receive ok -> ok end + end), + Client = start_client(Port, [{connection_opts, ConnOpts ++ [{batch_size, 1}]}]), + expect_connection_up(Client), + Pid = self(), + %% Send the large command N times. In some cases, gen_tcp:send returns + %% {error, timeout} after a few times, even if the data is really large. + [ered_client:command_async(Client, LargeCommand, fun(Reply) -> Pid ! {reply, Reply} end) + || _ <- lists:seq(1, N)], + + #{backoff_send := BackoffSend, + pending := {NumPending, _}, + waiting := {NumWaiting, _}} = ered_client:state_to_map(sys:get_state(Client)), + ?assert(BackoffSend), + ?assert(NumPending > 0), + ?assert(NumWaiting > 0), + ?assertEqual(N, NumWaiting + NumPending), + ServerPid ! continue, + [{reply, {ok, <<"OK">>}} = get_msg() || _ <- lists:seq(1, N)], + ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {reply, Reply} end), + {reply, {ok, <<"PONG">>}} = get_msg(), + no_more_msgs(). + fail_hello_t() -> {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), {ok, Port} = inet:port(ListenSock), @@ -469,3 +533,30 @@ start_client(Port, Opt) -> timeout_error() -> error({timeout, erlang:process_info(self(), messages)}). 
+listen(gen_tcp) -> + {ok, LSock} = gen_tcp:listen(0, [binary, {active, false}]), + {ok, Port} = inet:port(LSock), + {LSock, Port, [{tcp_options, []}]}; +listen(ssl) -> + ssl:start(), + CertFile = "/tmp/ered_test_cert.pem", + KeyFile = "/tmp/ered_test_key.pem", + os:cmd("openssl req -x509 -newkey rsa:2048 -keyout " ++ KeyFile ++ + " -out " ++ CertFile ++ " -days 1 -nodes -subj '/CN=localhost' 2>/dev/null"), + {ok, LSock} = ssl:listen(0, [binary, {active, false}, + {certfile, CertFile}, {keyfile, KeyFile}]), + {ok, {_, Port}} = ssl:sockname(LSock), + {LSock, Port, [{tls_options, [{verify, verify_none}]}]}. + +accept(gen_tcp, LSock) -> + {ok, Sock} = gen_tcp:accept(LSock), + Sock; +accept(ssl, LSock) -> + {ok, TSock} = ssl:transport_accept(LSock), + {ok, Sock} = ssl:handshake(TSock), + Sock. + + +vsn_ge(Vsn1, Vsn2) -> + lists:map(fun list_to_integer/1, string:tokens(Vsn1, ".")) >= + lists:map(fun list_to_integer/1, string:tokens(Vsn2, ".")). From 68325b62b653b4a520a45388650b97e086097aeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Mon, 13 Apr 2026 11:43:11 +0200 Subject: [PATCH 4/4] Track sent_at timestamp per pending request (#167) Store erlang:monotonic_time(millisecond) in each pending_req when commands are sent to the server. Use the oldest pending request's timestamp to compute the remaining response timeout, so that time already spent waiting is accounted for. --- src/ered_client.erl | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/ered_client.erl b/src/ered_client.erl index 3f17372..4045482 100644 --- a/src/ered_client.erl +++ b/src/ered_client.erl @@ -183,6 +183,7 @@ command :: #command{}, response_class :: ered_command:response_class() | [ered_command:response_class()], + sent_at :: integer(), % erlang:monotonic_time(millisecond) reply_acc = [] }). 
@@ -687,8 +688,8 @@ process_commands(State) -> if State#st.status =:= up, State#st.socket =/= none, NumWaiting > 0, State#st.filling_batch, not State#st.backoff_send -> - %% TODO: Add request timeout timestamp to PendingReq. {CommandQueue, NewWaiting} = q_split(min(BatchSize, NumWaiting), State#st.waiting), + Now = erlang:monotonic_time(millisecond), {BatchedData, PendingRequests} = lists:foldr(fun(Command, {DataAcc, PendingAcc}) -> RespCommand = Command#command.data, @@ -696,7 +697,8 @@ process_commands(State) -> NewBatchedData = ered_command:get_data(RespCommand), NewPendingRequest = #pending_req{command = Command, - response_class = ResponseClass}, + response_class = ResponseClass, + sent_at = Now}, {[NewBatchedData | DataAcc] , q_in_r(NewPendingRequest, PendingAcc)} end, {[], q_new()}, @@ -782,6 +784,9 @@ q_out({Size, Q}) -> {{value, Val}, NewQ} -> {Val, {Size-1, NewQ}} end. +q_get({_Size, Q}) -> + queue:get(Q). + q_split(N, {Size, Q}) when N =< Size -> {A, B} = queue:split(N, Q), {{N, A}, {Size - N, B}}. @@ -796,8 +801,9 @@ q_len({Size, _Q}) -> Size. response_timeout(State) when not ?q_is_empty(State#st.pending) -> - %% FIXME: Store req timeout in each pending item - State#st.opts#opts.timeout; + #pending_req{sent_at = SentAt} = q_get(State#st.pending), + Elapsed = erlang:monotonic_time(millisecond) - SentAt, + max(0, State#st.opts#opts.timeout - Elapsed); response_timeout(_State) -> infinity. @@ -915,7 +921,8 @@ init_connection(State) -> Data = ered_command:get_data(RespCommand), Command = #command{data = RespCommand, replyto = ReplyFun}, Class = ered_command:get_response_class(RespCommand), - PendingReq = #pending_req{command = Command, response_class = Class}, + PendingReq = #pending_req{command = Command, response_class = Class, + sent_at = erlang:monotonic_time(millisecond)}, Transport = State#st.opts#opts.transport, case Transport:send(State#st.socket, Data) of ok ->