diff --git a/README.md b/README.md index 90740c8..53ef8f9 100644 --- a/README.md +++ b/README.md @@ -271,16 +271,16 @@ options are wrapped in `{client_opts, [...]}` and included in cluster options. Options passed to the connection module. See [Connection options](#connection-options) below. -* `{max_waiting, non_neg_integer()}` +* `{max_waiting, pos_integer()}` Max number of commands allowed to wait in queue. Default 5000. -* `{max_pending, non_neg_integer()}` +* `{max_pending, pos_integer()}` Max number of commands to be pending, i.e. sent to client and waiting for a response. Default 128. -* `{queue_ok_level, non_neg_integer()}` +* `{queue_ok_level, pos_integer()}` If the queue has been full then it is considered ok again when it reaches this level. Default 2000. @@ -331,7 +331,7 @@ wrapped in `{connection_opts, [ered_connection:opt()]}`. For `ered_cluster:connect/2`, the connection options are included under client options, as `{client_opts, [{connection_opts, [...]}]}`. -* `{batch_size, non_neg_integer()}` +* `{batch_size, pos_integer()}` If commands are queued up in the process message queue, this is the maximum number of messages that will be received and sent in one call. Default 16. diff --git a/src/ered_client.erl b/src/ered_client.erl index cfdce31..4045482 100644 --- a/src/ered_client.erl +++ b/src/ered_client.erl @@ -15,6 +15,9 @@ command/2, command/3, command_async/3]). +%% testing/debugging +-export([state_to_map/1]). + %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). 
@@ -34,7 +37,15 @@ { host :: host(), port :: inet:port_number(), - connection_opts = [] :: [ered_connection:opt()], + + %% From "connection opts" + batch_size = 16 :: pos_integer(), + transport = gen_tcp :: gen_tcp | ssl, + transport_opts = [] :: list(), + connect_timeout = infinity :: timeout(), + push_cb = fun(_) -> ok end :: push_cb(), + timeout = 10000 :: timeout(), % Response timeout + resp_version = 3 :: 2..3, use_cluster_id = false :: boolean(), auth = none :: {binary(), binary()} | none, @@ -45,36 +56,44 @@ info_pid = none :: none | pid(), queue_ok_level = 2000 :: non_neg_integer(), - max_waiting = 5000 :: non_neg_integer(), - max_pending = 128 :: non_neg_integer() + max_waiting = 5000 :: pos_integer(), + max_pending = 128 :: pos_integer() }). -record(st, { - connect_loop_pid = none, - connection_pid = none, + connection_loop_pid = none, + socket = none, controlling_process :: pid(), last_status = none, + parser_state :: ered_parser:state(), waiting = q_new() :: command_queue(), pending = q_new() :: command_queue(), + backoff_send = false :: boolean(), % true after a send timeout + %% Batching. When pending queue is full, + %% we don't send more until another + %% complete batch can fit in the pending queue. + filling_batch = true :: boolean(), + cluster_id = undefined :: undefined | binary(), queue_full_event_sent = false :: boolean(), % set to true when full, false when reaching queue_ok_level - node_status = up :: up | node_down | node_deactivated, - + status :: init | up | node_down | node_deactivated, node_down_timer = none :: none | reference(), + connected_at = none :: none | integer(), % erlang:monotonic_time(millisecond) opts = #opts{} }). - -type command_error() :: queue_overflow | node_down | node_deactivated | {client_stopped, reason()}. -type command_item() :: {command, ered_command:redis_command(), reply_fun()}. -type command_queue() :: {Size :: non_neg_integer(), queue:queue(command_item())}.
--type reply() :: {ok, ered_connection:result()} | {error, command_error()}. +-type result() :: ered_parser:parse_result(). +-type push_cb() :: fun((result()) -> any()). +-type reply() :: {ok, result()} | {error, command_error()}. -type reply_fun() :: fun((reply()) -> any()). -type host() :: ered:host(). @@ -106,12 +125,12 @@ -type opt() :: %% Options passed to the connection module - {connection_opts, [ered_connection:opt()]} | + {connection_opts, [connection_opt()]} | %% Max number of commands allowed to wait in queue. - {max_waiting, non_neg_integer()} | + {max_waiting, pos_integer()} | %% Max number of commands to be pending, i.e. sent to client %% and waiting for a response. - {max_pending, non_neg_integer()} | + {max_pending, pos_integer()} | %% If the queue has been full then it is considered ok %% again when it reaches this level {queue_ok_level, non_neg_integer()} | @@ -135,6 +154,46 @@ %% The SELECT command is only sent when non-zero. {select_db, non_neg_integer()}. +-type connection_opt() :: + %% If commands are queued up in the process message queue this is the max + %% amount of messages that will be received and sent in one call + {batch_size, pos_integer()} | + %% Timeout passed to gen_tcp:connect/4 or ssl:connect/4. + {connect_timeout, timeout()} | + %% Options passed to gen_tcp:connect/4. + {tcp_options, [gen_tcp:connect_option()]} | + %% Timeout passed to gen_tcp:connect/4. DEPRECATED. + {tcp_connect_timeout, timeout()} | + %% Options passed to ssl:connect/4. If this config parameter is present, + %% TLS is used. + {tls_options, [ssl:tls_client_option()]} | + %% Timeout passed to ssl:connect/4. DEPRECATED. + {tls_connect_timeout, timeout()} | + %% Callback for push notifications + {push_cb, push_cb()} | + %% Timeout when waiting for a response from Redis. milliseconds + {response_timeout, non_neg_integer()}. + +%% Command in the waiting queue. +-record(command, {data, replyto}). 
+ +%% Pending request, in flight, sent to server, waiting for reply/ies. +-record(pending_req, + { + command :: #command{}, + response_class :: ered_command:response_class() | + [ered_command:response_class()], + sent_at :: integer(), % erlang:monotonic_time(millisecond) + reply_acc = [] + }). + +%% Queue macro, can be used in guards. +-define(q_is_empty(Q), (element(1, Q) =:= 0)). + +%% Commands like SUBSCRIBE and UNSUBSCRIBE don't return anything, so we use this +%% return value. +-define(pubsub_reply, undefined). + %%%=================================================================== %%% API %%%=================================================================== @@ -220,14 +279,22 @@ command(ServerRef, Command, Timeout) -> %% client process and should not hang or perform any lengthy task. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - command_async(ServerRef, Command, CallbackFun) -> - gen_server:cast(ServerRef, {command, ered_command:convert_to(Command), CallbackFun}). + gen_server:cast(ServerRef, #command{data = ered_command:convert_to(Command), + replyto = CallbackFun}). + +%% Converts a state record to a map, for easier testing. +%% Used in tests, after calling sys:get_state(EredClientPid). +state_to_map(#st{} = State) -> + Fields = record_info(fields, st), + [st | Values] = tuple_to_list(State), + maps:from_list(lists:zip(Fields, Values)). 
%%%=================================================================== %%% gen_server callbacks %%%=================================================================== init({Host, Port, OptsList, User}) -> Opts = lists:foldl( - fun({connection_opts, Val}, S) -> S#opts{connection_opts = Val}; + fun({connection_opts, Val}, S) -> handle_connection_opts(S, Val); ({max_waiting, Val}, S) -> S#opts{max_waiting = Val}; ({max_pending, Val}, S) -> S#opts{max_pending = Val}; ({queue_ok_level, Val}, S) -> S#opts{queue_ok_level = Val}; @@ -244,21 +311,53 @@ init({Host, Port, OptsList, User}) -> OptsList), monitor(process, User), process_flag(trap_exit, true), - Pid = self(), - ConnectPid = spawn_link(fun() -> connect(Pid, Opts) end), - {ok, start_node_down_timer(#st{opts = Opts, - controlling_process = User, - connect_loop_pid = ConnectPid})}. + State0 = #st{opts = Opts, + controlling_process = User, + parser_state = ered_parser:init(), + status = init}, + State1 = start_connect_loop(now, State0), + {ok, start_node_down_timer(State1)}. + +%% "Connection opts" is a second layer of options. +%% +%% TODO: Remove this layering and put them directly as client options. It's an +%% API change so we'll do it in an appropriate version.
+handle_connection_opts(OptsRecord, Opts) -> + Valid = [batch_size, tcp_options, tls_options, push_cb, response_timeout, + tcp_connect_timeout, tls_connect_timeout, connect_timeout], + [error({badarg, BadOpt}) + || BadOpt <- proplists:get_keys(Opts) -- Valid], + BatchSize = proplists:get_value(batch_size, Opts, 16), + ResponseTimeout = proplists:get_value(response_timeout, Opts, 10000), + PushCb = proplists:get_value(push_cb, Opts, fun(_) -> ok end), + TcpOptions = proplists:get_value(tcp_options, Opts, []), + TlsOptions = proplists:get_value(tls_options, Opts, []), + TcpTimeout = proplists:get_value(tcp_connect_timeout, Opts, infinity), + TlsTimeout = proplists:get_value(tls_connect_timeout, Opts, infinity), + {Transport, Options, Timeout0} = case TlsOptions of + [] -> + {gen_tcp, TcpOptions, TcpTimeout}; + _ -> + {ssl, TlsOptions, TlsTimeout} + end, + ConnectTimeout = proplists:get_value(connect_timeout, Opts, Timeout0), + OptsRecord#opts{batch_size = BatchSize, + transport = Transport, transport_opts = Options, + connect_timeout = ConnectTimeout, + timeout = ResponseTimeout, + push_cb = PushCb}. handle_call({command, Command}, From, State) -> Fun = fun(Reply) -> gen_server:reply(From, Reply) end, - handle_cast({command, Command, Fun}, State). + handle_cast(#command{data = Command, replyto = Fun}, State). 
-handle_cast(Command = {command, _, _}, State) -> - case State#st.node_status of - up -> - {noreply, process_commands(State#st{waiting = q_in(Command, State#st.waiting)})}; +handle_cast(Command = #command{}, State) -> + case State#st.status of + Up when Up =:= up; Up =:= init -> + State1 = State#st{waiting = q_in(Command, State#st.waiting)}, + State2 = process_commands(State1), + {noreply, State2, response_timeout(State2)}; NodeProblem when NodeProblem =:= node_down; NodeProblem =:= node_deactivated -> reply_command(Command, {error, NodeProblem}), {noreply, State} @@ -268,66 +367,124 @@ handle_cast(deactivate, State) -> State1 = cancel_node_down_timer(State), State2 = report_connection_status(node_deactivated, State1), State3 = reply_all({error, node_deactivated}, State2), - {noreply, process_commands(State3#st{node_status = node_deactivated})}; + {noreply, process_commands(State3#st{status = node_deactivated})}; -handle_cast(reactivate, #st{connection_pid = none} = State) -> +handle_cast(reactivate, #st{socket = none} = State) -> {noreply, start_node_down_timer(State)}; handle_cast(reactivate, State) -> - {noreply, State#st{node_status = up}}. - + {noreply, State#st{status = up}}. -handle_info({{command_reply, Pid}, Reply}, State = #st{pending = Pending, connection_pid = Pid}) -> - case q_out(Pending) of - empty -> - {noreply, State}; - {Command, NewPending} -> - reply_command(Command, {ok, Reply}), - {noreply, process_commands(State#st{pending = NewPending})} +handle_info({Type, Socket, Data}, #st{socket = Socket} = State) + when Type =:= tcp; Type =:= ssl -> + %% Receive data from current socket. + State1 = handle_data(Data, State), + State2 = process_commands(State1), + {noreply, State2, response_timeout(State2)}; + +handle_info({Passive, Socket}, #st{socket = Socket} = State) + when Passive =:= tcp_passive; Passive =:= ssl_passive -> + %% Socket switched to passive mode due to {active, N}. + %% TODO: Add config for N. 
+ N = 100, + case setopts(State, [{active, N}]) of + ok -> + {noreply, State, response_timeout(State)}; + {error, Reason} -> + Transport = State#st.opts#opts.transport, + Transport:close(Socket), + {noreply, connection_down({socket_closed, Reason}, State#st{socket = none})} end; -handle_info({{command_reply, _Pid}, _Reply}, State) -> - %% Stray message from a defunct client? ignore! - {noreply, State}; - -handle_info(Reason = {connect_error, _ErrorReason}, State) -> - {noreply, connection_down(Reason, State)}; - -handle_info(Reason = {init_error, _Errors}, State) -> - {noreply, connection_down(Reason, State)}; - -handle_info(Reason = {socket_closed, _CloseReason}, State) -> - {noreply, connection_down(Reason, State)}; - -handle_info({connected, Pid, ClusterId}, State) -> - State1 = cancel_node_down_timer(State), - State2 = State1#st{connection_pid = Pid, cluster_id = ClusterId, - node_status = case State1#st.node_status of - node_down -> up; - OldStatus -> OldStatus - end}, - State3 = report_connection_status(connection_up, State2), - {noreply, process_commands(State3)}; +handle_info({Error, Socket, Reason}, #st{socket = Socket} = State) + when Error =:= tcp_error; Error =:= ssl_error -> + %% Socket errors. If the network or peer is down, the error is not + %% always followed by a tcp_closed. + %% + %% TLS 1.3: Called after a connect when the client certificate has expired + Transport = State#st.opts#opts.transport, + Transport:close(Socket), + {noreply, connection_down({socket_closed, Reason}, State#st{socket = none})}; + +handle_info({Closed, Socket}, #st{socket = Socket} = State) + when Closed =:= tcp_closed; Closed =:= ssl_closed -> + %% Socket got closed by the server. + {noreply, connection_down({socket_closed, Closed}, State#st{socket = none})}; + +handle_info(ConnectError = {connect_error, _Reason}, State) -> + %% Message from the connect loop process. It will retry. 
+ {noreply, connection_down(ConnectError, State)}; + +handle_info({connected, Socket}, State) -> + %% Sent from the connect loop process just before it exits. + State1 = abort_pending_commands(State), + State2 = State1#st{socket = Socket, + connected_at = erlang:monotonic_time(millisecond), + backoff_send = false, + status = init}, + State3 = init_connection(State2), + {noreply, State3, response_timeout(State3)}; + +handle_info({init_command_reply, {ok, Replies}}, State) -> + case [Reason || {error, Reason} <- Replies] of + [] -> + %% No errors + ClusterId = case State#st.opts#opts.use_cluster_id of + true -> + hd(Replies); + false -> + undefined + end, + State1 = cancel_node_down_timer(State), + NodeStatus = case State1#st.status of + node_down -> up; + init -> up; + OldStatus -> OldStatus + end, + State2 = State1#st{status = NodeStatus, cluster_id = ClusterId}, + State3 = report_connection_status(connection_up, State2), + {noreply, process_commands(State3), response_timeout(State3)}; + Errors -> + {noreply, connection_down({init_error, Errors}, State)} + end; +handle_info({init_command_reply, {error, Reason}}, State) -> + {noreply, connection_down({init_error, Reason}, State)}; handle_info({timeout, TimerRef, node_down}, State) when TimerRef == State#st.node_down_timer -> + %% Node down timeout State1 = report_connection_status({connection_down, node_down_timeout}, State), State2 = reply_all({error, node_down}, State1), - {noreply, process_commands(State2#st{node_status = node_down})}; + {noreply, process_commands(State2#st{status = node_down})}; -handle_info({timeout, _TimerRef, _Msg}, State) -> - {noreply, State}; +handle_info(timeout, #st{socket = Socket} = State) when Socket =/= none -> + %% Request timeout + Transport = State#st.opts#opts.transport, + Transport:close(Socket), + {noreply, connection_down({socket_closed, timeout}, State#st{socket = none})}; handle_info({'DOWN', _Mon, process, Pid, ExitReason}, State = #st{controlling_process = Pid}) -> {stop,
ExitReason, State}; +handle_info({'EXIT', Pid, normal}, #st{connection_loop_pid = Pid} = State) -> + State1 = State#st{connection_loop_pid = none}, + State2 = case State1#st.socket of + none -> + %% Corner case. The new connection was lost before this + %% exit signal arrived. Start reconnect loop again. + start_connect_loop(now, State1); + _Socket -> + State1 + end, + {noreply, State2, response_timeout(State2)}; + handle_info({'EXIT', _From, Reason}, State) -> + %% Supervisor exited. {stop, Reason, State}; handle_info(_Ignore, State) -> - {noreply, State}. + {noreply, State, response_timeout(State)}. terminate(Reason, State) -> - exit(State#st.connect_loop_pid, kill), reply_all({error, {client_stopped, Reason}}, State), report_connection_status({connection_down, {client_stopped, Reason}}, State), ok. @@ -338,8 +495,154 @@ code_change(_OldVsn, State = #st{opts = #opts{}}, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== + +setopts(#st{opts = #opts{transport = gen_tcp}, socket = Socket}, Opts) -> + inet:setopts(Socket, Opts); +setopts(#st{opts = #opts{transport = ssl}, socket = Socket}, Opts) -> + ssl:setopts(Socket, Opts). + +getstat(#st{opts = #opts{transport = gen_tcp}, socket = Socket}, Opts) -> + inet:getstat(Socket, Opts); +getstat(#st{opts = #opts{transport = ssl}, socket = Socket}, Opts) -> + ssl:getstat(Socket, Opts). + +update_backoff_send(State) when State#st.backoff_send -> + case q_len(State#st.pending) of + 0 -> + %% No pending commands means nothing is waiting in the send buffer. + State#st{backoff_send = false}; + 1 -> + %% Pending queue almost empty, but maybe it was a huge command. + case getstat(State, [send_pend]) of + {ok, [{send_pend, SendPend}]} when SendPend < 1000 -> + State#st{backoff_send = false}; + _Otherwise -> + State + end; + _ -> + %% There are still multiple pending commands. Don't send more yet. 
+ State + end; +update_backoff_send(State) -> + State. + +%% Data received from the server +handle_data(Data, #st{parser_state = ParserState} = State) -> + handle_parser_result(ered_parser:continue(Data, ParserState), State). + +handle_parser_result({need_more, _BytesNeeded, ParserState}, State) -> + State#st{parser_state = ParserState}; +handle_parser_result({done, Value, ParserState}, State0) -> + State1 = handle_result(Value, State0), + State2 = update_backoff_send(State1), + handle_parser_result(ered_parser:next(ParserState), State2); +handle_parser_result({parse_error, Reason}, State) -> + Transport = State#st.opts#opts.transport, + Transport:close(State#st.socket), + connection_down({socket_closed, {parse_error, Reason}}, State#st{socket = none}). + +handle_result({push, Value = [Type|_]}, State) -> + %% Pub/sub in RESP3 is a bit quirky. The push is supposed to be out of band + %% data not connected to any request but for subscribe and unsubscribe + %% requests, a successful command is signalled as one or more push messages. + PushCB = State#st.opts#opts.push_cb, + PushCB(Value), + State1 = case is_subscribe_push(Type) of + true -> + handle_subscribe_push(Value, State); + false -> + State + end, + State1; +handle_result(Value, #st{pending = PendingQueue} = State) + when not ?q_is_empty(PendingQueue) -> + {PendingReq, PendingQueue1} = q_out(PendingQueue), + #pending_req{command = Command, + response_class = RespClass, + reply_acc = Acc} = PendingReq, + %% Check how many replies expected (list = pipeline) + case RespClass of + Single when not is_list(Single) -> + reply_command(Command, {ok, Value}), + State#st{pending = PendingQueue1}; + [_] -> + %% Last one, send the reply + reply_command(Command, {ok, lists:reverse([Value | Acc])}), + State#st{pending = PendingQueue1}; + [_ | TailClasses] -> + %% Need more replies. Save the reply and keep going. 
+ PendingReq1 = PendingReq#pending_req{response_class = TailClasses, + reply_acc = [Value | Acc]}, + PendingQueue2 = q_in_r(PendingReq1, PendingQueue1), + State#st{pending = PendingQueue2} + end; +handle_result(_Value, #st{pending = PendingQueue}) + when ?q_is_empty(PendingQueue) -> + error(unexpected_reply). + +is_subscribe_push(<<"subscribe">>) -> + true; +is_subscribe_push(<<X, "subscribe">>) when X >= $a, X =< $z -> + true; +is_subscribe_push(<<"unsubscribe">>) -> + true; +is_subscribe_push(<<X, "unsubscribe">>) when X >= $a, X =< $z -> + true; +is_subscribe_push(_) -> + false. + +handle_subscribe_push(PushMessage, #st{pending = PendingQueue} = State) -> + case q_out(PendingQueue) of + {PendingReq, PendingQueue1} -> + State1 = State#st{pending = PendingQueue1}, + handle_subscribed_popped_pending(PushMessage, PendingReq, State1); + empty -> + %% No commands pending. It may be a server-initiated unsubscribe. + State + end. + +handle_subscribed_popped_pending(Push, + #pending_req{command = Command, + response_class = ExpectClass, + reply_acc = Acc} = Req, + State) -> + case {ExpectClass, hd(Push)} of + {{Type, N}, Type} % simple command + when N =:= 0; % unsubscribing from all channels + N =:= 1 -> % or subscribed to all channels + reply_command(Command, {ok, ?pubsub_reply}), + State; + {{Type, N}, Type} % simple command + when N > 1 -> % not yet subscribed all channels + Req1 = Req#pending_req{response_class = {Type, N - 1}}, + pending_in_r(Req1, State); + {[{Type, N}], Type} % last command in pipeline + when N =:= 0; % unsubscribing from all channels + N =:= 1 -> % or subscribed to all channels + reply_command(Command, {ok, lists:reverse([?pubsub_reply | Acc])}), + State; + {[{Type, N} | Classes], Type} % pipeline, not the last command + when N =:= 0; % unsubscribing from all channels + N =:= 1 -> % or subscribed to all channels + Req1 = Req#pending_req{response_class = Classes, + reply_acc = [?pubsub_reply | Acc]}, + pending_in_r(Req1, State); + {[{Type, N} | Classes], Type} % pipeline + when
N > 1 -> % not yet subscribed all channels + Req1 = Req#pending_req{response_class = [{Type, N - 1} | Classes]}, + pending_in_r(Req1, State); + _Otherwise -> + %% Not expecting this particular push message for Req. Put it back in queue. + pending_in_r(Req, State) + end. + +%% Add in the front of the pending queue (like queue:in_r). +pending_in_r(ReplyInfo, #st{pending = Pending0} = State) -> + Pending1 = q_in_r(ReplyInfo, Pending0), + State#st{pending = Pending1}. + reply_all(Reply, State) -> - [reply_command(Command, Reply) || Command <- q_to_list(State#st.pending)], + [reply_command(Req#pending_req.command, Reply) || Req <- q_to_list(State#st.pending)], [reply_command(Command, Reply) || Command <- q_to_list(State#st.waiting)], State#st{waiting = q_new(), pending = q_new()}. @@ -355,41 +658,103 @@ cancel_node_down_timer(#st{node_down_timer = TimerRef} = State) -> erlang:cancel_timer(TimerRef), State#st{node_down_timer = none}. +%% Move pending commands back to the waiting queue. Discard partial replies. +%% +%% This is risky behavior. If some of the commands sent are not idempotent, we +%% can't just reconnect and send them again. We may just want to return an error +%% instead. +abort_pending_commands(State) -> + PendingReqs = [Req#pending_req.command || Req <- q_to_list(State#st.pending)], + State#st{waiting = q_join(q_from_list(PendingReqs), State#st.waiting), + pending = q_new(), + parser_state = ered_parser:init(), + filling_batch = true, + backoff_send = false}. + connection_down(Reason, State) -> - State1 = State#st{waiting = q_join(State#st.pending, State#st.waiting), - pending = q_new(), - connection_pid = none}, + State1 = abort_pending_commands(State), State2 = process_commands(State1), State3 = report_connection_status({connection_down, Reason}, State2), - start_node_down_timer(State3). - + State4 = start_connect_loop(now, State3), + start_node_down_timer(State4). -%%%%%% +-spec process_commands(#st{}) -> #st{}. 
process_commands(State) -> NumWaiting = q_len(State#st.waiting), NumPending = q_len(State#st.pending), + BatchSize = State#st.opts#opts.batch_size, + LowWaterMark = max(State#st.opts#opts.max_pending - BatchSize, 1), + if - (NumWaiting > 0) and (NumPending < State#st.opts#opts.max_pending) and (State#st.connection_pid /= none) -> - {Command, NewWaiting} = q_out(State#st.waiting), - Data = get_command_payload(Command), - ered_connection:command_async(State#st.connection_pid, Data, {command_reply, State#st.connection_pid}), - process_commands(State#st{pending = q_in(Command, State#st.pending), - waiting = NewWaiting}); - - (NumWaiting > State#st.opts#opts.max_waiting) and (State#st.queue_full_event_sent) -> + State#st.status =:= up, State#st.socket =/= none, + NumWaiting > 0, State#st.filling_batch, not State#st.backoff_send -> + {CommandQueue, NewWaiting} = q_split(min(BatchSize, NumWaiting), State#st.waiting), + Now = erlang:monotonic_time(millisecond), + {BatchedData, PendingRequests} = + lists:foldr(fun(Command, {DataAcc, PendingAcc}) -> + RespCommand = Command#command.data, + ResponseClass = ered_command:get_response_class(RespCommand), + + NewBatchedData = ered_command:get_data(RespCommand), + NewPendingRequest = #pending_req{command = Command, + response_class = ResponseClass, + sent_at = Now}, + {[NewBatchedData | DataAcc] , q_in_r(NewPendingRequest, PendingAcc)} + end, + {[], q_new()}, + q_to_list(CommandQueue)), + Transport = State#st.opts#opts.transport, + NewPending = q_join(State#st.pending, PendingRequests), + StateAfterSend = State#st{waiting = NewWaiting, + pending = NewPending, + filling_batch = q_len(NewPending) < State#st.opts#opts.max_pending}, + case Transport:send(State#st.socket, BatchedData) of + ok -> + process_commands(StateAfterSend); + {error, timeout} -> + %% The send succeeded, but we should back off before sending + %% more. The data is queued in inet's send buffer. 
+ process_commands(StateAfterSend#st{backoff_send = true}); + {error, _Reason} -> + %% Send FIN and handle replies in flight before reconnecting. + Transport:shutdown(State#st.socket, read_write), + start_connect_loop(now, State#st{status = init}) + end; + + not State#st.filling_batch, NumPending < LowWaterMark -> + process_commands(State#st{filling_batch = true}); + + NumWaiting > State#st.opts#opts.max_waiting, State#st.queue_full_event_sent -> drop_commands(State); NumWaiting > State#st.opts#opts.max_waiting -> drop_commands( report_connection_status(queue_full, State#st{queue_full_event_sent = true})); - (NumWaiting < State#st.opts#opts.queue_ok_level) and (State#st.queue_full_event_sent) -> + NumWaiting < State#st.opts#opts.queue_ok_level, State#st.queue_full_event_sent -> report_connection_status(queue_ok, State#st{queue_full_event_sent = false}); true -> State end. +start_connect_loop(_When, State) when is_pid(State#st.connection_loop_pid) -> + State; +start_connect_loop(When0, State) -> + Self = self(), + Now = erlang:monotonic_time(millisecond), + ConnectedAt = State#st.connected_at, + %% Don't reconnect immediately if the last connect was too recent. + When = if + is_integer(ConnectedAt), + Now - ConnectedAt < State#st.opts#opts.reconnect_wait -> + wait; + true -> + When0 + end, + ConnectPid = spawn_link(fun () -> connect_loop(When, Self, State#st.opts) end), + State#st{connection_loop_pid = ConnectPid}. + drop_commands(State) -> case q_len(State#st.waiting) > State#st.opts#opts.max_waiting of true -> @@ -407,6 +772,9 @@ q_new() -> q_in(Item, {Size, Q}) -> {Size+1, queue:in(Item, Q)}. +q_in_r(Item, {Size, Q}) -> + {Size + 1, queue:in_r(Item, Q)}. + q_join({Size1, Q1}, {Size2, Q2}) -> {Size1 + Size2, queue:join(Q1, Q2)}. @@ -416,22 +784,45 @@ q_out({Size, Q}) -> {{value, Val}, NewQ} -> {Val, {Size-1, NewQ}} end. +q_get({_Size, Q}) -> + queue:get(Q). + +q_split(N, {Size, Q}) when N =< Size -> + {A, B} = queue:split(N, Q), + {{N, A}, {Size - N, B}}.
+ q_to_list({_Size, Q}) -> queue:to_list(Q). +q_from_list(List) -> + {length(List), queue:from_list(List)}. + q_len({Size, _Q}) -> Size. +response_timeout(State) when not ?q_is_empty(State#st.pending) -> + #pending_req{sent_at = SentAt} = q_get(State#st.pending), + Elapsed = erlang:monotonic_time(millisecond) - SentAt, + max(0, State#st.opts#opts.timeout - Elapsed); +response_timeout(_State) -> + infinity. -reply_command({command, _, Fun}, Reply) -> +reply_command(#command{replyto = Fun} = _Command, Reply) -> Fun(Reply). -get_command_payload({command, Command, _Fun}) -> - Command. - -spec report_connection_status(status(), #st{}) -> #st{}. report_connection_status(Status, State = #st{last_status = Status}) -> State; +report_connection_status({connection_down, {init_error, node_down}}, + #st{last_status = {connection_down, _}} = State) -> + %% Silence additional init error caused by connection down. The lost + %% connection was already reported in another status message. + State; +report_connection_status({connection_down, {init_error, InitReason}}, + #st{last_status = node_deactivated} = State) + when InitReason =:= node_deactivated; InitReason =:= node_down -> + %% Silence additional init error when node is deactivated. + State; report_connection_status(Status, State) -> send_info(Status, State), case Status of @@ -474,32 +865,36 @@ send_info(Status, #st{opts = #opts{info_pid = Pid, send_info(_Msg, _State) -> ok. - -connect(Pid, Opts) -> - Result = ered_connection:connect(Opts#opts.host, Opts#opts.port, Opts#opts.connection_opts), - case Result of +%% Connect-wait-retry loop, to run in a separate spawned process. When +%% connected, transfers the socket to the OwnerPid, sends a message `{connected, +%% Socket}` and exits. On connect error, a message `{connect_error, Reason}` is +%% sent and connecting is retried periodically.
+connect_loop(now, OwnerPid, + #opts{host = Host, port = Port, transport = Transport, + transport_opts = TransportOpts0, + connect_timeout = Timeout} = Opts) -> + TransportOpts = [{send_timeout, 0}, {active, 100}, binary] ++ TransportOpts0, + case Transport:connect(Host, Port, TransportOpts, Timeout) of + {ok, Socket} -> + case Transport:controlling_process(Socket, OwnerPid) of + ok -> + OwnerPid ! {connected, Socket}; + {error, Reason} -> + OwnerPid ! {connect_error, Reason}, + Transport:close(Socket), + connect_loop(wait, OwnerPid, Opts) + end; {error, Reason} -> - Pid ! {connect_error, Reason}, - timer:sleep(Opts#opts.reconnect_wait); - - {ok, ConnectionPid} -> - case init(Pid, ConnectionPid, Opts) of - {socket_closed, ConnectionPid, Reason} -> - Pid ! {socket_closed, Reason}, - timer:sleep(Opts#opts.reconnect_wait); - {ok, ClusterId} -> - Pid ! {connected, ConnectionPid, ClusterId}, - receive - {socket_closed, ConnectionPid, Reason} -> - Pid ! {socket_closed, Reason} - end - end - - end, - connect(Pid, Opts). - + OwnerPid ! {connect_error, Reason}, + connect_loop(wait, OwnerPid, Opts) + end; +connect_loop(wait, OwnerPid, Opts) -> + timer:sleep(Opts#opts.reconnect_wait), + connect_loop(now, OwnerPid, Opts). -init(MainPid, ConnectionPid, Opts) -> +init_connection(State) -> + #st{opts = #opts{transport = Transport} = Opts, + socket = Socket} = State, Cmd1 = [[<<"CLUSTER">>, <<"MYID">>] || Opts#opts.use_cluster_id], Cmd2 = case {Opts#opts.resp_version, Opts#opts.auth} of {3, {Username, Password}} -> @@ -515,22 +910,30 @@ init(MainPid, ConnectionPid, Opts) -> Opts#opts.select_db > 0], case Cmd1 ++ Cmd2 ++ Cmd3 of [] -> - {ok, undefined}; - Commands -> - ered_connection:command_async(ConnectionPid, Commands, init_command_reply), - receive - {init_command_reply, Reply} -> - case [Reason || {error, Reason} <- Reply] of - [] when Opts#opts.use_cluster_id -> - {ok, hd(Reply)}; - [] -> - {ok, undefined}; - Errors -> - MainPid ! 
{init_error, Errors}, - timer:sleep(Opts#opts.reconnect_wait), - init(MainPid, ConnectionPid, Opts) - end; - Other -> - Other + self() ! {init_command_reply, {ok, []}}, + State; + Pipeline -> + %% Add to pending queue and send like any other commands. + ReplyFun = fun (Reply) -> + self() ! {init_command_reply, Reply} + end, + RespCommand = ered_command:convert_to(Pipeline), + Data = ered_command:get_data(RespCommand), + Command = #command{data = RespCommand, replyto = ReplyFun}, + Class = ered_command:get_response_class(RespCommand), + PendingReq = #pending_req{command = Command, response_class = Class, + sent_at = erlang:monotonic_time(millisecond)}, + Transport = State#st.opts#opts.transport, + case Transport:send(State#st.socket, Data) of + ok -> + State1 = State#st{pending = q_in(PendingReq, State#st.pending), + status = init}, + %% Send commands immediately or wait for init reply first? + %% process_commands(State1); + State1; + {error, _Reason} -> + %% Send FIN and handle replies in flight before reconnecting. + Transport:shutdown(Socket, read_write), + start_connect_loop(wait, State) end end. diff --git a/src/ered_connection.erl b/src/ered_connection.erl deleted file mode 100644 index 13ebba1..0000000 --- a/src/ered_connection.erl +++ /dev/null @@ -1,384 +0,0 @@ --module(ered_connection). - -%% Managing the socket, sending commands and receiving replies. -%% Batches messages from the process queue. One process handles -%% writing to the socket and one handles the reading and decoding. -%% After a command is sent in the sending process a message is sent to -%% the reading process informing it about how many replies to expect -%% and who expects the result. No reconnection handling, if there is -%% an error the processes will exit. - --export([connect/2, - connect/3, - connect_async/3, - command/2, command/3, - command_async/3]). - --export_type([opt/0, - result/0, - host/0]). 
- - -%%%=================================================================== -%%% Definitions -%%%=================================================================== --record(recv_st, {transport :: gen_tcp | ssl, - socket :: gen_tcp:socket() | ssl:sslsocket(), - push_cb :: push_cb(), - timeout :: non_neg_integer(), % milliseconds - waiting = [] :: [wait_info()], - waiting_since :: undefined | integer() % erlang:monotonic_time(millisecond) - }). - --type opt() :: - %% If commands are queued up in the process message queue this is the max - %% amount of messages that will be received and sent in one call - {batch_size, non_neg_integer()} | - %% Timeout passed to gen_tcp:connect/4 or ssl:connect/4. - {connect_timeout, timeout()} | - %% Options passed to gen_tcp:connect/4. - {tcp_options, [gen_tcp:connect_option()]} | - %% Timeout passed to gen_tcp:connect/4. DEPRECATED. - {tcp_connect_timeout, timeout()} | - %% Options passed to ssl:connect/4. If this config parameter is present, - %% TLS is used. - {tls_options, [ssl:tls_client_option()]} | - %% Timeout passed to ssl:connect/4. DEPRECATED. - {tls_connect_timeout, timeout()} | - %% Callback for push notifications - {push_cb, push_cb()} | - %% Timeout when waiting for a response from Redis. milliseconds - {response_timeout, non_neg_integer()}. - --type result() :: ered_parser:parse_result(). --type push_cb() :: fun((result()) -> any()). --type wait_info() :: - {ered_command:response_class() | [ered_command:response_class()], - pid(), - Ref :: any(), - Acc :: [result()]}. % Acc used to store partial pipeline results --type host() :: ered:host(). --type connect_result() :: {ok, connection_ref()} | {error, timeout | inet:posix()}. --type connection_ref() :: pid(). - -%% Commands like SUBSCRIBE and UNSUBSCRIBE don't return anything, so we use this -%% return value. --define(pubsub_reply, undefined). 
- -%%%=================================================================== -%%% API -%%%=================================================================== - -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec connect(host(), inet:port_number()) -> connect_result(). --spec connect(host(), inet:port_number(), [opt()]) -> connect_result(). -%% -%% Connect to Redis node. Start send and receive process. -%% When the connection is closed a socket_closed message will be sent. -%% to the calling process. -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -connect(Host, Port) -> - connect(Host, Port, []). - -connect(Host, Port, Opts) -> - Pid = connect_async(Host, Port, Opts), - receive - {connected, Pid} -> - {ok, Pid}; - {connect_error, Pid, Reason} -> - {error, Reason} - end. - -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec connect_async(host(), inet:port_number(), [opt()]) -> connection_ref(). -%% -%% Connect to Redis node. Start send and receive process. -%% The function will return before connect is completed and a connected or -%% connect_error message will be sent to the calling process. -%% When the connection is closed a socket_closed message will be sent. -%% to the calling process. -%% -%% Deprecated options: -%% tcp_connect_timeout - replaced by connect_timeout. -%% tls_connect_timeout - replaced by connect_timeout. 
-%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -connect_async(Addr, Port, Opts) -> - [error({badarg, BadOpt}) - || BadOpt <- proplists:get_keys(Opts) -- [batch_size, tcp_options, tls_options, push_cb, response_timeout, - tcp_connect_timeout, tls_connect_timeout, connect_timeout]], - BatchSize = proplists:get_value(batch_size, Opts, 16), - ResponseTimeout = proplists:get_value(response_timeout, Opts, 10000), - PushCb = proplists:get_value(push_cb, Opts, fun(_) -> ok end), - TcpOptions = proplists:get_value(tcp_options, Opts, []), - TlsOptions = proplists:get_value(tls_options, Opts, []), - TcpTimeout = proplists:get_value(tcp_connect_timeout, Opts, infinity), - TlsTimeout = proplists:get_value(tls_connect_timeout, Opts, infinity), - {Transport, Options, Timeout0} = case TlsOptions of - [] -> - {gen_tcp, TcpOptions, TcpTimeout}; - _ -> - {ssl, TlsOptions, TlsTimeout} - end, - Timeout = proplists:get_value(connect_timeout, Opts, Timeout0), - Master = self(), - spawn_link( - fun() -> - SendPid = self(), - case catch Transport:connect(Addr, Port, [{active, false}, binary] ++ Options, Timeout) of - {ok, Socket} -> - Master ! {connected, SendPid}, - Pid = spawn_link(fun() -> - ExitReason = recv_loop(Transport, Socket, PushCb, ResponseTimeout), - %% Inform sending process about exit - SendPid ! ExitReason - end), - ExitReason = send_loop(Transport, Socket, Pid, BatchSize), - Master ! {socket_closed, SendPid, ExitReason}; - {error, Reason} -> - Master ! {connect_error, SendPid, Reason}; - Other -> % {'EXIT',_} - Master ! {connect_error, SendPid, Other} - end - end). - -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec command(connection_ref(), ered_command:command()) -> result(). --spec command(connection_ref(), ered_command:command(), timeout()) -> result(). -%% -%% Send a command to the connected Redis node. 
The argument can be a -%% single command as a list of binaries, a pipeline of command as a -%% list of commands or a formatted redis_command. -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -command(Connection, Command) -> - command(Connection, Command, 10000). - -command(Connection, Command, Timeout) -> - link(Connection), - Ref = make_ref(), - Connection ! {send, self(), Ref, ered_command:convert_to(Command)}, - receive {Ref, Value} -> - unlink(Connection), - Value - after Timeout -> - unlink(Connection), - {error, timeout} - end. -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec command_async(connection_ref(), ered_command:command(), any()) -> ok. -%% -%% Send a command to the connected Redis node in asynchronous -%% fashion. The provided callback function will be called with the -%% reply. Note that the callback function will executing in the redis -%% client process and should not hang or perform any lengthy task. -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -command_async(Connection, Data, Ref) -> - Connection ! {send, self(), Ref, ered_command:convert_to(Data)}, - ok. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - -%% -%% Receive logic -%% - -recv_loop(Transport, Socket, PushCB, Timeout) -> - ParseInit = ered_parser:next(ered_parser:init()), - State = #recv_st{transport = Transport, socket = Socket, push_cb = PushCB, timeout = Timeout}, - try - recv_loop({ParseInit, State}) - catch - %% handle done, parse error, recv error - throw:Reason -> - {recv_exit, Reason} - end. - -recv_loop({ParseResult, State}) -> - Next = case ParseResult of - {need_more, BytesNeeded, ParserState} -> - read_socket(BytesNeeded, ParserState, State); - {done, Value, ParserState} -> - handle_result(Value, ParserState, State) - end, - recv_loop(Next). 
- -read_socket(BytesNeeded, ParserState, State) -> - State1 = update_waiting(0, State), - WaitTime = get_timeout(State1), - Transport = State1#recv_st.transport, - case Transport:recv(State1#recv_st.socket, BytesNeeded, WaitTime) of - {ok, Data} -> - {ered_parser:continue(Data, ParserState), State1}; - {error, timeout} when State1#recv_st.waiting == [] -> - %% no command pending, try again - read_socket(BytesNeeded, ParserState, State1); - {error, Reason} -> - throw(Reason) - end. - -handle_result({push, Value = [Type|_]}, ParserState, State) -> - %% Pub/sub in RESP3 is a bit quirky. The push is supposed to be out of bound - %% data not connected to any request but for subscribe and unsubscribe - %% requests, a successful command is signalled as one or more push messages. - PushCB = State#recv_st.push_cb, - PushCB(Value), - State1 = case is_subscribe_push(Type) of - true -> - handle_subscribe_push(Value, State); - false -> - State - end, - {ered_parser:next(ParserState), State1}; -handle_result(Value, ParserState, State) -> - {{RespClass, Pid, Ref, Acc}, State1} = pop_waiting(State), - %% Check how many replies expected (list = pipeline) - case RespClass of - Single when not is_list(Single) -> - Pid ! {Ref, Value}, - {ered_parser:next(ParserState), State1}; - [_] -> - %% Last one, send the reply - Pid ! {Ref, lists:reverse([Value | Acc])}, - {ered_parser:next(ParserState), State1}; - [_ | RespClasses] -> - %% More left, save the reply and keep going - State2 = push_waiting({RespClasses, Pid, Ref, [Value | Acc]}, State1), - {ered_parser:next(ParserState), State2} - end. - -is_subscribe_push(<<"subscribe">>) -> - true; -is_subscribe_push(<>) when X >= $a, X =< $z -> - true; -is_subscribe_push(<<"unsubscribe">>) -> - true; -is_subscribe_push(<>) when X >= $a, X =< $z -> - true; -is_subscribe_push(_) -> - false. 
- -handle_subscribe_push(PushMessage, State) -> - case try_pop_waiting(State) of - {PoppedWaiting, State1} -> - handle_subscribed_popped_waiting(PushMessage, PoppedWaiting, State1); - none -> - %% No commands pending. - State - end. - -handle_subscribed_popped_waiting(Push, Waiting = {ExpectClass, Pid, Ref, Acc}, State) -> - case {ExpectClass, hd(Push)} of - {{Type, N}, Type} % simple command - when N =:= 0; % unsubscribing from all channels - N =:= 1 -> % or subscribed to all channels - Pid ! {Ref, ?pubsub_reply}, - State; - {{Type, N}, Type} % simple command - when N > 1 -> % not yet subscribed all channels - push_waiting({{Type, N - 1}, Pid, Ref, Acc}, State); - {[{Type, N}], Type} % last command in pipeline - when N =:= 0; % unsubscribing from all channels - N =:= 1 -> % or subscribed to all channels - Pid ! {Ref, lists:reverse([?pubsub_reply | Acc])}, - State; - {[{Type, N} | Classes], Type} % pipeline, not the last command - when N =:= 0; % unsubscribing from all channels - N =:= 1 -> % or subscribed to all channels - push_waiting({Classes, Pid, Ref, [?pubsub_reply | Acc]}, State); - {[{Type, N} | Classes], Type} % pipeline - when N > 1 -> % not yet subscribed all channels - push_waiting({[{Type, N - 1} | Classes], Pid, Ref, Acc}, State); - _Otherwise -> - %% Not waiting for this particular push message. - push_waiting(Waiting, State) - end. - -get_timeout(State) -> - case State#recv_st.waiting_since of - undefined -> - State#recv_st.timeout; - Since -> - case State#recv_st.timeout - (erlang:monotonic_time(millisecond) - Since) of - T when T < 0 -> 0; - T -> T - end - end. - -pop_waiting(State) -> - State1 = update_waiting(infinity, State), - [WaitInfo | Rest] = State1#recv_st.waiting, - {WaitInfo, State1#recv_st{waiting = Rest}}. - -try_pop_waiting(State) -> - State1 = update_waiting(0, State), - case State1#recv_st.waiting of - [WaitInfo | Rest] -> - {WaitInfo, State1#recv_st{waiting = Rest}}; - [] -> - none - end. 
- -push_waiting(WaitInfo,State) -> - State#recv_st{waiting = [WaitInfo | State#recv_st.waiting]}. - -update_waiting(Timeout, State) when State#recv_st.waiting == [] -> - case receive Msg -> Msg after Timeout -> timeout end of - {requests, Req, Time} -> - State#recv_st{waiting = Req, waiting_since = Time}; - timeout -> - State#recv_st{waiting_since = undefined}; - close_down -> - throw(done) - end; -update_waiting(_Timeout, State) -> - State. - -%% -%% Send logic -%% - -send_loop(Transport, Socket, RecvPid, BatchSize) -> - case receive_data(BatchSize) of - {recv_exit, Reason} -> - {recv_exit, Reason}; - {data, {Refs, Data}} -> - Time = erlang:monotonic_time(millisecond), - case Transport:send(Socket, Data) of - ok -> - %% send to recv proc to fetch the response - RecvPid ! {requests, Refs, Time}, - send_loop(Transport, Socket, RecvPid, BatchSize); - {error, Reason} -> - %% Give recv_loop time to finish processing - %% This will shut down recv_loop if it is waiting on socket - Transport:shutdown(Socket, read_write), - %% This will shut down recv_loop if it is waiting for a reference - RecvPid ! close_down, - %% Ok, recv done, time to die - receive {recv_exit, _Reason} -> ok end, - {send_exit, Reason} - end - end. - -receive_data(N) -> - receive_data(N, infinity, []). - -receive_data(0, _Time, Acc) -> - {data, lists:unzip(lists:reverse(Acc))}; -receive_data(N, Time, Acc) -> - receive - {recv_exit, Reason} -> - {recv_exit, Reason}; - {send, Pid, Ref, Commands} -> - Data = ered_command:get_data(Commands), - Class = ered_command:get_response_class(Commands), - RefInfo = {Class, Pid, Ref, []}, - Acc1 = [{RefInfo, Data} | Acc], - receive_data(N - 1, 0, Acc1); - _Ignore -> - %% Mitigate OTP TLS 1.3 bug #10273 leaking a message {Ref, ok}. - receive_data(N, 0, Acc) - after Time -> - receive_data(0, 0, Acc) - end. 
diff --git a/src/ered_parser.erl b/src/ered_parser.erl index 629a5b5..11146a3 100644 --- a/src/ered_parser.erl +++ b/src/ered_parser.erl @@ -7,7 +7,8 @@ continue/2]). -export_type([parse_return/0, - parse_result/0 + parse_result/0, + state/0 ]). %%%=================================================================== @@ -25,10 +26,12 @@ -type parse_result() :: binary() | {error, binary()} | integer() | undefined | [parse_result()] | inf | neg_inf | nan | float() | true | false | #{parse_result() => parse_result()} | sets:set(parse_result()) | - {attribute, parse_result(), parse_result()} | {push | parse_result()}. + {attribute, parse_result(), parse_result()} | {push, parse_result()}. - --type parse_return() :: {done, parse_result(), #parser_state{}} | {need_more, bytes_needed(), #parser_state{}}. +-opaque state() :: #parser_state{}. +-type parse_return() :: {done, parse_result(), state()} | + {need_more, bytes_needed(), state()} | + {parse_error, any()}. -if(?OTP_RELEASE >= 24). -define(sets_new, sets:new([{version, 2}])). @@ -41,7 +44,7 @@ %%%=================================================================== %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec init() -> #parser_state{}. +-spec init() -> state(). %% %% Init empty parser continuation %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -49,7 +52,7 @@ init() -> #parser_state{}. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec next(#parser_state{}) -> parse_return(). +-spec next(state()) -> parse_return(). %% %% Get next result or continuation. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -57,7 +60,7 @@ next(State) -> parse(State). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec continue(binary(), #parser_state{}) -> parse_return(). +-spec continue(binary(), state()) -> parse_return(). %% %% Feed more data to the parser. Get next result or continuation. 
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -70,16 +73,21 @@ continue(NewData, State) -> %%%=================================================================== parse(State=#parser_state{data=Data, next=Fun, bytes_needed=Bytes}) -> - case token(Data, Bytes) of - {need_more, LackingBytes} -> - {need_more, LackingBytes, State}; - {Token, Rest} -> - case Fun(Token) of - {done, Result} -> - {done, Result, #parser_state{data=Rest}}; - {cont, NextFun, NextBytes} -> - parse(#parser_state{data=Rest, next=NextFun, bytes_needed=NextBytes}) - end + try + case token(Data, Bytes) of + {need_more, LackingBytes} -> + {need_more, LackingBytes, State}; + {Token, Rest} -> + case Fun(Token) of + {done, Result} -> + {done, Result, #parser_state{data=Rest}}; + {cont, NextFun, NextBytes} -> + parse(#parser_state{data=Rest, next=NextFun, bytes_needed=NextBytes}) + end + end + catch + {parse_error, _Reason} = ParseError -> + ParseError end. token(Data, 0) -> diff --git a/test/ered_client_tests.erl b/test/ered_client_tests.erl index 51ceab6..6ff824b 100644 --- a/test/ered_client_tests.erl +++ b/test/ered_client_tests.erl @@ -11,11 +11,14 @@ run_test_() -> {spawn, fun server_close_socket_t/0}, {spawn, fun bad_request_t/0}, {spawn, fun server_buffer_full_t/0}, + {spawn, fun low_high_watermark_t/0}, {spawn, fun bad_option_t/0}, {spawn, fun bad_connection_option_t/0}, {spawn, fun server_buffer_full_reconnect_t/0}, {spawn, fun server_buffer_full_node_goes_down_t/0}, - {spawn, fun send_timeout_t/0}, + {spawn, fun response_timeout_t/0}, + {spawn, fun send_backoff_tcp_t/0}, + {spawn, fun send_backoff_tls_t/0}, {spawn, fun fail_hello_t/0}, {spawn, fun hello_with_auth_t/0}, {spawn, fun hello_with_auth_fail_t/0}, @@ -40,7 +43,8 @@ request_t() -> fail_connect_t() -> {ok,Pid} = ered_client:start_link("127.0.0.1", 0, [{info_pid, self()}]), - {connect_error,econnrefused} = expect_connection_down(Pid), + {connect_error, Reason} = expect_connection_down(Pid), + true = Reason 
=:= econnrefused orelse Reason =:= eaddrnotavail, %% make sure there are no more connection down messages timeout = receive M -> M after 500 -> timeout end. @@ -66,7 +70,7 @@ fail_parse_t() -> Pid ! ered_client:command(Client, [<<"ping">>]) end), expect_connection_up(Client), - Reason = {recv_exit, {parse_error,{invalid_data,<<"&pong">>}}}, + Reason = {parse_error, {invalid_data, <<"&pong">>}}, receive #{msg_type := socket_closed, reason := Reason} -> ok end, expect_connection_up(Client), {ok, <<"pong">>} = get_msg(). @@ -75,18 +79,22 @@ fail_parse_t() -> server_close_socket_t() -> {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), {ok, Port} = inet:port(ListenSock), - spawn_link(fun() -> - {ok, Sock} = gen_tcp:accept(ListenSock), - gen_tcp:close(Sock), - - %% resend from client - {ok, _Sock2} = gen_tcp:accept(ListenSock), - receive ok -> ok end - end), + ServerPid = + spawn_link(fun() -> + {ok, Sock} = gen_tcp:accept(ListenSock), + receive continue -> ok end, + gen_tcp:close(Sock), + + %% resend from client + {ok, _Sock2} = gen_tcp:accept(ListenSock), + receive done -> ok end + end), Client = start_client(Port), expect_connection_up(Client), - receive #{msg_type := socket_closed, reason := {recv_exit, closed}} -> ok end, - expect_connection_up(Client). + ServerPid ! continue, + receive #{msg_type := socket_closed, reason := tcp_closed} -> ok end, + expect_connection_up(Client), + ServerPid ! done. %% Suppress warning from command 'bad_request' @@ -113,7 +121,7 @@ server_buffer_full_t() -> Expected = iolist_to_binary(lists:duplicate(5, Ping)), {ok, Expected} = gen_tcp:recv(Sock, size(Expected)), %% should be nothing more since only 5 pending - {error, timeout} = gen_tcp:recv(Sock, 0, 0), + ?assertEqual({error, timeout}, gen_tcp:recv(Sock, 0, 0)), timer:sleep(500), @@ -133,11 +141,61 @@ server_buffer_full_t() -> Pid = self(), [ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! 
{N, Reply} end) || N <- lists:seq(1,11)], receive #{msg_type := queue_full} -> ok end, - {6, {error, queue_overflow}} = get_msg(), + ?assertMatch({6, {error, queue_overflow}}, get_msg()), receive #{msg_type := queue_ok} -> ok end, - [{N, {ok, <<"pong">>}} = get_msg()|| N <- [1,2,3,4,5,7,8,9,10,11]], + [?assertMatch({N, {ok, <<"pong">>}}, get_msg()) || N <- [1,2,3,4,5,7,8,9,10,11]], no_more_msgs(). +low_high_watermark_t() -> + {ok, ListenSock} = gen_tcp:listen(0, [binary, {active, false}]), + {ok, Port} = inet:port(ListenSock), + ServerPid = spawn_link(fun() -> + {ok, Sock} = gen_tcp:accept(ListenSock), + Ping = <<"*1\r\n$4\r\nping\r\n">>, + FivePing = iolist_to_binary(lists:duplicate(5, Ping)), + {ok, FivePing} = gen_tcp:recv(Sock, size(FivePing)), + + %% should be nothing more since only 5 pending + ?assertEqual({error, timeout}, gen_tcp:recv(Sock, 0, 0)), + + gen_tcp:send(Sock, lists:duplicate(4, <<"+pong\r\n">>)), + receive send_one_more_pong -> ok end, + gen_tcp:send(Sock, lists:duplicate(1, <<"+pong\r\n">>)), + + {ok, FivePing} = gen_tcp:recv(Sock, 0), + gen_tcp:send(Sock, lists:duplicate(5, <<"+pong\r\n">>)), + receive ok -> ok end + end), + Client = start_client(Port, [{connection_opts, [{batch_size,5}]}, {max_waiting, 10}, {max_pending, 5}, {queue_ok_level,1}]), + expect_connection_up(Client), + + Pid = self(), + [ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {N, Reply} end) || N <- lists:seq(1,10)], + [?assertEqual({N, {ok, <<"pong">>}}, get_msg()) || N <- [1,2,3,4]], + + %% high water mark is hit, and we can not fill more until we reach low water-mark. + ?assertMatch(#{pending := {1, _}, + waiting := {5, _}, + filling_batch := false}, + ered_client:state_to_map(sys:get_state(Client))), + + ServerPid ! send_one_more_pong, + [?assertEqual({N, {ok, <<"pong">>}}, get_msg()) || N <- [5]], + + %% low water mark reached, pending should now be filled. 
+ ?assertMatch(#{pending := {5, _}, + waiting := {0, _}, + filling_batch := false}, + ered_client:state_to_map(sys:get_state(Client))), + + [?assertEqual({N, {ok, <<"pong">>}}, get_msg()) || N <- [6,7,8,9,10]], + + ?assertMatch(#{pending := {0, _}, + waiting := {0, _}, + filling_batch := true}, + ered_client:state_to_map(sys:get_state(Client))), + + no_more_msgs(). server_buffer_full_reconnect_t() -> @@ -145,17 +203,16 @@ server_buffer_full_reconnect_t() -> {ok, Port} = inet:port(ListenSock), spawn_link(fun() -> {ok, Sock} = gen_tcp:accept(ListenSock), - %% expect 5 ping Ping = <<"*1\r\n$4\r\nping\r\n">>, - Expected = iolist_to_binary(lists:duplicate(5, Ping)), - {ok, Expected} = gen_tcp:recv(Sock, size(Expected)), + FivePing = iolist_to_binary(lists:duplicate(5, Ping)), + {ok, FivePing} = gen_tcp:recv(Sock, size(FivePing)), %% should be nothing more since only 5 pending {error, timeout} = gen_tcp:recv(Sock, 0, 0), gen_tcp:close(Sock), {ok, Sock2} = gen_tcp:accept(ListenSock), - {ok, Expected} = gen_tcp:recv(Sock2, size(Expected)), + {ok, FivePing} = gen_tcp:recv(Sock2, size(FivePing)), gen_tcp:send(Sock2, lists:duplicate(5, <<"+pong\r\n">>)), %% should be nothing more since only 5 pending @@ -172,7 +229,7 @@ server_buffer_full_reconnect_t() -> receive #{msg_type := queue_full} -> ok end, %% 1 message over the limit, first one in queue gets kicked out {6, {error, queue_overflow}} = get_msg(), - receive #{msg_type := socket_closed, reason := {recv_exit, closed}} -> ok end, + receive #{msg_type := socket_closed, reason := tcp_closed} -> ok end, %% when connection goes down the pending messages will be put in the queue and the queue %% will overflow kicking out the oldest first [{N, {error, queue_overflow}} = get_msg() || N <- [1,2,3,4,5]], @@ -187,10 +244,9 @@ server_buffer_full_node_goes_down_t() -> {ok, Port} = inet:port(ListenSock), spawn_link(fun() -> {ok, Sock} = gen_tcp:accept(ListenSock), - %% expect 5 ping Ping = <<"*1\r\n$4\r\nping\r\n">>, - Expected = 
iolist_to_binary(lists:duplicate(5, Ping)), - {ok, Expected} = gen_tcp:recv(Sock, size(Expected)), + FivePing = iolist_to_binary(lists:duplicate(5, Ping)), + {ok, FivePing} = gen_tcp:recv(Sock, size(FivePing)), %% should be nothing more since only 5 pending {error, timeout} = gen_tcp:recv(Sock, 0, 0), gen_tcp:close(ListenSock) @@ -201,9 +257,9 @@ server_buffer_full_node_goes_down_t() -> Pid = self(), [ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {N, Reply} end) || N <- lists:seq(1,11)], receive #{msg_type := queue_full} -> ok end, - {6, {error, queue_overflow}} = get_msg(), - receive #{msg_type := socket_closed, reason := {recv_exit, closed}} -> ok end, - [{N, {error, queue_overflow}} = get_msg() || N <- [1,2,3,4,5]], + ?assertEqual({6, {error, queue_overflow}}, get_msg()), + receive #{msg_type := socket_closed, reason := tcp_closed} -> ok end, + [?assertEqual({N, {error, queue_overflow}}, get_msg()) || N <- [1,2,3,4,5]], receive #{msg_type := queue_ok} -> ok end, receive #{msg_type := connect_error, reason := econnrefused} -> ok end, receive #{msg_type := node_down_timeout} -> ok end, @@ -224,7 +280,7 @@ bad_connection_option_t() -> [{info_pid, self()}, {connection_opts, [bad_option]}])). -send_timeout_t() -> +response_timeout_t() -> {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), {ok, Port} = inet:port(ListenSock), spawn_link(fun() -> @@ -242,11 +298,73 @@ send_timeout_t() -> Pid = self(), ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {reply, Reply} end), %% this should come after max 1000ms - receive #{msg_type := socket_closed, reason := {recv_exit, timeout}} -> ok after 2000 -> timeout_error() end, + receive #{msg_type := socket_closed, reason := timeout} -> ok after 2000 -> timeout_error() end, expect_connection_up(Client), {reply, {ok, <<"pong">>}} = get_msg(), no_more_msgs(). +send_backoff_tcp_t() -> + send_backoff_t(gen_tcp). 
+ +send_backoff_tls_t() -> + %% ssl with {send_timeout, 0} closes the socket on timeout in + %% OTP 28.0 .. 28.4 (ssl 11.2.0 .. 11.5.2). It was intentionally + %% broken and fixed in ssl-11.5.3 (OTP-20018). + %% It works in OTP 27 and earlier. + application:load(ssl), + {ok, SslVsn} = application:get_key(ssl, vsn), + case vsn_ge(SslVsn, "11.2.0") andalso not vsn_ge(SslVsn, "11.5.3") of + true -> ok; %% skip + false -> send_backoff_t(ssl) + end. + +send_backoff_t(Transport) -> + %% Send a large command N times. + N = 10, + + %% Construct a large binary. + Size = 1000 * 1000, + LargeBinary = binary:copy(<<"a">>, Size), + LargeCommand = [<<"SET">>, <<"foo">>, LargeBinary], + Resp = ered_command:get_data( + ered_command:convert_to([<<"SET">>, <<"foo">>, LargeBinary])), + RespLen = byte_size(Resp), + + %% Start server + {ListenSock, Port, ConnOpts} = listen(Transport), + ServerPid = + spawn_link(fun() -> + Sock = accept(Transport, ListenSock), + receive continue -> ok end, + [begin + {ok, <<"*3\r\n$3\r\nSET\r", _/binary>>} = Transport:recv(Sock, RespLen), + ok = Transport:send(Sock, <<"+OK\r\n">>) + end || _ <- lists:seq(1, N)], + {ok, <<"*1\r\n$4\r\nping\r\n">>} = Transport:recv(Sock, 0), + ok = Transport:send(Sock, <<"+PONG\r\n">>), + receive ok -> ok end + end), + Client = start_client(Port, [{connection_opts, ConnOpts ++ [{batch_size, 1}]}]), + expect_connection_up(Client), + Pid = self(), + %% Send the large command N times. In some cases, gen_tcp:send returns + %% {error, timeout} after a few times, even if the data is really large. + [ered_client:command_async(Client, LargeCommand, fun(Reply) -> Pid ! {reply, Reply} end) + || _ <- lists:seq(1, N)], + + #{backoff_send := BackoffSend, + pending := {NumPending, _}, + waiting := {NumWaiting, _}} = ered_client:state_to_map(sys:get_state(Client)), + ?assert(BackoffSend), + ?assert(NumPending > 0), + ?assert(NumWaiting > 0), + ?assertEqual(N, NumWaiting + NumPending), + ServerPid ! 
continue, + [{reply, {ok, <<"OK">>}} = get_msg() || _ <- lists:seq(1, N)], + ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {reply, Reply} end), + {reply, {ok, <<"PONG">>}} = get_msg(), + no_more_msgs(). + fail_hello_t() -> {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), {ok, Port} = inet:port(ListenSock), @@ -256,15 +374,12 @@ fail_hello_t() -> {ok, <<"*2\r\n$5\r\nHELLO\r\n$1\r\n3\r\n">>} = gen_tcp:recv(Sock, 0), ok = gen_tcp:send(Sock, <<"-NOPROTO unsupported protocol version\r\n">>), - %% test resend - {ok, <<"*2\r\n$5\r\nHELLO\r\n$1\r\n3\r\n">>} = gen_tcp:recv(Sock, 0), - ok = gen_tcp:send(Sock, <<"-NOPROTO unsupported protocol version\r\n">>), - - Pid ! done + Pid ! done, + {error, closed} = gen_tcp:recv(Sock, 0) end), {ok,Client} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()}]), - {init_error, [<<"NOPROTO unsupported protocol version">>]} = expect_connection_down(Client), receive done -> ok end, + {init_error, [<<"NOPROTO unsupported protocol version">>]} = expect_connection_down(Client), no_more_msgs(). hello_with_auth_t() -> @@ -294,11 +409,13 @@ hello_with_auth_t() -> "$6\r\nmaster\r\n" "$7\r\nmodules\r\n" "*0\r\n">>), - Pid ! done + Pid ! done, + receive ok -> ok end end), - {ok, _Client} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()}, - {auth, {<<"ali">>, <<"sesame">>}}]), + {ok, Client} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()}, + {auth, {<<"ali">>, <<"sesame">>}}]), receive done -> ok end, + expect_connection_up(Client), no_more_msgs(). hello_with_auth_fail_t() -> @@ -333,12 +450,14 @@ auth_t() -> "$6\r\nsesame\r\n">>} = gen_tcp:recv(Sock, 0), ok = gen_tcp:send(Sock, <<"+OK\r\n">>), - Pid ! done + Pid ! 
done, + receive ok -> ok end end), - {ok, _Client} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()}, - {resp_version, 2}, - {auth, {<<"ali">>, <<"sesame">>}}]), + {ok, Client} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()}, + {resp_version, 2}, + {auth, {<<"ali">>, <<"sesame">>}}]), receive done -> ok end, + expect_connection_up(Client), no_more_msgs(). auth_fail_t() -> @@ -414,3 +533,30 @@ start_client(Port, Opt) -> timeout_error() -> error({timeout, erlang:process_info(self(), messages)}). +listen(gen_tcp) -> + {ok, LSock} = gen_tcp:listen(0, [binary, {active, false}]), + {ok, Port} = inet:port(LSock), + {LSock, Port, [{tcp_options, []}]}; +listen(ssl) -> + ssl:start(), + CertFile = "/tmp/ered_test_cert.pem", + KeyFile = "/tmp/ered_test_key.pem", + os:cmd("openssl req -x509 -newkey rsa:2048 -keyout " ++ KeyFile ++ + " -out " ++ CertFile ++ " -days 1 -nodes -subj '/CN=localhost' 2>/dev/null"), + {ok, LSock} = ssl:listen(0, [binary, {active, false}, + {certfile, CertFile}, {keyfile, KeyFile}]), + {ok, {_, Port}} = ssl:sockname(LSock), + {LSock, Port, [{tls_options, [{verify, verify_none}]}]}. + +accept(gen_tcp, LSock) -> + {ok, Sock} = gen_tcp:accept(LSock), + Sock; +accept(ssl, LSock) -> + {ok, TSock} = ssl:transport_accept(LSock), + {ok, Sock} = ssl:handshake(TSock), + Sock. + + +vsn_ge(Vsn1, Vsn2) -> + lists:map(fun list_to_integer/1, string:tokens(Vsn1, ".")) >= + lists:map(fun list_to_integer/1, string:tokens(Vsn2, ".")). 
diff --git a/test/ered_cluster_SUITE.erl b/test/ered_cluster_SUITE.erl index 66666a7..0c4f966 100644 --- a/test/ered_cluster_SUITE.erl +++ b/test/ered_cluster_SUITE.erl @@ -272,7 +272,7 @@ t_hard_failover(_) -> ct:pal("~p\n", [ered_cluster:command_all(R, [<<"CLUSTER">>, <<"SLOTS">>])]), ct:pal(os:cmd("docker stop " ++ Pod)), - ?MSG(#{msg_type := socket_closed, addr := {"127.0.0.1", Port}, reason := {recv_exit, closed}}), + ?MSG(#{msg_type := socket_closed, addr := {"127.0.0.1", Port}, reason := tcp_closed}), ?MSG(#{msg_type := connect_error, addr := {"127.0.0.1", Port}, reason := econnrefused}), ?MSG(#{msg_type := node_down_timeout, addr := {"127.0.0.1", Port}}, 2500), @@ -363,7 +363,7 @@ t_manual_failover_then_old_master_down(_) -> ?MSG(#{addr := {"127.0.0.1", Port}, master := true, msg_type := socket_closed, - reason := {recv_exit, closed}}), + reason := tcp_closed}), %% Ered prefers the replica of the disconnected node for slot map update, %% since it is likely to know about a failover first; it is the new master. @@ -420,7 +420,7 @@ t_blackhole(_) -> ered:command_async(ClientRef, [<<"PING">>], fun(Reply) -> TestPid ! {ping_reply, Reply} end), - ?MSG(#{msg_type := socket_closed, reason := {recv_exit, timeout}, master := true}, + ?MSG(#{msg_type := socket_closed, reason := timeout, master := true}, ResponseTimeout + 1000), ?MSG({ping_reply, {error, _Reason}}, % node_down or node_deactivated NodeDownTimeout + 1000), @@ -432,6 +432,8 @@ t_blackhole(_) -> ?OPTIONAL_MSG(#{msg_type := cluster_ok}), ?MSG(#{msg_type := client_stopped, reason := shutdown, master := false}, CloseWait + 1000), + ?OPTIONAL_MSG(#{msg_type := node_down_timeout, addr := {"127.0.0.1", Port}}), + no_more_msgs(), ct:pal("Unpausing container: " ++ os:cmd("docker unpause " ++ Pod)), timer:sleep(500), @@ -477,7 +479,7 @@ t_blackhole_all_nodes(_) -> fun(Reply) -> TestPid ! 
{ping_reply, Reply} end) end, AddrToClient), - [?MSG(#{msg_type := socket_closed, reason := {recv_exit, timeout}, addr := {"127.0.0.1", Port}}, + [?MSG(#{msg_type := socket_closed, reason := timeout, addr := {"127.0.0.1", Port}}, ResponseTimeout + 1000) || Port <- ?PORTS], ?MSG({ping_reply, {error, _Reason1}}, NodeDownTimeout + 1000), ?MSG({ping_reply, {error, _Reason2}}, NodeDownTimeout + 1000), @@ -526,11 +528,14 @@ t_init_timeout(_) -> ct:pal("~p\n", [os:cmd("redis-cli -p 30001 CLIENT PAUSE 10000")]), {ok, _P} = ered_cluster:connect([{localhost, 30001}], [{info_pid, [self()]}] ++ Opts), - ?MSG(#{msg_type := socket_closed, reason := {recv_exit, timeout}}, 3500), + ?MSG(#{msg_type := socket_closed, reason := timeout}, 3500), ?MSG(#{msg_type := node_down_timeout, addr := {localhost, 30001}}, 2500), %% Does not work on Redis before 6.2.0. ct:pal("~p\n", [os:cmd("redis-cli -p 30001 CLIENT UNPAUSE")]), + %% Maybe we were waiting for init commands when the node down timeout fired. + ?OPTIONAL_MSG(#{msg_type := init_error, reason := node_down}), + ?MSG(#{msg_type := connected, addr := {localhost, 30001}}), ?MSG(#{msg_type := slot_map_updated}), @@ -760,8 +765,8 @@ t_kill_client(_) -> ct:pal("~p\n",[os:cmd("redis-cli -p " ++ integer_to_list(Port) ++ " CLIENT KILL TYPE NORMAL")]), ?MSG(#{msg_type := socket_closed, addr := {_, Port}}), - %% connection reestablished - ?MSG(#{msg_type := connected, addr := {_, Port}}), + %% Waits until 1000ms after the first connect before reconnecting. + ?MSG(#{msg_type := connected, addr := {_, Port}}, 2000), no_more_msgs(). 
t_new_cluster_master(_) -> diff --git a/test/ered_cluster_tls_SUITE.erl b/test/ered_cluster_tls_SUITE.erl index 475aa5e..e0ab5e1 100644 --- a/test/ered_cluster_tls_SUITE.erl +++ b/test/ered_cluster_tls_SUITE.erl @@ -190,8 +190,10 @@ t_expired_cert_tls_1_3(_) -> [{info_pid, [self()]}, {client_opts, ClientOpts}]), ?MSG(#{msg_type := socket_closed, addr := {"127.0.0.1", 31001}, - reason := {recv_exit, - {tls_alert, - {certificate_expired, _}}}}), + reason := {tls_alert, {certificate_expired, _}}}), ?MSG(#{msg_type := node_down_timeout, addr := {"127.0.0.1", 31001}}, 2500), + timer:sleep(10), % Wait for the optional messages. + ?OPTIONAL_MSG(#{msg_type := init_error, reason := node_down}), + ?OPTIONAL_MSG(#{msg_type := socket_closed, + reason := {tls_alert, {certificate_expired, _}}}), no_more_msgs(). diff --git a/test/ered_connection_tests.erl b/test/ered_connection_tests.erl deleted file mode 100644 index 1505c95..0000000 --- a/test/ered_connection_tests.erl +++ /dev/null @@ -1,86 +0,0 @@ --module(ered_connection_tests). - --include_lib("eunit/include/eunit.hrl"). 
- -split_data_test() -> - Data = << <<"A">> || _ <- lists:seq(1, 3000) >>, - {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), - {ok, Port} = inet:port(ListenSock), - spawn_link(fun() -> - {ok, Sock} = gen_tcp:accept(ListenSock), - {ok, <<"*2\r\n$5\r\nhello\r\n$1\r\n3\r\n">>} = gen_tcp:recv(Sock, 0), - HelloReply = <<"%7\r\n", - "$6\r\nserver\r\n", "$6\r\nvalkey\r\n", - "$7\r\nversion\r\n", "$5\r\n9.0.0\r\n", - "$5\r\nproto\r\n", ":3\r\n" - "$2\r\nid\r\n", ":2\r\n", - "$4\r\nmode\r\n", "$10\r\nstandalone\r\n" - "$4\r\nrole\r\n", "$6\r\nmaster\r\n" - "$7\r\nmodules\r\n", "*0\r\n">>, - ok = gen_tcp:send(Sock, HelloReply), - SetCommand = <<"*3\r\n$3\r\nset\r\n$4\r\nkey1\r\n$3000\r\n", Data/binary, "\r\n">>, - {ok, SetCommand} = gen_tcp:recv(Sock, size(SetCommand)), - ok = gen_tcp:send(Sock, <<"+OK\r\n">>), - {ok, <<"*2\r\n$3\r\nget\r\n$4\r\nkey1\r\n">>} = gen_tcp:recv(Sock, 0), - ok = gen_tcp:send(Sock, <<"$3000\r\n", Data/binary, "\r\n">>), - {error, closed} = gen_tcp:recv(Sock, 0), - %% ok = gen_tcp:shutdown(Sock, write), - ok - end), - {ok, Conn1} = ered_connection:connect("127.0.0.1", Port), - ered_connection:command(Conn1, [<<"hello">>, <<"3">>]), - <<"OK">> = ered_connection:command(Conn1, [<<"set">>, <<"key1">>, Data]), - Data = ered_connection:command(Conn1, [<<"get">>, <<"key1">>]). - -%% Suppress warnings due to expected failures from MalformedCommand. --dialyzer({[no_fail_call, no_return], trailing_reply_test/0}). -trailing_reply_test() -> - Pid = self(), - %% 277124 byte nested array, it takes a non-trivial time to parse - BigNastyData = iolist_to_binary(nested_list(8)), - ?debugFmt("~w", [size(BigNastyData)]), - - spawn_link(fun() -> - {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), - {ok, Port} = inet:port(ListenSock), - Pid ! 
{port, Port}, - {ok, Sock} = gen_tcp:accept(ListenSock), - {ok, <<"*1\r\n$4\r\nping\r\n">>} = gen_tcp:recv(Sock, 0), - ok = gen_tcp:send(Sock, BigNastyData), - ok = gen_tcp:shutdown(Sock, write), - Pid ! sent_big_nasty, - receive ok -> ok end - end), - {port, Port} = receive_msg(), - %% increase receive buffer to fit the whole nasty data package - {ok, Conn1} = ered_connection:connect("127.0.0.1", Port, [{batch_size, 1}, - {tcp_options, [{recbuf, 524288}]}]), - ?debugFmt("~w", [Conn1]), - ered_connection:command_async(Conn1, [<<"ping">>], ping1), - receive sent_big_nasty -> ok end, - MalformedCommand = {redis_command, {pipeline, [normal]}, undefined}, - ered_connection:command_async(Conn1, MalformedCommand, no_ref), - - %% make sure the ping is received before the connection is shut down - - ?debugMsg("waiting for ping"), - - receive {ping1, _} -> ok after 2000 -> exit(waiting_for_ping) end, - ?debugMsg("got ping"), - {socket_closed, Conn1, {send_exit, einval}} = receive Msg -> Msg end, - ensure_empty(). - - -receive_msg() -> - receive Msg -> Msg end. - -%% This function is used from trailing_reply_test() --dialyzer({no_unused, ensure_empty/0}). -ensure_empty() -> - empty = receive Msg -> Msg after 0 -> empty end. - - -nested_list(1) -> - <<"+A\r\n">>; -nested_list(N) -> - ["*", integer_to_list(N), "\r\n", [nested_list(N-1) || _ <- lists:seq(1, N)]]. diff --git a/test/ered_parser_tests.erl b/test/ered_parser_tests.erl index 3b063b9..2dc318e 100644 --- a/test/ered_parser_tests.erl +++ b/test/ered_parser_tests.erl @@ -85,12 +85,11 @@ parse_fail_test_() -> decode_err(In, Expected) -> fun() -> - try - A = ered_parser:continue(In, ered_parser:init()), - exit({unexpected_success, A}) - catch - throw:{parse_error, Err} -> - ?assertEqual(Expected, Err) + case ered_parser:continue(In, ered_parser:init()) of + {parse_error, Err} -> + ?assertEqual(Expected, Err); + A -> + exit({unexpected_success, A}) end end.