diff --git a/.gitignore b/.gitignore index eafb405..4949a88 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,7 @@ log/ erlang.mk ekka.d _build/ +_anvl_build/ .rebar3/ rebar.lock TAGS diff --git a/anvl.erl b/anvl.erl index 772d770..daf16b5 100644 --- a/anvl.erl +++ b/anvl.erl @@ -21,7 +21,7 @@ conf() -> , #{id => optvar, repo => "https://github.com/emqx/optvar", ref => {tag,"1.0.5"}} , #{id => snabbkaffe, repo => "https://github.com/kafka4beam/snabbkaffe", ref => {tag,"1.0.10"}} , #{id => gproc, repo => "https://github.com/uwiger/gproc", ref => {tag,"1.1.0"}} - , #{id => familiar, repo => "https://github.com/ieQu1/familiar", ref => {tag,"0.1.1"}} + , #{id => familiar, repo => "https://github.com/ieQu1/familiar", ref => {tag,"0.1.2"}} ] , hex_pm => [ #{id => proper, version => "1.5.0"} diff --git a/doc/classy.texi b/doc/classy.texi index 5d3265c..47728f3 100644 --- a/doc/classy.texi +++ b/doc/classy.texi @@ -292,6 +292,13 @@ The document was typeset with @emph{Unit}: ms Default timeout for remote procedure calls. + + @defen{hook_timeout, @code{timeout()}, 30_000} + + @emph{Unit}: ms + + Maximum run time for any hook. + @section Peer @defen{sync_timeout, @code{non_neg_integer()}, 1000} @@ -306,6 +313,16 @@ The document was typeset with Minimum number of known peers necessary to advance @ref{t:classy:run_level/0,run level} from @code{single} to @code{cluster}. + @defen{to_cluster_sets, @code{[classy:node_set_name()]}, @code{[]}} + + List of node set names used to additionally restrict when the node is considered joined to a cluster. + Sets from the lists are combined using set intersection operation. + + @defen{quorum_sets, @code{[classy:node_set_name()]}, @code{[]}} + + List of node set names used to additionally restrict when the node is considered having a quorum. + Sets from the lists are combined using set intersection operation. + @defen{quorum, @code{pos_integer()}, 1} Minimum number of known peers necessary to advance @ref{t:classy:run_level/0,run level} @@ -328,7 +345,7 @@ The document was typeset with Note: to prevent network-isolated nodes from kicking their peers, quorum among the running nodes is required to perform the act. - @defen{cleanup_check_interval, @code{pos_integer()}, 30_000} + @defen{cleanup_check_interval, @code{pos_integer()}, 5_000} @emph{Unit}: ms. diff --git a/rebar.config b/rebar.config index 0640ad5..6567b0e 100644 --- a/rebar.config +++ b/rebar.config @@ -12,7 +12,7 @@ , {hackney, {git, "https://github.com/emqx/hackney.git", {tag, "1.18.1-1"}}} %% Testing: , {meck, "1.0.0"} - , {familiar, {git, "https://github.com/ieQu1/familiar.git", {tag, "0.1.1"}}} + , {familiar, {git, "https://github.com/ieQu1/familiar", {tag, "0.1.2"}}} ]}. {dialyzer, diff --git a/src/classy.erl b/src/classy.erl index 38800ec..2843a82 100644 --- a/src/classy.erl +++ b/src/classy.erl @@ -16,15 +16,21 @@ This MFA can contain calls to various @code{classy:on_...} functions. -export([ info/0 , info/1 , info/2 + , n_restarts/0 + , n_restarts/1 , node_of_site/2 , join_node/2 , kick_site/2 , kick_node/2 , sites/0 + , sites/1 , nodes/1 , quorum/1 , fault_tolerance/1 , at_lower_level/2 + , the_site/0 + , the_cluster/0 + , node_sets/0 ]). -export([ on_node_init/2 @@ -32,6 +38,10 @@ This MFA can contain calls to various @code{classy:on_...} functions. , on_create_site/2 , on_peer_connection_change/2 , on_membership_change/2 + , on_peer_liveness_change/2 + , on_peer_node_change/2 + , on_peer_restart/2 + , on_node_classify/2 , pre_join/2 , post_join/2 , pre_kick/2 @@ -53,7 +63,9 @@ This MFA can contain calls to various @code{classy:on_...} functions. , cluster_info/0 , run_level/0 - , membership_change_hook/0 + + , node_set_name/0 + , node_set/0 ]). -include("classy_internal.hrl"). @@ -98,8 +110,6 @@ Unique random persistent identifier of the site. , bad_nodes := #{node() => _} }. --type membership_change_hook() :: fun((cluster_id(), _Local :: site(), _Remote :: site(), _IsMember :: boolean()) -> _). - -doc """ Join intent is an arbitrary term passed to @ref{classy:pre_join/2} and @ref{classy:post_join/2} hooks. @code{pre_join} may match on intent to prevent join in certain cases, @@ -136,7 +146,33 @@ Site is kicked by the autoclean logic. -doc """ @xref{Run level} """. --type run_level() :: stopped | single | cluster | quorum. +-type run_level() :: ?stopped | ?single | ?cluster | ?quorum. + +-doc """ +An arbitrary ID of a node set. + +Predefined sets are: +@table @code +@item all +names of all previously seen nodes that belong +(or belonged, if the site is currently down) +to the cluster members. +@item up + +@item down +@item connected +there's an Erlang distribution connection to the node hosting the site. +@item disconnected +there's no Erlang distribution connection to the node hosting the site, +but site is not considered down. +@end table +""". +-type node_set_name() :: all | up | down | connected | disconnected | term(). + +-doc """ +A set of nodes. +""". +-type node_set() :: ordsets:ordset(node()). %%================================================================================ %% API functions @@ -149,14 +185,8 @@ Provide general information about the local node. -spec info() -> info(). info() -> %% Note: this is an RPC target. - case classy_node:the_cluster() of - {ok, MaybeCluster} -> ok; - _ -> MaybeCluster = undefined - end, - case classy_node:the_site() of - {ok, MaybeSite} -> ok; - _ -> MaybeSite = undefined - end, + MaybeCluster = classy_node:maybe_cluster(), + MaybeSite = classy_node:maybe_site(), PeerInfo0 = classy_node:peer_info(), case maps:take(MaybeSite, PeerInfo0) of {#{last_update := MyLU}, PeerInfo} -> @@ -169,6 +199,7 @@ info() -> , site => MaybeSite , last_update => MyLU , peers => PeerInfo + , n_restarts => n_restarts() }, classy_hook:fold(?on_enrich_site_info, [], Acc). @@ -215,6 +246,27 @@ info(_Hops, Nodes) -> , bad_nodes => BadNodes }. +-doc """ +Return the total number of times the site has been restarted. +""". +-spec n_restarts() -> non_neg_integer() | undefined. +n_restarts() -> + case classy_liveness:n_restarts() of + {ok, N} -> N; + _ -> undefined + end. + +-doc """ +Get cached value of the number of restarts of a remote site. + +Note: for the local site, +please call @ref{classy:n_restarts/0}, +as values returned by this function may be out-of-date. +""". +-spec n_restarts(site()) -> {ok, non_neg_integer()} | undefined. +n_restarts(Site) -> + classy_node:n_restarts(Site). + -doc """ Locate a node that is currently hosting a site. @@ -281,7 +333,7 @@ Translate node name to a site ID and kick it via @ref{classy:kick_site/2}. """. -spec kick_node(node(), kick_intent()) -> ok | {error, _}. kick_node(Node, Intent) -> - case {classy_node:the_cluster(), classy_node:the_site()} of + case {the_cluster(), the_site()} of {{ok, Cluster}, {ok, Local}} -> case classy_membership:site_of_node(Cluster, Local) of #{Node := Site} -> @@ -299,8 +351,8 @@ List IDs of peer sites. -spec sites() -> [site()]. sites() -> maybe - {ok, Cluster} ?= classy_node:the_cluster(), - {ok, Local} ?= classy_node:the_site(), + {ok, Cluster} ?= the_cluster(), + {ok, Local} ?= the_site(), classy_membership:members(Cluster, Local) else _ -> @@ -308,28 +360,77 @@ sites() -> end. -doc """ -List all peer nodes. +Get contents of a site set. + +Note: argument is the same as for node sets. +""". +-spec sites(node_set_name()) -> [site()]. +sites(SetName) -> + case persistent_term:get(?pt_site_sets, #{}) of + #{SetName := Set} -> Set; + #{} -> [] + end. + +-doc """ +List peer nodes that belong to a node set. Important to note: this function returns node names of @emph{peer sites}. Random connected nodes, such as shells or classy sites that are not member of the current cluster, are excluded. """. --spec nodes(all | connected | disconnected) -> [node()]. -nodes(Query) -> - classy_node:nodes(Query). +-spec nodes(node_set_name()) -> [node()]. +nodes(Name) -> + case node_sets() of + #{Name := V} -> V; + #{} -> [] + end. + +-doc """ +Return a map of all node sets. +""". +-spec node_sets() -> #{node_set_name() => node_set()}. +node_sets() -> + persistent_term:get(?pt_node_sets, #{}). -doc """ -Lower the run level to the given value and run the specified function. +This function can be used to +lower the run level of the system to the given value +and run the specified function. This function can be used to implement migrations that require business applications to be stopped. + +Note: this function returns immediately after scheduling the action, +but before the function is executed. """. --spec at_lower_level(classy_node:run_level_atom(), fun(() -> Ret)) -> - {ok, Ret} | - {error | exit | throw, _Reason, _Stacktrace}. +-spec at_lower_level(run_level(), fun(() -> any())) -> ok | {error, _}. at_lower_level(RunLevel, Fun) -> - classy_node:at_lower_level(RunLevel, Fun). + classy_rl_changer:at_lower_level(RunLevel, Fun). + +-doc """ +Get ID of the local site. +""". +-spec the_site() -> {ok, site()} | undefined. +the_site() -> + case classy_node:maybe_site() of + Site when is_binary(Site) -> + {ok, Site}; + undefined -> + undefined + end. + +-doc """ +Get ID of the cluster. +""". +-spec the_cluster() -> {ok, cluster_id()} | undefined. +the_cluster() -> + case classy_node:maybe_cluster() of + Cluster when is_binary(Cluster) -> + {ok, Cluster}; + undefined -> + undefined + end. %%-------------------------------------------------------------------------------- %% Misc. @@ -372,7 +473,7 @@ fault_tolerance(N) -> -doc """ Register a hook that is executed when the node (not the site) starts. -It is called before @ref{classy_node:the_site/0} and @code{classy_node:the_cluster/0} +It is called before @ref{classy:the_site/0} and @code{classy:the_cluster/0} are initialized, and can be used to override the default cluster and site initialization logic. """. @@ -407,19 +508,81 @@ WARNING: status change to @code{false} is not indicative of the remote site bein This can happen during a network partition. """. -spec on_peer_connection_change(Fun, classy_hook:prio()) -> classy_hook:hook() - when Fun :: fun((cluster_id(), Local, Remote, node(), _IsConnected :: boolean()) -> _), - Local :: site(), + when Fun :: fun((Remote, node(), _IsConnected :: boolean()) -> _), Remote :: site(). on_peer_connection_change(Hook, Prio) -> classy_hook:insert(?on_peer_connection_status_change, Hook, Prio). -doc """ Register a hook that is executed when a site joins or leaves a cluster. + +@anchor {node_hook_execution} +Note: this hook can be executed multiple times if the local node is abruptly stopped while the hooks are running. +If the remote site re-joins the cluster while the local was down, +the hook may or may not run. """. --spec on_membership_change(membership_change_hook(), classy_hook:prio()) -> classy_hook:hook(). +-spec on_membership_change( + fun((cluster_id(), _Local :: site(), _Remote :: site(), _IsMember :: boolean()) -> _), + classy_hook:prio() + ) -> classy_hook:hook(). on_membership_change(Hook, Prio) -> classy_hook:insert(?on_membership_change, Hook, Prio). +-doc """ +Register a hook that is executed when a site changes status for up to down or vice versa. + +Note: this hook is different from @ref{classy:on_peer_connection_change/2}, +as care is taken to avoid firing it during a network partition. + +The decision to consider a peer down comes either from the peer itself when it shuts down gracefully +or from the quorum of other running peers. + +@xref {node_hook_execution}. +""". +-spec on_peer_liveness_change( + fun((_Remote :: site(), _IsAlive :: boolean()) -> _), + classy_hook:prio() + ) -> classy_hook:hook(). +on_peer_liveness_change(Hook, Prio) -> + classy_hook:insert(?on_peer_liveness_change, Hook, Prio). + +-doc """ +Register a hook that is executed when a peer site changes the Erlang node name. + +@xref {node_hook_execution}. +""". +-spec on_peer_node_change( + fun((_Remote :: site(), _OldNode :: node(), _NewNode :: node()) -> _), + classy_hook:prio() + ) -> classy_hook:hook(). +on_peer_node_change(Hook, Prio) -> + classy_hook:insert(?on_peer_node_change, Hook, Prio). + +-doc """ +Register a hook that is executed when a peer restarts. + +@xref {node_hook_execution}. +""". +-spec on_peer_restart( + fun((_Remote :: site(), _NRestarts :: pos_integer()) -> _), + classy_hook:prio() + ) -> classy_hook:hook(). +on_peer_restart(Hook, Prio) -> + classy_hook:insert(?on_peer_restart, Hook, Prio). + +-doc """ +Register a hook that can place a site's node into an arbitrary number of custom node sets, +based on @ref{t:classy:info/0}. + +@xref{classy:enrich_site_info/2}, @xref{classy:node_sets/0}. +""". +-spec on_node_classify( + fun((map()) -> [node_set()]), + classy_hook:prio() + ) -> classy_hook:hook(). +on_node_classify(Hook, Prio) -> + classy_hook:insert(?on_node_classify, Hook, Prio). + -doc """ Register a hook that is executed before the local node joins a different cluster. @@ -441,7 +604,7 @@ It is guaranteed to be called @emph{at least} once, and must be idempotent. """. -spec post_join( - fun((cluster_id(), Local, JoinedTo) -> _), + fun((cluster_id(), Local, JoinedTo, join_intent()) -> _), classy_hook:prio() ) -> classy_hook:hook() when Local :: site(), diff --git a/src/classy_autoclean.erl b/src/classy_autoclean.erl deleted file mode 100644 index 01f0395..0000000 --- a/src/classy_autoclean.erl +++ /dev/null @@ -1,171 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2026 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - -%% @doc A process that automatically kicks sites that have been absent for a long time. -%% -%% Relevant configurations are `max_site_downtime' and `quorum'. -%% -%% == Network partitions == -%% -%% In a partitioned network there is a risk that sites try to kick each other. -%% -%% Autoclean requires quorum of running nodes before making the decision to kick. -%% Note: as `quorum(running)' is always >= `quorum(config)', -%% even in a partition containing single node, -%% autoclean won't activate if `quorum' config is set to a value > 1. --module(classy_autoclean). - --behavior(gen_server). - -%% API: --export([start_link/0]). - -%% behavior callbacks: --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). - -%% internal exports: --export([site_down_since/2]). - --export_type([]). - --include("classy_internal.hrl"). --include_lib("snabbkaffe/include/trace.hrl"). - -%%================================================================================ -%% Type declarations -%%================================================================================ - --record(to_check, {}). - -%%================================================================================ -%% API functions -%%================================================================================ - --define(SERVER, ?MODULE). - --spec start_link() -> {ok, pid()}. -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -%%================================================================================ -%% behavior callbacks -%%================================================================================ - --record(s, - { t :: classy_lib:wakeup_timer() - }). - -init(_) -> - process_flag(trap_exit, true), - S = #s{}, - {ok, wakeup(S)}. - -handle_call(_Call, _From, S) -> - {reply, {error, unknown_call}, S}. - -handle_cast(_Cast, S) -> - {noreply, S}. - -handle_info(#to_check{}, S0) -> - S = S0#s{t = undefined}, - check_down_sites(), - {noreply, wakeup(S)}; -handle_info({'EXIT', _, shutdown}, S) -> - {stop, shutdown, S}; -handle_info(_Info, S) -> - {noreply, S}. - -terminate(_Reason, _S) -> - ok. - -%%================================================================================ -%% Internal exports -%%================================================================================ - -%% @doc RPC target. --spec site_down_since(classy_lib:unix_time_s(), classy:site()) -> classy_lib:unix_time_s() | alive. -site_down_since(RemoteT, Site) -> - case classy_table:lookup(?site_info, Site) of - [#site_info{isconn = true}] -> - alive; - [#site_info{isconn = false, last_update = DownSince}] -> - classy_lib:adjust_time_s_skew(RemoteT, DownSince); - [] -> - %% We have never seen the site alive: - 0 - end. - -%%================================================================================ -%% Internal functions -%%================================================================================ - -check_down_sites() -> - maybe - {ok, Cluster} ?= classy_node:the_cluster(), - {ok, Local} ?= classy_node:the_site(), - %% Calculate minimum wall time when site should be alive: - MaxDownSecs = max_downtime(), - true ?= is_integer(MaxDownSecs), - MinLastUpTime = classy_lib:time_s() - MaxDownSecs, - lists:foreach( - fun(Site) -> - maybe - true ?= Site =/= Local, - %% Before asking the remote sites, check the local data first: - [ #site_info{ node = Node - , isconn = false - , last_update = LastUpdate - } - ] ?= classy_table:lookup(?site_info, Site), - true ?= LastUpdate < MinLastUpTime, - %% Now check the quorum: - {ok, DownSince} ?= last_alive_at(Site), - true ?= is_integer(DownSince), - true ?= DownSince < MinLastUpTime, - %% Run hooks: - ok ?= classy_hook:all(?on_pre_autoclean, [Site]), - %% Now we're pretty certain that the site is really down: - ?tp(notice, automatically_kick_down_site, - #{ site => Site - , node => Node - , last_alive_at => LastUpdate - }), - classy_node:kick_site(Site, autoclean) - end - end, - classy_membership:members(Cluster, Local)), - classy_membership:cleanup(Cluster, Local, forget_after()) - end, - ok. - --spec last_alive_at(classy:site()) -> {ok, classy_lib:unix_time_s() | alive} | {error, no_quorum}. -last_alive_at(Site) -> - Ret = erpc:multicall( - classy:nodes(connected), - ?MODULE, site_down_since, [classy_lib:time_s(), Site], - classy_lib:rpc_timeout()), - Results = [I || {ok, I} <- Ret], - case length(Results) >= classy:quorum(running) of - true -> - {ok, lists:max(Results)}; - false -> - {error, no_quorum} - end. - --spec wakeup(#s{}) -> #s{}. -wakeup(S = #s{t = T0}) -> - T = classy_lib:wakeup_after(#to_check{}, check_interval(), T0), - S#s{t = T}. - --spec max_downtime() -> pos_integer() | infinity. -max_downtime() -> - application:get_env(classy, max_site_downtime, infinity). - --spec check_interval() -> pos_integer(). -check_interval() -> - application:get_env(classy, cleanup_check_interval, 30_000). - --spec forget_after() -> pos_integer(). -forget_after() -> - application:get_env(classy, forget_after, 7 * 24 * 60 * 60). diff --git a/src/classy_autocluster.erl b/src/classy_autocluster.erl index 9069d78..5bfd547 100644 --- a/src/classy_autocluster.erl +++ b/src/classy_autocluster.erl @@ -14,6 +14,7 @@ A server responsible for automatic peer discovery. , enable/0 , disable/0 , app_name/0 + , candidates/0 ]). %% behavior callbacks: @@ -62,6 +63,16 @@ app_name() -> [Name | _] = string:tokens(atom_to_list(node()), "@"), Name. +-doc """ +List candidates according to the selected strategy. +""". +-spec candidates() -> {ok, [{classy:cluster_id(), node()}]} | ignore. +candidates() -> + with_strategy( + fun(Mod, Options) -> + discover(Mod, Options) + end). + %%================================================================================ %% behavior callbacks %%================================================================================ @@ -104,8 +115,6 @@ handle_cast(Cast, S) -> -doc false. handle_info(#to_discover{}, S) -> {noreply, handle_discover(S)}; -handle_info({'EXIT', _, shutdown}, S) -> - {stop, shutdown, S}; handle_info(Info, S) -> ?tp(warning, ?classy_unknown_event, #{ kind => info diff --git a/src/classy_builtin_hooks.erl b/src/classy_builtin_hooks.erl index 47f6c6b..35a30dc 100644 --- a/src/classy_builtin_hooks.erl +++ b/src/classy_builtin_hooks.erl @@ -9,10 +9,14 @@ , log_create_site/1 , log_create_cluster/2 , log_pre_join/4 - , log_post_join/3 + , log_post_join/4 , log_membership_change/4 , log_run_level/2 - , log_peer_connection_change/5 + , log_peer_connection_change/3 + , log_peer_liveness_change/2 + , log_peer_restart/2 + , log_peer_node_change/3 + , log_autoclean/1 ]). -include("classy_internal.hrl"). @@ -59,11 +63,12 @@ log_pre_join(Cluster, Remote, Node, UserArg) -> , user_arg => UserArg }). -log_post_join(Cluster, Local, JoinToNode) -> +log_post_join(Cluster, Local, JoinToNode, Intent) -> ?tp(notice, classy_joined_cluster, #{ cluster => Cluster , local => Local , joined_to_node => JoinToNode + , intent => Intent }). log_membership_change(Cluster, Local, Remote, Member) -> @@ -81,20 +86,47 @@ log_run_level(From, To) -> ?tp(info, classy_change_run_level, #{ from => From , to => To + , local => classy_node:maybe_site() }). -log_peer_connection_change(_Cluster, Local, Remote, Node, ConnStatus) -> +log_peer_connection_change(Site, Node, ConnStatus) -> Kind = case ConnStatus of - true -> classy_peer_connected; + true -> classy_peer_connected; false -> classy_peer_disconnected end, - Level = case Remote of - Local -> debug; - _ -> notice + Level = case classy_node:maybe_site() of + Site -> debug; + _ -> notice end, ?tp(Level, Kind, - #{ remote => Remote - , node => Node + #{ site => Site + , node => Node + }). + +log_peer_liveness_change(Peer, IsLive) -> + case IsLive of + true -> + Level = info, + Kind = classy_peer_up; + false -> + Level = warning, + Kind = classy_peer_down + end, + ?tp(Level, Kind, #{site => Peer}). + +log_peer_restart(Peer, NRestarts) -> + ?tp(info, classy_peer_restarted, #{site => Peer, n_restarts => NRestarts}). + +log_peer_node_change(Peer, From, To) -> + ?tp(warning, classy_peer_node_change, + #{ site => Peer + , from => From + , to => To + }). + +log_autoclean(Target) -> + ?tp(debug, classy_peer_autoclean, + #{ site => Target }). %%================================================================================ diff --git a/src/classy_hook.erl b/src/classy_hook.erl index c46a251..2f81526 100644 --- a/src/classy_hook.erl +++ b/src/classy_hook.erl @@ -12,9 +12,11 @@ Module responsible for managing the hooks. , insert/3 , unhook/1 , foreach/2 + , map/2 , fold/3 , all/2 , first_match/2 + , timeout/0 ]). -export_type([ hookpoint/0 @@ -50,20 +52,33 @@ It can be used to unregister the hook. %% API functions %%================================================================================ +-doc """ +Get the configured hook timeout value (with defaults). +""". +timeout() -> + application:get_env(classy, hook_timeout, 30_000). + -doc false. init() -> ets:new(?tab, [named_table, ordered_set, public, {keypos, 1}]), %% Default initialization: classy:on_node_init(fun classy_builtin_hooks:gen_random_site_id/0, ?min_hook_prio), classy:post_kick(fun classy_builtin_hooks:maybe_reinitialize_after_kick/3, ?min_hook_prio), + %% Liveness tracking: + classy:run_level(fun classy_liveness:on_run_level/2, ?max_hook_prio), + classy:on_peer_connection_change(fun classy_liveness:on_peer_connection_change/3, ?max_hook_prio), %% Info logging: classy:on_create_site(fun classy_builtin_hooks:log_create_site/1, ?max_hook_prio), classy:on_create_cluster(fun classy_builtin_hooks:log_create_cluster/2, ?max_hook_prio), classy:pre_join(fun classy_builtin_hooks:log_pre_join/4, ?max_hook_prio), - classy:post_join(fun classy_builtin_hooks:log_post_join/3, ?min_hook_prio), + classy:post_join(fun classy_builtin_hooks:log_post_join/4, ?min_hook_prio), classy:on_membership_change(fun classy_builtin_hooks:log_membership_change/4, ?max_hook_prio), classy:run_level(fun classy_builtin_hooks:log_run_level/2, ?min_hook_prio), - classy:on_peer_connection_change(fun classy_builtin_hooks:log_peer_connection_change/5, ?max_hook_prio), + classy:on_peer_connection_change(fun classy_builtin_hooks:log_peer_connection_change/3, ?max_hook_prio), + classy:on_peer_liveness_change(fun classy_builtin_hooks:log_peer_liveness_change/2, ?max_hook_prio), + classy:on_peer_restart(fun classy_builtin_hooks:log_peer_restart/2, ?max_hook_prio), + classy:on_peer_node_change(fun classy_builtin_hooks:log_peer_node_change/3, ?max_hook_prio), + classy:pre_autoclean(fun classy_builtin_hooks:log_autoclean/1, ?max_hook_prio), %% Discovery strategies: classy_discovery_static:hook(), classy_discovery_dns:hook(), @@ -130,6 +145,25 @@ fold(Hookpoint, Args, Acc0) -> Result end. +-doc """ +Apply every hook to the arguments +and return the list of outputs for each hook. + +Failures are ignored (logged). +""". +-spec map(hookpoint(), list()) -> list(). +map(Hookpoint, Args) -> + lists:filtermap( + fun(Hook) -> + case safe_apply(Hookpoint, Hook, Args) of + {ok, Result} -> + {true, Result}; + _ -> + false + end + end, + hooks(Hookpoint)). + -doc """ Ensure that all functions hooked into @code{Hookpoint} return @code{ok}. @@ -186,14 +220,14 @@ hooks(Hookpoint) -> ets:select(?tab, [MS]). -spec safe_apply(hookpoint(), fun(), list()) -> {ok, _Val} | error. -safe_apply(HookPoint, Fun, A) -> - try - {ok, apply(Fun, A)} - catch - EC:Err:Stack -> - ?tp(warning, classy_hook_failure, - #{ EC => Err - , stack => Stack +safe_apply(HookPoint, Fun, Args) -> + Timeout = timeout(), + case classy_lib:safe_apply_with_timeout({Fun, Args}, Timeout) of + {ok, _} = Ok -> + Ok; + Err -> + ?tp(critical, ?classy_hook_failure, + #{ reason => Err , hook => Fun , hookpoint => HookPoint }), diff --git a/src/classy_internal.hrl b/src/classy_internal.hrl index a349f6d..226e5bf 100644 --- a/src/classy_internal.hrl +++ b/src/classy_internal.hrl @@ -19,6 +19,8 @@ -include_lib("snabbkaffe/include/trace.hrl"). -include("classy.hrl"). +-define(classy_proto_vsn, 1). + -define(max_hook_prio, 100000). -define(min_hook_prio, -?max_hook_prio). @@ -35,6 +37,10 @@ -define(on_pre_autoclean, on_pre_autoclean). -define(on_pre_autocluster, on_pre_autocluster). -define(on_enrich_site_info, on_enrich_site_info). +-define(on_peer_liveness_change, on_peer_liveness_change). +-define(on_peer_restart, on_peer_restart). +-define(on_peer_node_change, on_peer_node_change). +-define(on_node_classify, on_node_classify). %% Run levels: -define(stopped, stopped). @@ -47,6 +53,8 @@ -define(classy_abnormal_exit, classy_abnormal_exit). -define(classy_table_anomaly, classy_table_anomaly). -define(classy_bad_data, classy_bad_data). +-define(classy_run_level_change_error, classy_run_level_change_error). +-define(classy_hook_failure, classy_hook_failure). -define(classy_vote_pre_results, classy_vote_pre_results). -define(classy_vote_coord_stage, classy_vote_coord_stage). @@ -69,12 +77,21 @@ -define(site_info, classy_site_status_tab). -record(site_info, { isconn - , liveness + , isup + , nrestarts , node - , last_update + , meta + %% Time when the the value of isconn last changed: + , conn_change_time , reserved = [] }). + +%% Shared classy tables and their keys: +-define(globals, classy_node). %% Number of restarts since creation of the site -define(n_restarts, n_restarts). +-define(pt_node_sets, classy_pt_node_sets). +-define(pt_site_sets, classy_pt_site_sets). + -endif. diff --git a/src/classy_lib.erl b/src/classy_lib.erl index 648b840..3bde6a1 100644 --- a/src/classy_lib.erl +++ b/src/classy_lib.erl @@ -13,6 +13,7 @@ Misc. utility functions. , sites_to_nodes/1 , safe_apply/1 , safe_apply/3 + , safe_apply_with_timeout/2 , multicast/1 , multicall/1 , multicall/2 @@ -20,6 +21,8 @@ Misc. utility functions. %% internal exports: -export([ rpc_timeout/0 + , to_cluster_sets/0 + , quorum_sets/0 , n_sites/0 , time_s/0 , adjust_time_s_skew/2 @@ -50,6 +53,8 @@ Misc. utility functions. -type mfargs() :: {module(), atom(), list()}. +-type callback() :: mfargs() | {function(), list()}. + -type multicall_target() :: classy:site() | {classy:site(), _Token}. @@ -77,9 +82,11 @@ Misc. utility functions. -doc """ @xref{classy_lib:safe_apply/3} """. --spec safe_apply(mfargs()) -> {ok, term()} | wrapped_exception(). +-spec safe_apply(callback()) -> {ok, term()} | wrapped_exception(). safe_apply({M, F, A}) -> - safe_apply(M, F, A). + safe_apply(M, F, A); +safe_apply({Fun, Args}) when is_function(Fun), is_list(Args) -> + safe_apply({erlang, apply, [Fun, Args]}). -doc """ Apply a function while catching all exceptions and returning them as a term. @@ -96,6 +103,23 @@ safe_apply(Module, Function, Args) -> {error, {exit, Reason}} end. +-doc """ +Apply a function in a separate process with a timeout. +""". +-spec safe_apply_with_timeout(callback(), timeout()) -> {ok, term()} | wrapped_exception() | {error, timeout}. +safe_apply_with_timeout(Callback, Timeout) -> + {Pid, MRef} = spawn_monitor( + fun() -> + exit(safe_apply(Callback)) + end), + receive + {'DOWN', MRef, process, Pid, Reason} -> + Reason + after Timeout -> + demonitor(MRef, [flush]), + exit(Pid, kill), + {error, timeout} + end. -doc """ Call functions on multiple sites similarly to @ref{classy_lib:multicall/2} @@ -264,6 +288,16 @@ fold_per_cluster(Fun, InitialAcc, #{infos := Infos}) -> rpc_timeout() -> application:get_env(classy, rpc_timeout, 5_000). +-doc "Return value of @ref{to_cluster_sets} (with default)". +-spec to_cluster_sets() -> [classy:node_set_name(), ...]. +to_cluster_sets() -> + [all | application:get_env(classy, to_cluster_sets, [])]. + +-doc "Return value of @ref{quorum_sets} (with default)". +-spec quorum_sets() -> [classy:node_set_name(), ...]. +quorum_sets() -> + [connected | application:get_env(classy, quorum_sets, [])]. + -doc "Return value of @ref{n_sites} environment variable (with default)". n_sites() -> application:get_env(classy, n_sites, 1). diff --git a/src/classy_liveness.erl b/src/classy_liveness.erl new file mode 100644 index 0000000..fc2843a --- /dev/null +++ b/src/classy_liveness.erl @@ -0,0 +1,336 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2026 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(classy_liveness). +-moduledoc """ +A process that is tasked with monitoring liveness status of peers. +Its responsibilities include: + +@enumerate +@item detecting that remote sites are down, +making a coordinated decision that the site is down. + +@item automatically kick sites that have been down from the cluster. +@end enumerate + +Relevant configurations are @ref{max_site_downtime} and @ref{quorum}. + +In a partitioned network there is a risk that sites try to kick each other. + +Liveness requires quorum of running nodes before making the decision to kick. +Note: as @code{quorum(running)} is always >= @code{quorum(config)}, +even in a partition containing single node, +liveness won't activate if @link{quorum} config is set to a value > 1. + +""". + +-behavior(gen_server). + +%% API: +-export([n_restarts/0]). + +%% behavior callbacks: +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). + +%% internal exports: +-export([ start_link/0 + + , on_run_level/2 + , on_peer_connection_change/3 + + , vote_down_prep/5 + , vote_down_commit/4 + , vote_kick_prep/4 + , vote_kick_commit/3 + ]). + +-export_type([]). + +-include("classy_internal.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-record(to_autoclean, {}). + +-record(cast_check_site, + { site :: classy:site() + , node :: node() + }). +-record(cast_quorum, {}). + +-define(vote_tag(SITE), {classy_liveness, SITE}). + +%%================================================================================ +%% API functions +%%================================================================================ + +-define(SERVER, ?MODULE). + +-doc """ +Return number of node restarts since creation of the site. + +This value is monotonically increasing. +""". +-spec n_restarts() -> {ok, non_neg_integer()} | {error, nodedown}. +n_restarts() -> + case classy_table:lookup(?globals, ?n_restarts) of + [N] -> + {ok, N}; + _ -> + {error, nodedown} + end. + +%%================================================================================ +%% Internal exports +%%================================================================================ + +-doc false. +-spec vote_down_prep(boolean(), classy_vote:id(), classy:cluster_id(), classy:site(), non_neg_integer()) -> boolean(). +vote_down_prep(_ForReal, _Id, Cluster, Target, NRestarts) -> + maybe + {ok, Cluster} ?= classy:the_cluster(), + {ok, NRestarts} ?= classy_node:n_restarts(Target), + disconnected(Target) + else + _ -> false + end. + +-doc false. +-spec vote_down_commit(classy_vote:id(), classy:cluster_id(), classy:site(), non_neg_integer()) -> ok. +vote_down_commit(_Id, Cluster, Target, NRestarts) -> + classy_membership:set_liveness(Cluster, classy_node:maybe_site(), Target, NRestarts, false, false). + +-doc false. +-spec vote_kick_prep(boolean(), classy_vote:id(), classy:cluster_id(), classy:site()) -> boolean(). +vote_kick_prep(_ForReal, _Id, Cluster, Target) -> + can_be_kicked(Cluster, Target). + +-doc false. +-spec vote_kick_commit(classy_vote:id(), classy:cluster_id(), classy:site()) -> ok. +vote_kick_commit(_Id, _Cluster, Target) -> + classy:kick_site(Target, autoclean). + +-doc false. +-spec on_peer_connection_change(classy:site(), node(), boolean()) -> ok. +on_peer_connection_change(_Site, _Node, true) -> + ok; +on_peer_connection_change(Site, Node, false) -> + gen_server:cast(?SERVER, #cast_check_site{site = Site, node = Node}). + +-doc false. +-spec on_run_level(classy:run_level(), classy:run_level()) -> ok. +on_run_level(stopped, single) -> + increase_n_restarts(), + set_my_liveness_info(true); +on_run_level(single, stopped) -> + set_my_liveness_info(false); +on_run_level(cluster, quorum) -> + gen_server:cast(?SERVER, #cast_quorum{}), + ok; +on_run_level(_, _) -> + ok. + +-doc false. +-spec start_link() -> {ok, pid()}. +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +-record(s, + { periodic_timer :: classy_lib:wakeup_timer() + }). + +-doc false. +init(_) -> + process_flag(trap_exit, true), + S = #s{}, + check_down(S), + {ok, wakeup(S)}. + +-doc false. +handle_call(Call, From, S) -> + ?tp(warning, ?classy_unknown_event, + #{ kind => call + , from => From + , content => Call + , server => ?MODULE + }), + {reply, {error, unknown_call}, S}. + +-doc false. +handle_cast(#cast_quorum{}, S) -> + check_down(S), + {noreply, S}; +handle_cast(#cast_check_site{site = Site, node = _Node}, S) -> + check_down(Site, S), + {noreply, S}; +handle_cast(Cast, S) -> + ?tp(warning, ?classy_unknown_event, + #{ kind => cast + , content => Cast + , server => ?MODULE + }), + {noreply, S}. + +-doc false. +handle_info(#to_autoclean{}, S0) -> + S = S0#s{periodic_timer = undefined}, + check_down(S), + kick_down_sites(S), + {noreply, wakeup(S)}; +handle_info(Info, S) -> + ?tp(warning, ?classy_unknown_event, + #{ kind => info + , content => Info + , server => ?MODULE + }), + {noreply, S}. + +-doc false. +terminate(_Reason, _S) -> + ok. + +%%================================================================================ +%% Internal functions +%%================================================================================ + +check_down(S) -> + [check_down(I, S) || I <- classy:sites(all)]. + +check_down(Target, _S) -> + maybe + true ?= disconnected(Target), + {ok, NRestarts} ?= classy_node:n_restarts(Target), + {ok, Cluster} ?= classy:the_cluster(), + {ok, Consilium} ?= consilium(), + Actions = #{ prepare => {?MODULE, vote_down_prep, [Cluster, Target, NRestarts]} + , commit => [{?MODULE, vote_down_commit, [Cluster, Target, NRestarts]}] + }, + _ = classy_vote:create(#{ tag => ?vote_tag(Target) + , actions => #{I => Actions || I <- Consilium} + }), + ok + else + _ -> ok + end. + +set_my_liveness_info(Running) -> + Cluster = classy_node:maybe_cluster(), + Site = classy_node:maybe_site(), + {ok, NR} = n_restarts(), + classy_membership:set_liveness(Cluster, Site, Site, NR, true, Running). + +-spec increase_n_restarts() -> non_neg_integer(). +increase_n_restarts() -> + %% TODO: run this in a critical section: + do_increase_n_restarts(). + +do_increase_n_restarts() -> + N = case classy_table:lookup(?globals, ?n_restarts) of + [N0] when is_integer(N0) -> + N0 + 1; + [] -> + 1; + Other -> + ?tp(warning, ?classy_bad_data, + #{ table => ?globals + , key => ?n_restarts + , val => Other + }), + 1 + end, + classy_table:write(?globals, ?n_restarts, N), + N. + +kick_down_sites(_S) -> + maybe + {ok, Cluster} ?= classy:the_cluster(), + {ok, Local} ?= classy:the_site(), + %% Calculate minimum wall time when site should be alive: + MaxDownSecs = max_downtime(), + true ?= is_integer(MaxDownSecs), + lists:foreach( + fun(Target) -> + maybe + true ?= can_be_kicked(Cluster, Target), + ok ?= classy_hook:all(?on_pre_autoclean, [Target]), + {ok, Consilium} ?= consilium(), + Actions = #{ prepare => {?MODULE, vote_kick_prep, [Cluster, Target]} + , commit => [{?MODULE, vote_kick_commit, [Cluster, Target]}] + }, + classy_vote:create(#{ tag => ?vote_tag(Target) + , actions => #{I => Actions || I <- Consilium} + }) + end + end, + classy_membership:members(Cluster, Local)), + classy_membership:cleanup(Cluster, Local, forget_after()) + end, + ok. + +-spec wakeup(#s{}) -> #s{}. +wakeup(S = #s{periodic_timer = T0}) -> + T = classy_lib:wakeup_after(#to_autoclean{}, check_interval(), T0), + S#s{periodic_timer = T}. + +-spec max_downtime() -> pos_integer() | infinity. +max_downtime() -> + application:get_env(classy, max_site_downtime, infinity). + +-spec check_interval() -> pos_integer(). +check_interval() -> + application:get_env(classy, cleanup_check_interval, 5_000). + +-spec forget_after() -> pos_integer(). +forget_after() -> + application:get_env(classy, forget_after, 7 * 24 * 60 * 60). + +-spec disconnected(classy:site()) -> boolean(). +disconnected(Site) -> + ordsets:is_element(Site, classy:sites(disconnected)) andalso + not ordsets:is_element(Site , classy:sites(down)). + +-spec site_disconn_since(classy:site()) -> {ok, classy_lib:unix_time_s()} | ignore. +site_disconn_since(Site) -> + case classy_table:lookup(?site_info, Site) of + [#site_info{isconn = false, isup = false, conn_change_time = DownSince}] -> + {ok, DownSince}; + [] -> + %% We have never seen the site alive: + {ok, 0}; + [_] -> + ignore + end. + +can_be_kicked(Cluster, Target) -> + maybe + %% Self-checks: + {ok, Cluster} ?= classy:the_cluster(), + {ok, Local} ?= classy:the_site(), + true ?= Local =/= Target, + MaxDownSecs = max_downtime(), + true ?= is_integer(MaxDownSecs), + %% Check target: + true ?= ordsets:is_element(Target, classy:sites(down)), + {ok, DisconnectedSince} ?= site_disconn_since(Target), + classy_lib:time_s() - DisconnectedSince > MaxDownSecs + else + _ -> false + end. + +consilium() -> + Sites = + ordsets:intersection( + [classy:sites(Set) || Set <- classy_lib:quorum_sets()]), + case length(Sites) >= classy:quorum(config) of + true -> + {ok, Sites}; + false -> + undefined + end. diff --git a/src/classy_membership.erl b/src/classy_membership.erl index 0b1dffa..533cba8 100644 --- a/src/classy_membership.erl +++ b/src/classy_membership.erl @@ -15,6 +15,8 @@ Business code should not use it directly. %% API: -export([ known_clusters/1 , set_member/4 + , set_info/3 + , set_liveness/6 , members/2 , list_local_sites/1 , get_data/4 @@ -22,6 +24,8 @@ Business code should not use it directly. , flush/2 , node_of_site/2 , site_of_node/2 + , to_liveness/3 + , from_liveness/1 , dump/0 ]). @@ -35,12 +39,13 @@ Business code should not use it directly. -export([reset_acked_out/4]). -endif. --export_type([start_args/0, op/0, ord/0, clock/0, sync_data/0, pk_last/0, pv_last/0]). +-export_type([start_args/0, op/0, ord/0, clock/0, sync_data/0, pk_last/0, pv_last/0, event/0]). -include("classy_internal.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -ifdef(TEST). +-include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). -endif. @@ -66,7 +71,9 @@ Business code should not use it directly. %% Types of keys stored in the cluster CRDT. -record(mem, {s :: classy:site() | atom()}). -record(host, {s :: classy:site() | atom()}). --type key() :: #mem{} | #host{}. +-record(info, {s :: classy:site() | atom()}). +-record(live, {s :: classy:site() | atom()}). +-type key() :: #mem{} | #host{} | #info{} | #live{}. -doc "Arbitrary term used to break ties between commands with the same logical timestamp.". -type magic() :: term(). @@ -119,6 +126,8 @@ Business code should not use it directly. , sync_timer :: classy_lib:wakeup_timer() %% Logical clock: , clock :: clock() + %% Logical clock of the last update sent to `classy_node' worker + , events_since = 0 }). %% Persistent table keys: @@ -136,8 +145,6 @@ Business code should not use it directly. %% Last set command for the site: -record(pk_last, {c, l, k}). -type pk_last() :: #pk_last{c :: classy:cluster_id(), l :: classy:site(), k :: key()}. -%% Hooks have been executed for all site states older than this: --record(pk_hooks_ran, {c :: classy:cluster_id(), l :: classy:site()}). %% Composite table values: -record(pv_last, @@ -146,6 +153,13 @@ Business code should not use it directly. }). -type pv_last() :: #pv_last{op :: op(), toi :: clock()}. +-type event() :: {mem, classy:site(), boolean()} | + {host, classy:site(), node()} | + {meta, classy:site(), map()} | + {liveness, classy:site(), _NRestarts :: non_neg_integer(), _Self :: boolean(), _IsUp :: boolean()}. + +-type liveness() :: non_neg_integer(). + %%================================================================================ %% API functions %%================================================================================ @@ -189,6 +203,40 @@ set_member(Cluster, Local, Target, IsMember) when is_boolean(IsMember) -> EC:Err -> {error, {EC, Err}} end. +-doc """ +Set site information. + +This function is called by the site itself. +""". +-spec set_info(classy:cluster_id(), classy:site(), _Info) -> ok | {error, _}. +set_info(Cluster, Local, Info) -> + try + gen_server:call( + ?via(Cluster, Local), + #call_set{k = #info{s = Local}, v = Info}, + ?call_timeout) + catch + EC:Err -> {error, {EC, Err}} + end. + +-doc """ +Set site liveness info. + +Normally, liveness is only updated by the local site. +The remotes can only update @code{IsUp} boolean on behalf of other nodes, +but they must always keep @code{NRestarts} as is. +""". +-spec set_liveness(classy:cluster_id(), classy:site(), classy:site(), non_neg_integer(), boolean(), boolean()) -> ok | {error, _}. +set_liveness(Cluster, Local, Target, NRestarts, Self, IsUp) -> + try + gen_server:call( + ?via(Cluster, Local), + #call_set{k = #live{s = Target}, v = to_liveness(NRestarts, Self, IsUp)}, + ?call_timeout) + catch + EC:Err -> {error, {EC, Err}} + end. + -doc """ Return active members of the @code{Cluster}, as perceived by @code{Local} site. @@ -278,7 +326,9 @@ dump() -> Info = {Val, #{origin => Origin, ltime => Clock, wtime => OWT, ltime_imported => TOI}}, Path = case Key of #mem{s = Target} -> [peers, Target, mem]; - #host{s = Target} -> [peers, Target, host] + #host{s = Target} -> [peers, Target, host]; + #info{s = Target} -> [peers, Target, info]; + #live{s = Target} -> [peers, Target, liveness] end, classy_lib:map_deep_insert( [{Cluster, Local} | Path], @@ -298,17 +348,64 @@ dump() -> classy_lib:map_deep_insert( [{Cluster, Local}, clock], V, - Acc); - #pk_hooks_ran{c = Cluster, l = Local} -> - classy_lib:map_deep_insert( - [{Cluster, Local}, hook_ran], - V, Acc) end end, #{}, ?ptab). +-doc """ +Convert information about liveness to a sortable non-negative integer replicated via CRDT. + +Arguments: +@enumerate + @item Number of restarts of the target site + @item @code{true} of the update is made by the site itself, + @code{false} if the target site is updated by the 3rd party. + @item @code{true} if the site is up. +@end enumerate + +Bits in the integer are organized such that: +@enumerate + @item Liveness information for the higher number of restarts always wins. + @item Liveness updates made by the 3rd party win. + This way the target site knows that its peers decided that it went down. +@end enumerate +""". +-spec to_liveness(non_neg_integer(), boolean(), boolean()) -> liveness(). +to_liveness(NRestarts, Self, IsUp) when is_integer(NRestarts), + NRestarts >= 0, + is_boolean(Self), + is_boolean(IsUp) -> + %% Liveness bits: + %% + %% always 0 always 0 + %% v v + %% n_restarts | 0 | not Self | 0 | IsUp | + OnBehalf = case Self of + false -> 2#100; + true -> 0 + end, + UpBit = case IsUp of + true -> 1; + false -> 0 + end, + (NRestarts bsl 4) bor OnBehalf bor UpBit. + +-doc """ +Convert liveness bitfield to a tuple. +""". +-spec from_liveness(liveness()) -> {NRestarts, Self, IsUp} when + NRestarts :: non_neg_integer(), + Self :: boolean(), + IsUp :: boolean(). +from_liveness(Liveness) when is_integer(Liveness), + Liveness >= 0 -> + NRestarts = Liveness bsr 4, + IsUp = (Liveness band 1) > 0, + Self = Liveness band 2#100 =:= 0, + {NRestarts, Self, IsUp}. + -ifdef(TEST). reset_acked_out(Cluster, Local, Remote, Clock) -> @@ -471,8 +568,7 @@ handle_flush(S0) -> S = handle_sync_out(S0), ok = classy_table:flush(?ptab), %% This one updates hook cursor persistently, no need to flush again: - run_hooks(S), - S. + notify(S). -doc """ Total order of the operation. @@ -489,13 +585,15 @@ This is most likely to happen during a network partition. Please see theories/classy.v file for more details and some intricate requirements for this function. """. -spec ord(op()) -> ord(). +ord(#op_set{k = #live{}, val = Liveness, c = C, origin = O}) -> + true = is_integer(Liveness), + %% Liveness information is merged according to special rules, + %% as the value itself is designed for a causal ordering serialization. + %% See bitfield definition in `to_liveness' + {Liveness, C, O}; ord(#op_set{c = C, m = M, origin = O}) -> {C, M, O}. --spec state(op()) -> boolean(). -state(#op_set{val = Val}) -> - Val. - -spec local_command(#call_set{}, #s{}) -> #s{}. local_command(Cmd, S0) -> {C, S} = inc_get_clock(S0), @@ -617,21 +715,32 @@ merge(LTime, Op, S) -> true end. -run_hooks(S = #s{clock = C, cluster = Cluster, site = Local}) -> - UpdatedEntries = memtab_since(hooks_ran(S) + 1, S), - lists:foreach( - fun(Op = #op_set{k = #mem{s = Peer}}) -> - IsConn = state(Op), - classy_hook:foreach(?on_membership_change, [Cluster, Local, Peer, IsConn]); - (#op_set{}) -> - ok - end, - UpdatedEntries), - classy_table:write(?ptab, #pk_hooks_ran{c = Cluster, l = Local}, C). +notify(S = #s{clock = C, cluster = Cluster, events_since = EventsSince}) -> + %% Find out what changed: + UpdatedEntries = memtab_since(EventsSince + 1, S), + Deltas = lists:flatmap( + fun(#op_set{k = #mem{s = Peer}, val = IsMember}) -> + [{mem, Peer, IsMember}]; + (#op_set{k = #host{s = Peer}, val = Host}) -> + [{host, Peer, Host}]; + (#op_set{k = #info{s = Peer}, val = Meta}) -> + [{meta, Peer, Meta}]; + (#op_set{k = #live{s = Peer}, val = Liveness}) -> + {NRestarts, Self, IsUp} = from_liveness(Liveness), + [{liveness, Peer, NRestarts, Self, IsUp}]; + (#op_set{}) -> + [] + end, + UpdatedEntries), + case Deltas of + [] -> ok; + _ -> classy_node:notify_mem_deltas(Cluster, Deltas) + end, + S#s{events_since = C}. -spec handle_cleanup(pos_integer(), #s{}) -> ok. -handle_cleanup(ForgetAfter, S) -> - Sites = sites_for_cleanup(ForgetAfter, S), +handle_cleanup(ForgetAfter, S = #s{site = Self}) -> + Sites = sites_for_cleanup(Self, ForgetAfter, S), ?tp(debug, classy_membership_forget, #{ interval => ForgetAfter , sites => Sites @@ -645,18 +754,20 @@ handle_cleanup(ForgetAfter, S) -> %% 1. Peer has been kicked from the cluster %% 2. It happened at least SecsDown ago %% 3. Other active peers have received all data about the peer --spec sites_for_cleanup(integer(), #s{}) -> [classy:site()]. -sites_for_cleanup(SecsDown, S) -> +-spec sites_for_cleanup(classy:site(), integer(), #s{}) -> [classy:site()]. +sites_for_cleanup(Self, SecsDown, S) -> MinTimeWhenKicked = classy_lib:time_s() - SecsDown, Peers = peers(S), LongGone = [I || I <- Peers, is_long_gone(MinTimeWhenKicked, I, S)], ActiveMembers = Peers -- LongGone, - MinAcked = min_acked(ActiveMembers, S), + MinAcked = min_acked(ActiveMembers -- [Self], S), [I || I <- LongGone, max_toi(I, S) =< MinAcked]. forget_site(Site, #s{cluster = Cluster, site = Local}) when is_binary(Site) -> classy_table:dirty_delete(?ptab, #pk_last{c = Cluster, l = Local, k = #mem{s = Site}}), classy_table:dirty_delete(?ptab, #pk_last{c = Cluster, l = Local, k = #host{s = Site}}), + classy_table:dirty_delete(?ptab, #pk_last{c = Cluster, l = Local, k = #info{s = Site}}), + classy_table:dirty_delete(?ptab, #pk_last{c = Cluster, l = Local, k = #live{s = Site}}), classy_table:dirty_delete(?ptab, #pk_acked_in{c = Cluster, l = Local, r = Site}), classy_table:dirty_delete(?ptab, #pk_acked_out{c = Cluster, l = Local, r = Site}), ok. @@ -718,20 +829,14 @@ select_nodes(Cluster, Local, Action) -> ets:select(?ptab, [MS]). -doc """ -Find minimal Lamport clock, such that: -@itemize -@item All hooks for the events preceding it are executed locally. -@item All sites from the list have acked it. -@end itemize +Find minimal Lamport clock, +such that all sites from the list have acked it. """. -spec min_acked([classy:site()], #s{}) -> clock() | undefined. -min_acked(Sites, S = #s{site = Local}) -> +min_acked(Sites, S = #s{}) -> lists:foldl( fun(Peer, Acc) -> - case Peer of - Local -> hooks_ran(S); - _ -> min(get_acked_out(Peer, S), Acc) - end + min(get_acked_out(Peer, S), Acc) end, undefined, Sites). @@ -755,7 +860,7 @@ max_toi(Site, #s{cluster = Cluster, site = Local}) -> } , [] , ['$1'] - } || Key <- [#mem{s = Site}, #host{s = Site}]], + } || Key <- [#mem{s = Site}, #host{s = Site}, #info{s = Site}, #live{s = Site}]], L = ets:select(?ptab, MS), case L of [] -> undefined; @@ -818,15 +923,6 @@ get_acked_out(Site, #s{cluster = C, site = Local}) -> [] -> 0 end. --spec hooks_ran(#s{}) -> clock(). -hooks_ran(#s{cluster = Cluster, site = Local}) -> - case classy_table:lookup(?ptab, #pk_hooks_ran{c = Cluster, l = Local}) of - [Clock] -> - Clock; - [] -> - 0 - end. - -spec set_acked_in(classy:site(), clock(), #s{}) -> ok. set_acked_in(Site, Clock, #s{cluster = Cluster, site = Local}) -> classy_table:dirty_write( diff --git a/src/classy_node.erl b/src/classy_node.erl index ab055b7..01b53d3 100644 --- a/src/classy_node.erl +++ b/src/classy_node.erl @@ -13,24 +13,20 @@ Management of the local site and node. , maybe_init_the_site/1 , join_node/3 , kick_site/2 - , the_site/0 - , the_cluster/0 + , maybe_site/0 + , maybe_cluster/0 , parent_site/0 , nodes/1 , peer_info/0 , node_of_site/2 - , n_restarts/0 - - , at_lower_level/2 + , n_restarts/1 ]). %% behavior callbacks: -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). %% internal exports: --export([hello/0]). - --export_type([run_level_atom/0]). +-export([hello/0, on_ptab_update/2, notify_mem_deltas/2]). -include_lib("snabbkaffe/include/trace.hrl"). -include("classy_internal.hrl"). @@ -41,9 +37,11 @@ Management of the local site and node. %% Type declarations %%================================================================================ +-define(pt_site, classy_node_the_site). +-define(pt_cluster, classy_node_the_cluster). + -define(SERVER, ?MODULE). --define(ptab, classy_node). -define(the_site, the_site). -define(the_cluster, the_cluster). -define(parent_site, parent_site). @@ -54,19 +52,10 @@ Management of the local site and node. , cluster :: classy:cluster_id() | any }). -record(call_kick, {site :: classy:site(), intent :: term()}). --record(cast_membership_change, +-record(cast_mem_deltas, { cluster :: classy:cluster_id() - , local :: classy:site() - , remote :: classy:site() - , member :: boolean() + , data :: [classy_membership:event()] }). --record(call_at_run_level, - { level :: run_level_atom() - , function :: fun(() -> _) - }). - --type run_level_int() :: 0..3. --type run_level_atom() :: ?stopped | ?single | ?cluster | ?quorum. %%================================================================================ %% API functions @@ -107,7 +96,7 @@ maybe_init_the_site(MaybeSite) -> true -> [{w, ?parent_site, Site}]; false -> [] end, - {ok, Effects} = classy_table:atomically(?ptab, Ops1 ++ Ops2 ++ Ops3), + {ok, Effects} = classy_table:atomically(?globals, Ops1 ++ Ops2 ++ Ops3), [Fun() || Fun <- Effects], ok. @@ -119,22 +108,16 @@ start_link() -> -doc """ Return ID of the cluster that the local site currently belongs to. """. --spec the_cluster() -> {ok, classy:cluster_id()} | undefined. -the_cluster() -> - case classy_table:lookup(?ptab, ?the_cluster) of - [V] -> {ok, V}; - [] -> undefined - end. +-spec maybe_cluster() -> classy:cluster_id() | undefined. +maybe_cluster() -> + persistent_term:get(?pt_cluster, undefined). -doc """ Return ID of the local site. """. --spec the_site() -> {ok, classy:site()} | undefined. -the_site() -> - case classy_table:lookup(?ptab, ?the_site) of - [V] -> {ok, V}; - [] -> undefined - end. +-spec maybe_site() -> classy:site() | undefined. +maybe_site() -> + persistent_term:get(?pt_site, undefined). -doc """ Return ID of the site that invited us to @code{the_cluster}. @@ -144,7 +127,7 @@ The return value could be equal to @code{@{ok, the_site()@}} for the site that o """. -spec parent_site() -> {ok, classy:site()} | undefined. parent_site() -> - case classy_table:lookup(?ptab, ?parent_site) of + case classy_table:lookup(?globals, ?parent_site) of [V] -> {ok, V}; [] -> undefined end. @@ -170,16 +153,6 @@ kick_site(Site, Intent) -> #call_kick{site = Site, intent = Intent}, infinity). --doc false. --spec at_lower_level(run_level_atom(), fun(() -> Ret)) -> - {ok, Ret} | - {error | exit | throw, _Reason, _Stacktrace}. -at_lower_level(RunLevel, Fun) -> - gen_server:call( - ?SERVER, - #call_at_run_level{level = RunLevel, function = Fun}, - infinity). - -doc false. -spec nodes(all | connected | disconnected) -> [node()]. nodes(Query) -> @@ -202,10 +175,10 @@ nodes(Query) -> -spec peer_info() -> #{classy:site() => classy:peer_info()}. peer_info() -> ets:foldl( - fun(#classy_kv{k = Site, v = #site_info{node = Node, isconn = IsConn, last_update = LU}}, Acc) -> + fun(#classy_kv{k = Site, v = #site_info{node = Node, isconn = IsConn, conn_change_time = ConnChangeTime}}, Acc) -> Info = #{ node => Node , connected => IsConn - , last_update => LU + , last_update => ConnChangeTime }, Acc#{Site => Info} end, @@ -222,18 +195,14 @@ node_of_site(Site, OnlyConnected) -> undefined end. --doc """ -Return number of node restarts since creation of the site. - -This value is monotonically increasing. -""". --spec n_restarts() -> {ok, non_neg_integer()} | {error, nodedown}. -n_restarts() -> - case classy_table:lookup(?ptab, ?n_restarts) of - [N] -> - {ok, N}; - _ -> - {error, nodedown} +-doc false. +-spec n_restarts(classy:site()) -> {ok, non_neg_integer()} | undefined. +n_restarts(Site) -> + case classy_table:lookup(?site_info, Site) of + [#site_info{nrestarts = NR}] -> + {ok, NR}; + [] -> + undefined end. %%================================================================================ @@ -243,8 +212,6 @@ n_restarts() -> -record(s, { cluster :: classy:cluster_id() | undefined , site :: classy:site() - , run_level = 0 :: run_level_int() - , peer_state = #{} :: #{classy:site() => {node(), boolean()}} }). -doc false. @@ -255,10 +222,8 @@ init(_) -> #{ node_type => visible , nodedown_reason => true }), - ok = classy_table:open(?ptab, #{}), + ok = classy_table:open(?globals, #{on_update => fun ?MODULE:on_ptab_update/2}), ok = classy_table:open(?site_info, #{ets_options => [{read_concurrency, true}]}), - classy:on_membership_change(fun on_membership_change/4, -100), - increase_n_restarts(), classy_hook:foreach(?on_node_init, []), case init_cluster() of {ok, _} = Ok -> @@ -285,16 +250,6 @@ handle_call(#call_kick{site = Target, intent = Intent}, _From, S) -> _ -> {error, local_not_in_cluster} end, {reply, Ret, S}; -handle_call(#call_at_run_level{level = RequestedRunLevel, function = Fun}, _From, S0) -> - RunLevel = min(S0#s.run_level, run_level(RequestedRunLevel)), - S = change_run_level(RunLevel, S0), - Ret = try - {ok, Fun()} - catch - EC:Err:Stack -> - {EC, Err, Stack} - end, - {reply, Ret, update_runtime(S)}; handle_call(Call, From, S) -> ?tp(warning, ?classy_unknown_event, #{ kind => call @@ -305,7 +260,7 @@ handle_call(Call, From, S) -> {reply, {error, unknown_call}, S}. -doc false. -handle_cast(#cast_membership_change{} = Cast, S) -> +handle_cast(#cast_mem_deltas{} = Cast, S) -> handle_membership_change_event(Cast, S); handle_cast(Cast, S) -> ?tp(warning, ?classy_unknown_event, @@ -318,8 +273,6 @@ handle_cast(Cast, S) -> -doc false. handle_info({NodeUpOrDown, _Node, _}, S) when NodeUpOrDown =:= nodeup; NodeUpOrDown =:= nodedown -> {noreply, update_runtime(S)}; -handle_info({'EXIT', _, shutdown}, S) -> - {stop, shutdown, S}; handle_info(Info, S) -> ?tp(warning, ?classy_unknown_event, #{ kind => info @@ -329,18 +282,21 @@ handle_info(Info, S) -> {noreply, S}. -doc false. -terminate(Reason, S) -> +terminate(Reason, _S) -> classy_lib:is_normal_exit(Reason) orelse ?tp(warning, ?classy_abnormal_exit, #{ server => ?MODULE , reason => Reason }), - classy_table:stop(?ptab, 1_000), - classy_table:stop(?site_info, 1_000), - case S of - #s{} -> change_run_level(run_level(?stopped), S); - _ -> ok - end. + classy_table:flush(?globals), + classy_table:flush(?site_info), + sync_set_run_level(?stopped), + persistent_term:erase(?pt_node_sets), + persistent_term:erase(?pt_site_sets), + persistent_term:erase(?pt_site), + persistent_term:erase(?pt_cluster), + classy_table:stop(?globals, 5_000), + classy_table:stop(?site_info, 5_000). %%================================================================================ %% Internal exports @@ -366,55 +322,58 @@ hello() -> Err end. +-doc false. +on_ptab_update(_, Op) -> + case Op of + {w, ?the_cluster, Val} -> + persistent_term:put(?pt_cluster, Val); + {d, ?the_cluster} -> + persistent_term:erase(?pt_cluster); + {w, ?the_site, Val} -> + persistent_term:put(?pt_site, Val); + {d, ?the_site} -> + persistent_term:erase(?pt_site); + _ -> + ok + end. + +-doc false. +-spec notify_mem_deltas(classy:cluster_id(), [classy_membership:event()]) -> ok. +notify_mem_deltas(Cluster, Deltas) -> + gen_server:cast( + ?SERVER, + #cast_mem_deltas{ cluster = Cluster + , data = Deltas + }). + %%================================================================================ %% Internal functions %%================================================================================ -on_membership_change(Cluster, Local, Remote, Member) -> - gen_server:cast(?SERVER, - #cast_membership_change{ cluster = Cluster - , local = Local - , remote = Remote - , member = Member - }). - handle_membership_change_event( - #cast_membership_change{ cluster = Cluster - , local = Local - , remote = Remote - , member = Member - }, - S0 = #s{cluster = ThisCluster, site = ThisSite} + #cast_mem_deltas{ cluster = Cluster + , data = Deltas + }, + S0 = #s{cluster = ThisCluster, site = Local} ) -> ?tp(debug, membership_change, #{ cluster => Cluster , origin => Local - , target => Remote - , member => Member + , data => Deltas }), - if Cluster =:= ThisCluster, - Local =:= ThisSite, - Remote =:= ThisSite, - Member =:= false -> - %% We got kicked: - ?tp(warning, classy_kicked_remotely, - #{ cluster => Cluster - , local => ThisSite - }), - case on_leave(S0, kicked) of + if Cluster =:= ThisCluster -> + case apply_deltas_with_effects(Deltas, S0) of {ok, S} -> {noreply, S}; {error, Err} -> {stop, Err, undefined} end; - Cluster =:= ThisCluster -> - {noreply, update_runtime(S0)}; true -> + %% Update from the old cluster. Ignore it. {noreply, S0} end. -spec update_runtime(#s{}) -> #s{}. -update_runtime(S0) -> - S = update_sites_status(S0), - adjust_run_level(S). +update_runtime(S) -> + adjust_run_level(update_sites_status(S)). handle_kick(Cluster, Local, Target, Intent) -> case classy_hook:all(?on_pre_kick, [Cluster, Target, Intent]) of @@ -440,7 +399,20 @@ handle_join(S, Call) -> ExpectedCluster =:= any -> case classy_hook:all(?on_pre_join, [Cluster, Remote, Node, Intent]) of ok -> - do_join_node(Node, Cluster, Remote, MemData, S); + Res = + global:trans( + {classy_node_join_lock, self()}, + fun() -> + do_join_node(Node, Cluster, Remote, MemData, Intent, S) + end, + [node(), Node], + 1), + case Res of + aborted -> + {error, aborted}; + _ -> + Res + end; {error, _} = Err -> Err end; @@ -457,10 +429,11 @@ handle_join(S, Call) -> classy:cluster_id(), classy:site(), classy_membership:sync_data(), + classy:join_intent(), #s{} ) -> {ok, #s{}} | {error, _}. -do_join_node(Node, Cluster, Remote, MemData, S0) -> +do_join_node(Node, Cluster, Remote, MemData, JoinIntent, S0) -> {ok, Local} = the_site(), case the_cluster() of {ok, Cluster} -> @@ -472,23 +445,24 @@ do_join_node(Node, Cluster, Remote, MemData, S0) -> {ok, update_runtime(S0)}; {ok, OldCluster} when OldCluster =/= Cluster -> %% Site is currently in a different cluster. Leave it first: - Intent = join, - case handle_kick(OldCluster, Local, Local, Intent) of + LeaveIntent = join, + case handle_kick(OldCluster, Local, Local, LeaveIntent) of ok -> - {ok, S} = on_leave(S0, Intent), - do_join_node(Node, Cluster, Remote, MemData, S); + {ok, S} = on_leave(S0, LeaveIntent), + do_join_node(Node, Cluster, Remote, MemData, JoinIntent, S); Err -> Err end; undefined -> %% Site is not in any cluster: - {ok, S} = join_cluster(Cluster, Node, Local, Remote, S0), - do_join_node(Node, Cluster, Remote, MemData, S) + {ok, S} = join_cluster(Cluster, Node, Local, Remote, JoinIntent, S0), + do_join_node(Node, Cluster, Remote, MemData, JoinIntent, S) end. -on_leave(S0 = #s{cluster = Cluster, site = Local}, Intent) -> - S = change_run_level(run_level(?stopped), S0), - classy_table:delete(?ptab, ?the_cluster), +on_leave(S = #s{cluster = Cluster, site = Local}, Intent) -> + sync_set_run_level(?stopped), + %% Sync with the business apps: + classy_table:delete(?globals, ?the_cluster), classy_hook:foreach(?on_post_kick, [Cluster, Local, Intent]), classy_table:clear(?site_info), case Intent of @@ -498,83 +472,29 @@ on_leave(S0 = #s{cluster = Cluster, site = Local}, Intent) -> init_cluster() end. --spec join_cluster(classy:cluster_id(), node(), classy:site(), classy:site(), #s{}) -> {ok, #s{}}. -join_cluster(Cluster, JoinToNode, Local, Remote, S = #s{run_level = 0}) -> +-spec join_cluster(classy:cluster_id(), node(), classy:site(), classy:site(), classy:join_intent(), #s{}) -> {ok, #s{}}. +join_cluster(Cluster, JoinToNode, Local, Remote, Intent, S = #s{}) -> {ok, _} = classy_sup:ensure_membership(Cluster, Local), - classy_hook:foreach(?on_post_join, [Cluster, Local, JoinToNode]), - classy_table:dirty_write(?ptab, ?the_cluster, Cluster), - classy_table:dirty_write(?ptab, ?parent_site, Remote), - classy_table:flush(?ptab), - {ok, S#s{cluster = Cluster, peer_state = #{}}}. + classy_hook:foreach(?on_post_join, [Cluster, Local, JoinToNode, Intent]), + classy_table:dirty_write(?globals, ?the_cluster, Cluster), + classy_table:dirty_write(?globals, ?parent_site, Remote), + classy_table:flush(?globals), + {ok, S#s{cluster = Cluster}}. %% Update node tracking information -spec update_sites_status(#s{}) -> #s{}. -update_sites_status(S0 = #s{cluster = Cluster, site = Local}) -> - %% Gather data: - Nodes = [node() | erlang:nodes()], - Members = classy_membership:members(Cluster, Local), - NodesOfSite = classy_membership:node_of_site(Cluster, Local), - %% Update members: - S1 = lists:foldl( - fun(Site, Acc) -> - case NodesOfSite of - #{Site := Node} -> - IsConn = lists:member(Node, Nodes); - #{} -> - Node = undefined, - IsConn = false - end, - case classy_table:lookup(?site_info, Site) of - [#site_info{isconn = IsConn, node = Node}] -> - %% No changes: - ok; - _ -> - classy_table:dirty_write( - ?site_info, - Site, - #site_info{ isconn = IsConn - , node = Node - , last_update = classy_lib:time_s() - }) - end, - maybe_on_peer_connection_status_change(Acc, Site, Node, IsConn, true) +update_sites_status(S) -> + _ = ets:foldl( + fun(#classy_kv{k = Peer, v = SiteInfo}, Acc) -> + update_site_info(Peer, SiteInfo, S), + Acc end, - S0, - Members), - %% Delete info of gone members: - S = ets:foldl( - fun(#classy_kv{k = Site, v = #site_info{node = Node}}, Acc) -> - case lists:member(Site, Members) of - true -> - Acc; - false -> - classy_table:dirty_delete(?site_info, Site), - maybe_on_peer_connection_status_change(Acc, Site, Node, false, false) - end - end, - S1, + [], ?site_info), - classy_table:flush(?site_info), + ok = classy_table:flush(?site_info), + classify(), S. --spec maybe_on_peer_connection_status_change(#s{}, classy:site(), node() | undefined, boolean(), boolean()) -> #s{}. -maybe_on_peer_connection_status_change(S = #s{cluster = Cluster, site = Local, peer_state = PS0}, Site, Node, IsConn, Keep) -> - Changed = case PS0 of - #{Site := {Node, IsConn}} -> - false; - #{} -> - classy_hook:foreach(?on_peer_connection_status_change, [Cluster, Local, Site, Node, IsConn]), - true - end, - PS = if Changed andalso Keep -> - PS0#{Site => {Node, IsConn}}; - not Keep -> - maps:remove(Site, PS0); - true -> - PS0 - end, - S#s{peer_state = PS}. - init_cluster() -> maybe {ok, Cluster} ?= the_cluster(), @@ -599,7 +519,7 @@ init_cluster() -> , [classy_table:atomic_op(fun(() -> _))] }. ensure_the_id(Key, OnCreateHook, HookArgs, Default) -> - case classy_table:lookup(?ptab, Key) of + case classy_table:lookup(?globals, Key) of [Bin] when is_binary(Bin) -> {false, Bin, []}; [] -> @@ -621,29 +541,22 @@ ensure_the_id(Key, OnCreateHook, HookArgs, Default) -> -spec adjust_run_level(#s{}) -> #s{}. adjust_run_level(S = #s{cluster = Cluster, site = Site}) -> - NKnown = length(classy_membership:members(Cluster, Site)), - NConnected = length(nodes(connected)), + %% NOTE: must be called after `classify': + NKnown = length(intersection(classy_lib:to_cluster_sets())), + NConnected = length(intersection(classy_lib:quorum_sets())), RunLevel = case NKnown >= classy_lib:n_sites() of true -> case NConnected >= classy:quorum(config) of - true -> run_level(?quorum); - false -> run_level(?cluster) + true -> ?quorum; + false -> ?cluster end; - false -> run_level(?single) + false -> ?single end, - change_run_level(RunLevel, S). - --spec change_run_level(run_level_int(), #s{}) -> #s{}. -change_run_level(Level, #s{run_level = Level} = S) when is_integer(Level) -> - S; -change_run_level(To, #s{run_level = From} = S) when To >= 0, To =< 3 -> - Next = if To > From -> - From + 1; - To < From -> - From - 1 - end, - classy_hook:foreach(?on_change_run_level, [run_level(From), run_level(Next)]), - change_run_level(To, S#s{run_level = Next}). + set_run_level(RunLevel), + %% Propagate info to peers: + Info = classy_hook:fold(?on_enrich_site_info, [], #{rl => RunLevel, vsn => ?classy_proto_vsn}), + classy_membership:set_info(Cluster, Site, Info), + S. %% Start membership processes for all known former clusters, in order %% to relay information to former peers. @@ -657,30 +570,270 @@ start_old_clusters(Site) -> end, classy_membership:known_clusters(Site)). --spec increase_n_restarts() -> ok. -increase_n_restarts() -> - N = case classy_table:lookup(?ptab, ?n_restarts) of - [N0] when is_integer(N0) -> - N0 + 1; - [] -> - 0; - Other -> - ?tp(warning, ?classy_bad_data, - #{ table => ?ptab - , key => ?n_restarts - , val => Other - }), - 0 - end, - classy_table:write(?ptab, ?n_restarts, N). - --spec run_level(run_level_int()) -> run_level_atom(); - (run_level_atom()) -> run_level_int(). -run_level(?stopped) -> 0; -run_level(?single) -> 1; -run_level(?cluster) -> 2; -run_level(?quorum) -> 3; -run_level(0) -> ?stopped; -run_level(1) -> ?single; -run_level(2) -> ?cluster; -run_level(3) -> ?quorum. +sync_set_run_level(Level) -> + classy_rl_changer:set_sync(Level, infinity). + +the_cluster() -> + case classy_table:lookup(?globals, ?the_cluster) of + [V] -> + {ok, V}; + [] -> + undefined + end. + +the_site() -> + case classy_table:lookup(?globals, ?the_site) of + [V] -> + {ok, V}; + [] -> + undefined + end. + +-spec apply_deltas_with_effects([classy_membership:event()], #s{}) -> {ok, #s{}} | {error, _}. +apply_deltas_with_effects(Deltas, S0 = #s{cluster = Cluster, site = Local}) -> + {Updated, Kicked} = merge_deltas(Deltas), + {ok, MyNR} = classy_liveness:n_restarts(), + case Kicked of + #{Local := _} -> + %% We got kicked remotely. In this case we don't bother + %% importing the data and running the hooks, and go straight to + %% `on_leave': + ?tp(warning, classy_kicked_remotely, + #{ cluster => Cluster + , local => Local + }), + case on_leave(S0, kicked) of + {ok, S} -> {ok, S}; + {error, _} = Err -> Err + end; + #{} -> + maybe + {ok, S} ?= import_deltas(Updated, Kicked, S0), + case Updated of + #{Local := #site_info{isup = false, nrestarts = NR}} when NR >= MyNR -> + %% Handle network partition; peers decided that we're down: + ?tp(warning, classy_restarted_remotely, + #{ cluster => Cluster + , local => Local + , nrestarts => MyNR + }), + on_remote_restart(S); + _ -> + %% Nothing happened: + {ok, S} + end + end + end. + +-spec on_remote_restart(#s{}) -> {ok, #s{}}. +on_remote_restart(S) -> + classy_rl_changer:set_sync(?stopped, 120_000), + {ok, adjust_run_level(S)}. + +-spec import_deltas( #{classy:site() => #site_info{}}, #{classy:site() => true}, #s{}) -> + {ok, #s{}} | {error, _}. +import_deltas(Updated, Kicked, S0 = #s{cluster = Cluster, site = Local}) -> + %% 1. Process kicked nodes: + maps:foreach( + fun(Peer, _) -> + classy_hook:foreach(?on_membership_change, [Cluster, Local, Peer, false]), + classy_table:dirty_delete(?site_info, Peer) + end, + Kicked), + %% 2. Process updated nodes: + maps:foreach( + fun(Peer, NewInfo) -> + case Kicked of + #{Peer := _} -> + %% Ignore kicked sites: + ok; + #{} -> + update_site_info(Peer, NewInfo, S0) + end + end, + Updated), + maybe + ok ?= classy_table:flush(?site_info), + classify(), + {ok, adjust_run_level(S0)} + end. + +%% 1. Calculate connectivity to the node +%% 2. Diff the current information with the past +%% 3. Run the hooks if the site's status changes +%% 4. Schedule writing of the updated data to the DB +update_site_info(Peer, New0 = #site_info{isup = IsUp, nrestarts = NR}, #s{cluster = Cluster, site = Local}) -> + Node = maps:get(Peer, classy_membership:node_of_site(Cluster, Local), undefined), + IsConn = lists:member(Node, [node() | nodes()]), + New1 = New0#site_info{isconn = IsConn, node = Node}, + %% Get the previous data or use the defaults: + Old = classy_table:lookup(?site_info, Peer), + case Old of + [#site_info{isup = IsUp0, nrestarts = NR0, node = Node0, isconn = IsConn0, conn_change_time = CCT0}] -> + ok; + [] -> + IsUp0 = false, + IsConn0 = false, + CCT0 = undefined, + %% If we haven't seen this peer before, do not report it as + %% restarted: + NR0 = NR, + %% Do not report changed host as well: + Node0 = Node + end, + %% Should we update connection status change time? + CCT = if IsConn0 =/= IsConn; not is_integer(CCT0) -> + classy_lib:time_s(); + true -> + CCT0 + end, + New = New1#site_info{conn_change_time = CCT}, + %% Schedule saving of the data, if changed: + case [New] of + Old -> []; + _ -> classy_table:dirty_write(?site_info, Peer, New) + end, + %% Note: hooks are executed after the data is staged to RAM, but + %% before it is committed to the storage. It means that if the + %% server is interrupted before the data is written, certain hooks + %% will fire multiple times. This is what we want, in fact. + case Old of + [] -> classy_hook:foreach(?on_membership_change, [Cluster, Local, Peer, true]); + _ -> ok + end, + if Peer =/= Local, IsUp0 =/= IsUp -> + classy_hook:foreach(?on_peer_liveness_change, [Peer, IsUp]); + true -> + ok + end, + if Node =/= Node0 -> + classy_hook:foreach(?on_peer_node_change, [Peer, Node0, Node]); + true -> + [] + end, + if Peer =/= Local, NR > NR0, IsUp -> + classy_hook:foreach(?on_peer_restart, [Peer, NR]); + true -> + [] + end, + if Peer =/= Local, IsConn0 =/= IsConn -> + classy_hook:foreach(?on_peer_connection_status_change, [Peer, Node, IsConn]); + true -> + [] + end. + +-spec merge_deltas([classy_membership:event()]) -> {Updated, Kicked} when + Updated :: #{classy:site() => #site_info{}}, + Kicked :: #{classy:site() => true}. +merge_deltas(Data) -> + merge_deltas(Data, #{}, #{}). + +-spec merge_deltas([classy_membership:event()], Updated, Kicked) -> {Updated, Kicked} when + Updated :: #{classy:site() => #site_info{}}, + Kicked :: #{classy:site() => true}. +merge_deltas([], Updated, Kicked) -> + {Updated, Kicked}; +merge_deltas([Up | Rest], Updated0, Kicked0) -> + Get = fun(Peer) -> + case Updated0 of + #{Peer := Val} -> + Val; + #{} -> + case classy_table:lookup(?site_info, Peer) of + [Val] -> Val; + [] -> default_site_info() + end + end + end, + case Up of + {mem, Peer, true} -> + Updated = Updated0#{Peer => Get(Peer)}, + Kicked = maps:remove(Peer, Kicked0); + {mem, Peer, false} -> + Updated = Updated0, + Kicked = Kicked0#{Peer => true}; + {host, Peer, Host} -> + Info0 = Get(Peer), + Info = Info0#site_info{node = Host}, + Updated = Updated0#{Peer => Info}, + Kicked = Kicked0; + {meta, Peer, Meta} -> + Info0 = Get(Peer), + Info = Info0#site_info{meta = Meta}, + Updated = Updated0#{Peer => Info}, + Kicked = Kicked0; + {liveness, Peer, NRestarts, _Self, IsUp} -> + Info0 = Get(Peer), + Info = Info0#site_info{ isup = IsUp + , nrestarts = NRestarts + }, + Updated = Updated0#{Peer => Info}, + Kicked = Kicked0 + end, + merge_deltas(Rest, Updated, Kicked). + +-spec classify() -> ok. +classify() -> + {NodeSets, SiteSets} + = ets:foldl( + fun(#classy_kv{k = Site, v = Info}, Acc0) -> + #site_info{node = Node} = Info, + lists:foldl( + fun(SetName, {AccNodes, AccSites}) -> + case AccNodes of + #{SetName := NSet0} -> ok; + #{} -> NSet0 = [] + end, + case AccSites of + #{SetName := SSet0} -> ok; + #{} -> SSet0 = [] + end, + NSet = ordsets:add_element(Node, NSet0), + SSet = ordsets:add_element(Site, SSet0), + { AccNodes#{SetName => NSet} + , AccSites#{SetName => SSet} + } + end, + Acc0, + sets_of_node(Info)) + end, + {#{}, #{}}, + ?site_info), + persistent_term:put(?pt_node_sets, NodeSets), + persistent_term:put(?pt_site_sets, SiteSets). + +-spec sets_of_node(#site_info{}) -> [classy:node_set_name()]. +sets_of_node(#site_info{isconn = IsConn, isup = IsUp, meta = Meta}) -> + [ case IsConn of + true -> connected; + _ -> disconnected + end + , case IsUp of + true -> up; + false -> down + end + , all + | lists:flatten( + classy_hook:map(?on_node_classify, [Meta])) + ]. + +default_site_info() -> + #site_info{ isconn = false + , isup = false + , nrestarts = 0 + , meta = #{} + }. + +intersection(Sets) -> + ordsets:intersection([classy:nodes(S) || S <- Sets]). + +-ifndef(TEST). +%% In real live we change levels async-ly: +set_run_level(Level) -> + classy_rl_changer:set(Level). +-else. +%% In the tests we want to sequence the events. +set_run_level(Level) -> + ok = classy_rl_changer:set_sync(Level, 5_000), + ok. +-endif. diff --git a/src/classy_rl_changer.erl b/src/classy_rl_changer.erl new file mode 100644 index 0000000..08654ba --- /dev/null +++ b/src/classy_rl_changer.erl @@ -0,0 +1,241 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2025-2026 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(classy_rl_changer). +-moduledoc false. + +-behavior(gen_server). + +%% API: +-export([to_int/1, to_atom/1, at_lower_level/2]). + +%% behavior callbacks: +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). + +%% internal exports: +-export([start_link/0, set/1, set_sync/2]). + +-export_type([run_level_int/0]). + +-include("classy_internal.hrl"). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-define(SERVER, ?MODULE). + +-define(valid_level(LEVEL), ((LEVEL) =:= ?stopped orelse (LEVEL) =:= ?single orelse (LEVEL) =:= ?cluster orelse (LEVEL) =:= ?quorum)). +-define(valid_level_int(LEVEL), ((LEVEL) >=0 andalso (LEVEL) =<3)). + +-type run_level_int() :: 0..3. + +-record(call_set, {level :: classy:run_level()}). + +-record(call_at_run_level, + { level :: classy:run_level() + , function :: fun(() -> _) + }). + +-record(call, + { at :: run_level_int() + , f :: fun(() -> _) + }). + +-record(running, + { next :: run_level_int() + , pid :: pid() + }). + +%%================================================================================ +%% API functions +%%================================================================================ + +-spec to_int(classy:run_level()) -> run_level_int(). +to_int(?stopped) -> 0; +to_int(?single) -> 1; +to_int(?cluster) -> 2; +to_int(?quorum) -> 3. + +-spec to_atom(run_level_int()) -> classy:run_level(). +to_atom(0) -> ?stopped; +to_atom(1) -> ?single; +to_atom(2) -> ?cluster; +to_atom(3) -> ?quorum. + +-doc false. +-spec at_lower_level(classy:run_level(), fun(() -> any())) -> ok | {error, _}. +at_lower_level(RunLevel, Fun) -> + gen_server:call( + ?SERVER, + #call_at_run_level{level = RunLevel, function = Fun}, + infinity). + +%%================================================================================ +%% Internal exports +%%================================================================================ + +-spec start_link() -> {ok, pid()}. +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [self()], []). + +-spec set(classy:run_level()) -> ok. +set(RunLevel) -> + gen_server:call( + ?SERVER, + #call_set{level = RunLevel}, + infinity). + +%% Warning: this function only works for _lowering_ the layer. +-spec set_sync(classy:run_level(), timeout()) -> ok | {error, timeout}. +set_sync(RunLevel, Timeout) -> + set(RunLevel), + Parent = erlang:alias([reply]), + Ref = make_ref(), + at_lower_level(RunLevel, fun() -> Parent ! Ref end), + receive + Ref -> ok + after Timeout -> + unalias(Parent), + {error, timeout} + end. + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +-record(s, + { set = 0 :: run_level_int() + , current = 0 :: run_level_int() + , running :: #running{} | undefined + , actions = [] :: [#call{}] + }). + +init(_) -> + process_flag(trap_exit, true), + {ok, #s{}}. + +handle_call(#call_set{level = Level}, _From, S0) -> + if ?valid_level(Level) -> + S = maybe_transition(S0#s{set = to_int(Level)}), + {reply, ok, S}; + true -> + {reply, {error, badarg}, S0} + end; +handle_call(#call_at_run_level{level = Level, function = Fun}, _From, #s{actions = AA} = S0) -> + if ?valid_level(Level), is_function(Fun, 0) -> + New = #call{ at = to_int(Level) + , f = Fun + }, + S = maybe_transition(S0#s{actions = [New | AA]}), + {reply, ok, S}; + true -> + {reply, {error, badarg}, S0} + end; +handle_call(Call, From, S) -> + ?tp(warning, ?classy_unknown_event, + #{ kind => call + , from => From + , content => Call + , server => ?MODULE + }), + {reply, {error, unknown_call}, S}. + +handle_cast(Cast, S) -> + ?tp(warning, ?classy_unknown_event, + #{ kind => cast + , content => Cast + , server => ?MODULE + }), + {noreply, S}. + +handle_info({'EXIT', Pid, _Reason}, #s{running = #running{pid = Pid, next = Next}} = S0) -> + S = S0#s{ running = undefined + , current = Next + }, + {noreply, maybe_transition(S)}; +handle_info(Info, S) -> + ?tp(warning, ?classy_unknown_event, + #{ kind => info + , content => Info + , server => ?MODULE + }), + {noreply, S}. + +terminate(Reason, S) -> + classy_lib:is_normal_exit(Reason) orelse + ?tp(warning, ?classy_abnormal_exit, + #{ server => ?MODULE + , reason => Reason + }), + terminate_loop(maybe_transition(S#s{set = 0, actions = []})). + +%%================================================================================ +%% Internal functions +%%================================================================================ + +terminate_loop(#s{current = 0, running = undefined}) -> + ok; +terminate_loop(#s{running = #running{next = Next, pid = Pid}} = S0) -> + receive + {'EXIT', Pid, _} -> + S = S0#s{ running = undefined + , current = Next + }, + terminate_loop(maybe_transition(S)) + end. + +-spec maybe_transition(#s{}) -> #s{}. +maybe_transition(#s{running = #running{}} = S0) -> + S0; +maybe_transition(#s{actions = AA0, set = Set, current = From, running = undefined} = S0) -> + To = lists:foldl( + fun(#call{at = At}, Acc) -> min(At, Acc) end, + Set, + AA0), + Next = if To > From -> + From + 1; + To < From -> + From - 1; + To =:= From -> + From + end, + {ExecNow, AA} = + lists:partition( + fun(#call{at = L}) -> L >= Next end, + AA0), + case ExecNow of + [] when Next =:= From -> + %% Nothing to do: + S0; + _ -> + %% Start transition: + Running = run_hooks(From, Next, ExecNow), + S0#s{running = Running, actions = AA} + end. + +run_hooks(From, Next, Actions) -> + FromA = to_atom(From), + NextA = to_atom(Next), + Worker = spawn_link( + fun() -> + From =/= Next andalso + classy_hook:foreach(?on_change_run_level, [FromA, NextA]), + lists:foreach( + fun(#call{f = Fun}) -> + try + Fun() + catch + EC:Err:Stack -> + ?tp(critical, ?classy_run_level_change_error, + #{ call => Fun + , EC => Err + , stack => Stack + }) + end + end, + Actions) + end), + #running{ next = Next + , pid = Worker + }. diff --git a/src/classy_sup.erl b/src/classy_sup.erl index 290402a..8e72966 100644 --- a/src/classy_sup.erl +++ b/src/classy_sup.erl @@ -115,24 +115,24 @@ start_link_vote_participant_sup() -> init(#top{}) -> _ = classy_hook:init(), + RLChanger = #{ id => run_level_mgr + , start => {classy_rl_changer, start_link, []} + , shutdown => infinity + , restart => permanent + , type => worker + }, Node = #{ id => node , start => {classy_node, start_link, []} , shutdown => 10_000 , restart => permanent , type => worker }, - UIDGen = #{ id => uid - , start => {classy_uid, start_link, []} - , shutdown => 5_000 - , restart => permanent - , type => worker - }, - Autoclean = #{ id => autoclean - , start => {classy_autoclean, start_link, []} - , shutdown => 10_000 - , restart => permanent - , type => worker - }, + Liveness = #{ id => liveness + , start => {classy_liveness, start_link, []} + , shutdown => 10_000 + , restart => permanent + , type => worker + }, Autocluster = #{ id => autocluster , start => {classy_autocluster_sup, start_link, []} , shutdown => infinity @@ -141,11 +141,11 @@ init(#top{}) -> }, Children = [ sup_spec(#{id => ?TABLE_SUP, start => {?MODULE, start_link_table_sup, []}}) , sup_spec(#{id => ?MEMBERSHIP_SUP, start => {?MODULE, start_link_membership_sup, []}}) + , RLChanger , Node - , UIDGen , sup_spec(#{id => ?VOTE_COORDINATOR_SUP, start => {?MODULE, start_link_vote_coordinator_sup, []}}) , sup_spec(#{id => ?VOTE_PARTICIPANT_SUP, start => {?MODULE, start_link_vote_participant_sup, []}}) - , Autoclean + , Liveness , Autocluster ], SupFlags = #{ strategy => rest_for_one diff --git a/src/classy_uid.erl b/src/classy_uid.erl index ba04305..7c74808 100644 --- a/src/classy_uid.erl +++ b/src/classy_uid.erl @@ -8,22 +8,15 @@ This module contains utilities for constructing unique IDs using various algorithms. """. --behavior(gen_server). - %% API: -export([ site_unique_tuple/0 , cluster_unique_tuple/0 - , site_unique_seq_tuple/1 - , cluster_unique_seq_tuple/1 ]). -%% behavior callbacks: --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). - %% internal exports: -export([start_link/0]). --export_type([su_tuple/0, cu_tuple/0, sequence/0]). +-export_type([su_tuple/0, cu_tuple/0]). -include("classy_internal.hrl"). @@ -33,21 +26,12 @@ using various algorithms. -define(volatile_sequences, classy_uid_volatile_sequences_tab). --define(pterm_uid_gen, classy_uid_gen). - --type pterm_uid_gen() :: #{ site := classy:site() - , n_restarts := non_neg_integer() - }. - -doc "Site-unique tuple". -type su_tuple() :: {non_neg_integer(), pos_integer()}. -doc "Cluster-unique tuple". -type cu_tuple() :: {classy:site(), non_neg_integer(), pos_integer()}. --doc "Identifier of a volatile sequence". --type sequence() :: term(). - %%================================================================================ %% API functions %%================================================================================ @@ -70,7 +54,7 @@ Site-unique tuples can be used to order events within the site. """. -spec site_unique_tuple() -> su_tuple(). site_unique_tuple() -> - #{n_restarts := NRestarts} = get_pterm(), + {ok, NRestarts} = classy_liveness:n_restarts(), {NRestarts, erlang:unique_integer([positive, monotonic])}. -doc """ @@ -83,102 +67,10 @@ but not globally. """. -spec cluster_unique_tuple() -> cu_tuple(). cluster_unique_tuple() -> - #{n_restarts := NRestarts, site := Site} = get_pterm(), + {ok, Site} = classy:the_site(), + {ok, NRestarts} = classy_liveness:n_restarts(), {Site, NRestarts, erlang:unique_integer([positive, monotonic])}. --doc """ -Return a tuple that is guaranteed to be unique within sequence ID and site. - -It consists of number of restarts -followed by a value of a volatile counter identified by sequence ID. -Sequence counter gets reset on node restart. - -This function is expected to be slower than @code{classy_uid:site_unique_tuple/0}, -but its values form a monotonic sequence. -""". --spec site_unique_seq_tuple(sequence()) -> su_tuple(). -site_unique_seq_tuple(Sequence) -> - #{n_restarts := NRestarts} = get_pterm(), - {NRestarts, volatile_counter(Sequence)}. - --doc """ -Similar to @ref{classy_uid:site_unique_seq_tuple/1}, -but includes site name. -""". --spec cluster_unique_seq_tuple(sequence()) -> cu_tuple(). -cluster_unique_seq_tuple(Sequence) -> - #{n_restarts := NRestarts, site := Site} = get_pterm(), - {Site, NRestarts, volatile_counter(Sequence)}. - -%%================================================================================ -%% behavior callbacks -%%================================================================================ - --record(s, {}). - --doc false. -init(_) -> - process_flag(trap_exit, true), - {ok, NRestarts} = classy_node:n_restarts(), - {ok, Site} = classy_node:the_site(), - ets:new(?volatile_sequences, [set, named_table, public, {write_concurrency, true}]), - set_pterm(#{ site => Site - , n_restarts => NRestarts - }), - S = #s{}, - {ok, S}. - --doc false. -handle_call(Call, From, S) -> - ?tp(warning, ?classy_unknown_event, - #{ kind => call - , from => From - , content => Call - , server => ?MODULE - }), - {reply, {error, unknown_call}, S}. - --doc false. -handle_cast(Cast, S) -> - ?tp(warning, ?classy_unknown_event, - #{ kind => cast - , content => Cast - , server => ?MODULE - }), - {noreply, S}. - --doc false. -handle_info({'EXIT', _, shutdown}, S) -> - {stop, shutdown, S}; -handle_info(Info, S) -> - ?tp(warning, ?classy_unknown_event, - #{ kind => info - , content => Info - , server => ?MODULE - }), - {noreply, S}. - --doc false. -terminate(_Reason, _S) -> - persistent_term:erase(?pterm_uid_gen), - ok. - -%%================================================================================ -%% Internal exports -%%================================================================================ - %%================================================================================ %% Internal functions %%================================================================================ - --spec set_pterm(pterm_uid_gen()) -> ok. -set_pterm(PT) -> - persistent_term:put(?pterm_uid_gen, PT). - --spec get_pterm() -> pterm_uid_gen(). -get_pterm() -> - persistent_term:get(?pterm_uid_gen). - --spec volatile_counter(sequence()) -> pos_integer(). -volatile_counter(Sequence) -> - ets:update_counter(?volatile_sequences, Sequence, {2, 1}, {Sequence, 0}). diff --git a/src/classy_vote.erl b/src/classy_vote.erl index 0f2865d..1271df5 100644 --- a/src/classy_vote.erl +++ b/src/classy_vote.erl @@ -267,7 +267,7 @@ Initiate a new vote, @pxref{t:classy_vote:options/0,options/0}. create(UserOptions) -> maybe %% Create a new vote: - ID = classy_uid:cluster_unique_seq_tuple(classy_vote_sequence), + ID = classy_uid:cluster_unique_tuple(), {ok, Options} ?= with_defaults(UserOptions), {ok, _} ?= classy_vote_coordinator:new(ID, Options), {ok, ID} diff --git a/src/classy_vote_coordinator.erl b/src/classy_vote_coordinator.erl index c9d3eb5..a996cf1 100644 --- a/src/classy_vote_coordinator.erl +++ b/src/classy_vote_coordinator.erl @@ -165,11 +165,6 @@ handle_event({call, ReplyTo}, VoteData = #c_vote{}, Stage, D) -> handle_event(state_timeout, ?state_timeout, Stage, D) -> handle_state_timeout(Stage, D); %% Common: -handle_event(info, {'EXIT', _, Reason}, _, _) -> - case Reason of - normal -> keep_state_and_data; - _ -> {stop, shutdown} - end; handle_event(ET, Event, State, _Data) -> %% TODO: put ID and MFAs into error messages ?tp(warning, ?classy_unknown_event, @@ -445,7 +440,7 @@ prepare( #d{id = Id, tag = Tag, opts = #opts{on_fail = OnFail}}, #act{prepare = Prep, commit = Commit, rollback = Rollback} ) -> - {ok, Self} = classy_node:the_site(), + {ok, Self} = classy:the_site(), #prepare{ id = Id , tag = Tag , prepare = Prep diff --git a/src/classy_vote_participant.erl b/src/classy_vote_participant.erl index 622320f..c659774 100644 --- a/src/classy_vote_participant.erl +++ b/src/classy_vote_participant.erl @@ -177,11 +177,6 @@ handle_event(enter, OldStage, Stage, D) -> handle_event(state_timeout, ?state_timeout, ?s_prepare, D) -> %% Perform the actual vote: do_real_vote(D); -handle_event(info, {'EXIT', _, Reason}, _, _) -> - case Reason of - normal -> keep_state_and_data; - _ -> {stop, shutdown} - end; handle_event({call, From}, #c_outcome{} = Outcome, ?s_wait_outcome, D) -> do_receive_outcome(From, Outcome, D); handle_event(ET, Event, State, _Data) -> @@ -209,7 +204,7 @@ terminate(Reason, State, _Data) -> %%================================================================================ do_receive_outcome(From, #c_outcome{result = Result}, D0 = #d{vote = MyVote, prep = Prep}) -> - {ok, Self} = classy_node:the_site(), + {ok, Self} = classy:the_site(), ?tp(debug, ?classy_vote_part_recv_outcome, #{ outcome => Result , id => Prep#prepare.id @@ -322,7 +317,7 @@ do_prepare( end. send_vote(#d{vote = Vote, prep = Prep = #prepare{id = ID}}) -> - {ok, Self} = classy_node:the_site(), + {ok, Self} = classy:the_site(), #prepare{id = Id, coordinator = Coordinator} = Prep, Arg = #c_vote{ id = Id , vote = Vote @@ -354,7 +349,7 @@ db_establish(Stage, Vote, CompletedActions, Prep) -> [ {w, DataKey, Prep} , {w, StateKey, State} ]), - {ok, Site} = classy_node:the_site(), + {ok, Site} = classy:the_site(), ?tp(debug, ?classy_vote_part_established, #{id => ID, tag => Tag, site => Site}), {ok, #d{ prep = Prep , vote = Vote diff --git a/src/discovery/classy_discovery_etcd.erl b/src/discovery/classy_discovery_etcd.erl index 3df2cd0..9fe15a2 100644 --- a/src/discovery/classy_discovery_etcd.erl +++ b/src/discovery/classy_discovery_etcd.erl @@ -389,9 +389,8 @@ handle_call(_Request, _From, State = #state{}) -> handle_cast(_Request, State = #state{}) -> {noreply, State}. -handle_info({'EXIT', _From, Reason}, State) -> +handle_info({'EXIT', _From, Reason}, State = #state{}) -> {stop, Reason, State}; - handle_info(_Info, State = #state{}) -> {noreply, State}. diff --git a/test/classy_SUITE.erl b/test/classy_SUITE.erl index e1012f0..d0c8e69 100644 --- a/test/classy_SUITE.erl +++ b/test/classy_SUITE.erl @@ -82,13 +82,13 @@ t_020_join(_) -> %% cluster ID should be equal to the site id: ?assertEqual( {ok, Cluster1}, - ?ON(S1, classy_node:the_cluster())), + ?ON(S1, classy:the_cluster())), ?assertEqual( [S1], ?ON(S1, classy:sites())), ?assertEqual( {ok, Cluster2}, - ?ON(S2, classy_node:the_cluster())), + ?ON(S2, classy:the_cluster())), ?assertEqual( [S2], ?ON(S2, classy:sites())), @@ -101,10 +101,10 @@ t_020_join(_) -> %% Verify state after join: ?assertEqual( {ok, Cluster1}, - ?ON(S1, classy_node:the_cluster())), + ?ON(S1, classy:the_cluster())), ?assertEqual( {ok, Cluster1}, - ?ON(S2, classy_node:the_cluster())), + ?ON(S2, classy:the_cluster())), ?assertSameSet( [S1, S2], ?ON(S1, classy:sites())), @@ -302,16 +302,17 @@ t_060_at_lower_level(_) -> begin %% Prepare the system: _N1 = create_start_site(S1, #{}), - timer:sleep(1000), + ct:sleep(1000), ?block_until(#{?snk_kind := classy_change_run_level, to := quorum}), ?assertMatch( - {ok, hello}, + ok, ?ON(S1, classy:at_lower_level( single, fun() -> hello - end))) + end))), + ct:sleep(1000) end, [ {"run level transitions", fun(Trace) -> @@ -326,6 +327,102 @@ t_060_at_lower_level(_) -> , fun events_on_all_sites/1 ]). +%% Verify handling of timeouts during run level changes. +t_061_run_level_timeouts(_) -> + S1 = <<"s1">>, + ?check_trace( + #{timetrap => 30_000}, + begin + %% Setup: + _N1 = create_start_site(S1, #{}), + ?block_until(#{?snk_kind := classy_change_run_level, to := quorum}), + Pred = ?match_event(#{?snk_kind := K} when K =:= rl_change; + K =:= ?classy_hook_failure; + K =:= ?classy_run_level_change_error), + ?ON(S1, + begin + classy:run_level( + fun(From, To) -> + ?tp(rl_change, #{f => From, t => To}), + timer:sleep(100) + end, + 0) + end), + %% Suspend `classy_node' for the duration of the test, so it + %% doesn't interfere: + ok = ?ON(S1, sys:suspend(classy_node, 1000)), + %% 1. First try setting run level multiple times, faster than + %% the hooks can handle: + ?tp(test_stage1, #{}), + ?ON(S1, application:set_env(classy, hook_timeout, 1000)), + {ok, Sub1} = snabbkaffe:subscribe(Pred, 100, 3000, 0), + %% Issue a few conflicting commands in rapid succession: + ?ON(S1, classy_rl_changer:set(?stopped)), + ?ON(S1, classy_rl_changer:set(?quorum)), + ?ON(S1, classy_rl_changer:set(?stopped)), + %% System should follow the last command: + {_, Events1} = snabbkaffe:receive_events(Sub1), + ?assertMatch( + [ #{f := ?quorum, t := ?cluster} + , #{f := ?cluster, t := ?single} + , #{f := ?single, t := ?stopped} + ], + Events1), + %% 2. Same logic applies when the system is stopped: + %% Prepare; go to the single state + ?tp(test_stage2, #{}), + ?ON(S1, + classy_rl_changer:set_sync(?single, 5_000)), + %% Request transition to quorum, and simultaneously stop + %% application (simulated by a supervisor request): + {ok, Sub2} = snabbkaffe:subscribe(Pred, 100, 3000, 0), + ?ON(S1, + begin + classy_rl_changer:set(?quorum), + ok = supervisor:terminate_child(classy_sup, run_level_mgr) + end), + {_, Events2} = snabbkaffe:receive_events(Sub2), + ?assertMatch( + [ #{f := ?single, t := ?cluster} + , #{f := ?cluster, t := ?single} + , #{f := ?single, t := ?stopped} + ], + Events2), + %% 3. Verify that timeouts are handled normally: + ?tp(test_stage3, #{}), + ?ON(S1, + begin + {ok, _} = supervisor:restart_child(classy_sup, run_level_mgr), + application:set_env(classy, hook_timeout, 90) + end), + {ok, Sub3} = snabbkaffe:subscribe(Pred, 100, 3000, 0), + ?ON(S1, + begin + classy_rl_changer:set_sync(?quorum, 5_000), + classy_rl_changer:set_sync(?stopped, 5_000) + end), + {_, Events3} = snabbkaffe:receive_events(Sub3), + ?assertMatch( + [ %% Advance: + #{f := ?stopped, t := ?single} + , #{?snk_kind := ?classy_hook_failure, reason := {error, timeout}} + , #{f := ?single, t := ?cluster} + , #{?snk_kind := ?classy_hook_failure, reason := {error, timeout}} + , #{f := ?cluster, t := ?quorum} + , #{?snk_kind := ?classy_hook_failure, reason := {error, timeout}} + %% Retard: + , #{f := ?quorum, t := ?cluster} + , #{?snk_kind := ?classy_hook_failure, reason := {error, timeout}} + , #{f := ?cluster, t := ?single} + , #{?snk_kind := ?classy_hook_failure, reason := {error, timeout}} + , #{f := ?single, t := ?stopped} + , #{?snk_kind := ?classy_hook_failure, reason := {error, timeout}} + ], + Events3) + end, + [ fun events_on_all_sites/1 + ]). + %% This testcase verifies site autoclean functionality t_070_cleanup(_) -> S1 = <<"s1">>, @@ -334,21 +431,22 @@ t_070_cleanup(_) -> Sites = [S1, S2, S3], AppConf = {familiar_app, #{ app => classy - , env => #{ quorum => 2 - , max_site_downtime => 1 - , forget_after => 0 - , rpc_timeout => 100 + , env => #{ quorum => 2 + , max_site_downtime => 1 + , forget_after => 0 + , rpc_timeout => 100 + , cleanup_check_interval => 100 } }}, Conf = #{fixtures => [AppConf]}, ?check_trace( - #{timetrap => 20_000}, + #{timetrap => 30_000}, begin %% Prepare system: N1 = create_start_site(S1, Conf), _N2 = create_start_site(S2, Conf), _N3 = create_start_site(S3, Conf), - {ok, Cluster} = ?ON(S1, classy_node:the_cluster()), + {ok, Cluster} = ?ON(S1, classy:the_cluster()), ?assertMatch(ok, ?ON(S2, classy:join_node(N1, join))), ?assertMatch(ok, ?ON(S3, classy:join_node(N1, join))), wait_site_joined(Sites, Cluster, S2), @@ -362,7 +460,7 @@ t_070_cleanup(_) -> %% Bring up S2 and restore quorum, that should lead to deletion of S3: ?wait_async_action( restart_site(S2), - #{?snk_kind := automatically_kick_down_site}), + #{?snk_kind := classy_member_leave, remote := S3}), wait_site_kicked([S1, S2], Cluster, S3), ?assertSameSet([S1, S2], ?ON(S1, classy:sites())), ?assertSameSet([S1, S2], ?ON(S2, classy:sites())) @@ -492,12 +590,14 @@ t_090_info(_) -> , peers := Peers1_0 , last_update := _ , hello := world + , n_restarts := 1 } = I1_0 = ?ON(S1, classy:info()), #{ cluster := Cluster2_0 , site := S2 , peers := Peers2_0 , last_update := _ , hello := world + , n_restarts := 1 } = I2_0 = ?ON(S2, classy:info()), ?assert(is_binary(Cluster1)), ?assertNotEqual(Cluster1, Cluster2_0), @@ -571,7 +671,8 @@ t_091_node_of_site(_) -> || I <- Sites, J <- Sites, OnlyLive <- [true, false]], %% Shut down S2 and verify that S1 reacted on changes: stop_site(S2), - ?block_until(#{?snk_kind := classy_peer_disconnected, remote := S2}), + ?block_until(#{?snk_kind := classy_peer_disconnected, site := S2}), + ct:sleep(100), ?assertEqual( undefined, ?ON(S1, classy:node_of_site(S2, true))), @@ -638,7 +739,8 @@ t_092_link_detect(_) -> CInfo2), %% Stop one of the sites: stop_site(S2), - ?block_until(#{?snk_kind := classy_peer_disconnected, remote := S2}), + ?block_until(#{?snk_kind := classy_peer_disconnected, site := S2}), + ct:sleep(100), CInfo3 = classy:info([N1, N2]), %% We can still derive that there's no bidirectional link: ?assertEqual( @@ -684,8 +786,25 @@ t_100_autocluster(_) -> [?ON(I, application:set_env(classy, discovery_strategy, Strategy)) || I <- Sites], - %% Wait for the autocluster to do its job: - ?block_until(#{?snk_kind := classy_member_join}) + ?block_until(#{?snk_kind := classy_member_join}), + ct:sleep(1000), + %% Verify candidates function: + {ok, Cluster} = ?ON(S1, classy:the_cluster()), + ?assertEqual( + {ok, [{Cluster, N2}]}, + ?ON(S1, classy_autocluster:candidates())), + ?assertEqual( + {ok, [{Cluster, N1}]}, + ?ON(S2, classy_autocluster:candidates())), + %% Verify that sites formed the cluster: + ?retry(100, 10, + begin + [?assertSameSet( + [S1, S2], + ?ON(I, classy:sites()), + I) + || I <- [S1, S2]] + end) end, [ fun no_unexpected_events/1 , fun events_on_all_sites/1 @@ -700,23 +819,13 @@ t_200_n_restarts(_) -> begin create_start_site(S, #{}), ?assertEqual( - {ok, 0}, - ?ON(S, classy_node:n_restarts())), - %% Verify serial UID tuples: - ?assertEqual( - {0, 1}, - ?ON(S, classy_uid:site_unique_seq_tuple(seq))), - ?assertEqual( - {0, 2}, - ?ON(S, classy_uid:site_unique_seq_tuple(seq))), - ?assertEqual( - {S, 0, 3}, - ?ON(S, classy_uid:cluster_unique_seq_tuple(seq))), + 1, + ?ON(S, classy:n_restarts())), %% Verify regular UID tuples: - {0, UI1} = ?ON(S, classy_uid:site_unique_tuple()), - {0, UI2} = ?ON(S, classy_uid:site_unique_tuple()), - {S, 0, UI3} = ?ON(S, classy_uid:cluster_unique_tuple()), - {S, 0, UI4} = ?ON(S, classy_uid:cluster_unique_tuple()), + {1, UI1} = ?ON(S, classy_uid:site_unique_tuple()), + {1, UI2} = ?ON(S, classy_uid:site_unique_tuple()), + {S, 1, UI3} = ?ON(S, classy_uid:cluster_unique_tuple()), + {S, 1, UI4} = ?ON(S, classy_uid:cluster_unique_tuple()), ?assertEqual( [UI1, UI2, UI3, UI4], lists:uniq([UI1, UI2, UI3, UI4])), @@ -724,24 +833,17 @@ t_200_n_restarts(_) -> stop_site(S), restart_site(S), ?assertEqual( - {ok, Nr}, - ?ON(S, classy_node:n_restarts())), - %% Verify serial UID tuples: - ?assertEqual( - {Nr, 1}, - ?ON(S, classy_uid:site_unique_seq_tuple(seq))), - ?assertEqual( - {S, Nr, 2}, - ?ON(S, classy_uid:cluster_unique_seq_tuple(seq))), - %% Verify regular UID tuples: - ?assertMatch( - {Nr, UI} when is_integer(UI), - ?ON(S, classy_uid:site_unique_tuple())), - ?assertMatch( - {S, Nr, UI} when is_integer(UI), - ?ON(S, classy_uid:cluster_unique_tuple())) + Nr, + ?ON(S, classy:n_restarts())), + %% Verify regular UID tuples: + ?assertMatch( + {Nr, UI} when is_integer(UI), + ?ON(S, classy_uid:site_unique_tuple())), + ?assertMatch( + {S, Nr, UI} when is_integer(UI), + ?ON(S, classy_uid:cluster_unique_tuple())) end - || Nr <- lists:seq(1, 5)] + || Nr <- lists:seq(2, 5)] end, [ fun no_unexpected_events/1 , fun events_on_all_sites/1 @@ -758,7 +860,7 @@ t_300_rpc(_) -> %% Prepare N1 = create_start_site(S1, #{}), N2 = create_start_site(S2, #{}), - {ok, Cluster} = ?ON(S1, classy_node:the_cluster()), + {ok, Cluster} = ?ON(S1, classy:the_cluster()), ?assertMatch(ok, ?ON(S2, classy:join_node(N1, join))), wait_site_joined([S1, S2], Cluster, S2), %% Tests: @@ -843,7 +945,7 @@ t_310_rpc_to_failing_node(_) -> %% Prepare N1 = create_start_site(S1, #{}), N2 = create_start_site(S2, #{}), - {ok, Cluster} = ?ON(S1, classy_node:the_cluster()), + {ok, Cluster} = ?ON(S1, classy:the_cluster()), ?assertMatch(ok, ?ON(S2, classy:join_node(N1, join))), wait_site_joined([S1, S2], Cluster, S2), %% Test: @@ -880,7 +982,7 @@ t_400_vote_smoke_abort(_) -> N2 = create_start_site(S2, #{}), N3 = create_start_site(S3, #{}), Nodes = [N1, N2, N3], - {ok, Cluster} = ?ON(S1, classy_node:the_cluster()), + {ok, Cluster} = ?ON(S1, classy:the_cluster()), ?assertEqual(ok, ?ON(S2, classy:join_node(N1, join))), ?assertEqual(ok, ?ON(S3, classy:join_node(N1, join))), wait_site_joined(Sites, Cluster, S2), @@ -952,7 +1054,7 @@ t_401_vote_timeout(_) -> N2 = create_start_site(S2, #{}), N3 = create_start_site(S3, #{}), Nodes = [N1, N2, N3], - {ok, Cluster} = ?ON(S1, classy_node:the_cluster()), + {ok, Cluster} = ?ON(S1, classy:the_cluster()), ?assertEqual(ok, ?ON(S2, classy:join_node(N1, join))), ?assertEqual(ok, ?ON(S3, classy:join_node(N1, join))), wait_site_joined(Sites, Cluster, S2), @@ -1010,7 +1112,7 @@ t_403_vote_coord_restart(_) -> N2 = create_start_site(S2, #{}), N3 = create_start_site(S3, #{}), Nodes = [N1, N2, N3], - {ok, Cluster} = ?ON(S1, classy_node:the_cluster()), + {ok, Cluster} = ?ON(S1, classy:the_cluster()), ?assertEqual(ok, ?ON(S2, classy:join_node(N1, join))), ?assertEqual(ok, ?ON(S3, classy:join_node(N1, join))), wait_site_joined(Sites, Cluster, S2), @@ -1070,7 +1172,7 @@ t_404_vote_part_restart(_) -> N2 = create_start_site(S2, #{}), N3 = create_start_site(S3, #{}), Nodes = [N1, N2, N3], - {ok, Cluster} = ?ON(S1, classy_node:the_cluster()), + {ok, Cluster} = ?ON(S1, classy:the_cluster()), ?assertEqual(ok, ?ON(S2, classy:join_node(N1, join))), ?assertEqual(ok, ?ON(S3, classy:join_node(N1, join))), wait_site_joined(Sites, Cluster, S2), @@ -1162,7 +1264,7 @@ t_410_vote_commit(_) -> N2 = create_start_site(S2, #{}), N3 = create_start_site(S3, #{}), Nodes = [N1, N2, N3], - {ok, Cluster} = ?ON(S1, classy_node:the_cluster()), + {ok, Cluster} = ?ON(S1, classy:the_cluster()), ?assertEqual(ok, ?ON(S2, classy:join_node(N1, join))), ?assertEqual(ok, ?ON(S3, classy:join_node(N1, join))), wait_site_joined(Sites, Cluster, S2), @@ -1211,7 +1313,7 @@ t_411_commit_actions_after_restart(_) -> N1 = create_start_site(S1, #{peer => #{shutdown => halt}}), N2 = create_start_site(S2, #{peer => #{shutdown => halt}}), Nodes = [N1, N2], - {ok, Cluster} = ?ON(S1, classy_node:the_cluster()), + {ok, Cluster} = ?ON(S1, classy:the_cluster()), ?assertEqual(ok, ?ON(S2, classy:join_node(N1, join))), wait_site_joined(Sites, Cluster, S2), %% Make sure post commit actions are delayed: @@ -1232,12 +1334,12 @@ t_411_commit_actions_after_restart(_) -> [ok = restart_site(S) || S <- Sites], ?assert(classy_vote:test_wait_conclude(ID)), verify_no_votes(Nodes), - Nodes + {ID, Nodes} end, [ fun no_unexpected_events/1 , fun events_on_all_sites/1 , {"commit events", - fun([_N1, N2], Trace) -> + fun({_ID, [_N1, N2]}, Trace) -> ?assertMatch( [ #{?snk_kind := test_go} , #{ ?snk_kind := classy_test_vote_commit @@ -1256,7 +1358,14 @@ t_411_commit_actions_after_restart(_) -> ?of_kind(classy_test_post_vote, Trace)) end} , {"participant stages", - fun([_N1, _N2], Trace) -> + fun({ID, [_N1, _N2]}, Trace0) -> + Trace = lists:filter( + fun(#{?snk_kind := test_go}) -> true; + (#{?snk_kind := classy_test_vote_commit}) -> true; + (#{?snk_kind := ?classy_vote_part_stage, id := I}) when I =:= ID -> true; + (_) -> false + end, + Trace0), ?assertMatch( [ #{from := 0, to := 0} %% Enter prepare , #{from := 0, to := 1} %% Enter wait outcome @@ -1265,7 +1374,7 @@ t_411_commit_actions_after_restart(_) -> , #{from := 2, to := 2} %% Restored state into commit stage , #{?snk_kind := classy_test_vote_commit} ], - ?of_kind([?classy_vote_part_stage, test_go, classy_test_vote_commit], Trace)) + Trace) end} | classy_vote:trace_props() ]). @@ -1282,7 +1391,7 @@ t_412_commit_action_crash(_) -> N1 = create_start_site(S1, #{}), N2 = create_start_site(S2, #{}), Nodes = [N1, N2], - {ok, Cluster} = ?ON(S1, classy_node:the_cluster()), + {ok, Cluster} = ?ON(S1, classy:the_cluster()), ?assertEqual(ok, ?ON(S2, classy:join_node(N1, join))), wait_site_joined(Sites, Cluster, S2), %% Inject failures into the commit flows: @@ -1311,12 +1420,12 @@ t_412_commit_action_crash(_) -> [ok = restart_site(S) || S <- Sites], ?assert(classy_vote:test_wait_conclude(ID)), verify_no_votes(Nodes), - Nodes + {Nodes, ID} end, [ fun no_unexpected_events/1 , fun events_on_all_sites/1 , {"coordinator history", - fun([N1, _N2], Trace) -> + fun({[N1, _N2], _ID}, Trace) -> ?assertMatch( [ #{from := 0, to := 0} %% Enter vote , #{from := 0, to := 10} %% Enter commit @@ -1334,7 +1443,14 @@ t_412_commit_action_crash(_) -> Trace))) end} , {"participant history", - fun([_N1, N2], Trace) -> + fun({[_N1, N2], ID}, Trace0) -> + Trace = lists:filter( + fun(#{?snk_kind := classy_test_vote_commit}) -> true; + (#{?snk_kind := classy_test_vote_on_fail}) -> true; + (#{?snk_kind := ?classy_vote_part_stage, id := I}) when I =:= ID -> true; + (_) -> false + end, + ?of_node(N2, Trace0)), ?assertMatch( [ #{from := 0, to := 0} %% Enter prepare , #{from := 0, to := 1} %% Enter wait outcome @@ -1346,12 +1462,7 @@ t_412_commit_action_crash(_) -> , #{?snk_kind := classy_test_vote_commit, ref := Ref1, step := 2} , #{?snk_kind := classy_test_vote_commit, ref := Ref1, step := 3} ], - ?of_node(N2, - ?of_kind([ ?classy_vote_part_stage - , classy_test_vote_commit - , classy_test_vote_on_fail - ], - Trace))) + Trace) end} | classy_vote:trace_props() ]). @@ -1476,10 +1587,11 @@ no_unexpected_events(Trace) -> [ ?classy_unknown_event , ?classy_abnormal_exit , ?classy_table_anomaly - , classy_hook_failure + , ?classy_hook_failure , classy_discovery_failure , classy_table_on_update_callback_failure , ?classy_bad_data + , ?classy_run_level_change_error ], Trace)). @@ -1575,10 +1687,10 @@ site_of_event(#{?snk_kind := Kind, local := Site}) when Kind =:= classy_member_leave; Kind =:= classy_joined_cluster; Kind =:= classy_kicked_from_cluster; + Kind =:= classy_change_run_level; Kind =:= classy_init_clustering -> Site; site_of_event(#{?snk_kind := Kind, ?snk_meta := #{local := Site}}) when - Kind =:= classy_change_run_level; Kind =:= classy_peer_up; Kind =:= classy_peer_down -> Site; @@ -1625,6 +1737,7 @@ create_start_site(Cluster, Site, CustomConf) -> , cleanup_check_interval => 100 , vote_retry_interval => 100 , rpc_timeout => 100 + , discovery_interval => 100 } }}, Fixtures = maps:get(fixtures, CustomConf, []), @@ -1676,6 +1789,19 @@ wait_site_joined(WaitOnSites, Cluster, Site) -> %% Account for possible race condition since the hook emitting the event is the first: ct:sleep(10). +sync_kick(ExecOn, Target, Intent, WaitOn) -> + Pred = fun(#{?snk_kind := classy_member_leave, remote := Target, local := Local}) -> + lists:member(Local, WaitOn); + (#{?snk_kind := classy_kicked_from_cluster, local := Target}) -> + true; + (_) -> + false + end, + {ok, Sub} = snabbkaffe:subscribe(Pred, length(WaitOn), infinity, 0), + ?ON(ExecOn, classy:kick_site(Target, Intent)), + {ok, _} = snabbkaffe:receive_events(Sub), + ok. + wait_site_kicked(WaitOnSites, Cluster, Site) -> lists:foreach( fun(Local) -> @@ -1686,7 +1812,13 @@ wait_site_kicked(WaitOnSites, Cluster, Site) -> , remote := Site }) end, - WaitOnSites), + WaitOnSites -- [Site]), + case lists:member(Site, WaitOnSites) of + true -> + ?block_until(#{?snk_kind := classy_kicked_from_cluster, local := Site}); + false -> + ok + end, %% Account for possible race condition since the hook emitting the event is the first: ct:sleep(10). @@ -1755,8 +1887,14 @@ vote_on_fail(FailInfo, Ref) -> ?tp(classy_test_vote_on_fail, FailInfo#{test_ref => Ref}). verify_no_votes(Nodes) -> - Results = erpc:multicall(Nodes, ets, tab2list, [classy_vote_table]), - [?assertMatch({ok, []}, Result, Node) || {Node, Result} <- lists:zip(Nodes, Results)]. + %% TODO: using retry due to sporadic votes spawned by + %% `classy_liveness'. Find a way to filter them out. + ?retry(100, 10, + begin + Results = erpc:multicall(Nodes, ets, tab2list, [classy_vote_table]), + %% Filter out node down votes that may occur sporadically: + [?assertMatch({ok, []}, Result, Node) || {Node, Result} <- lists:zip(Nodes, Results)] + end). -spec proper_printout(string(), list()) -> _. proper_printout(Char, []) when Char =:= "."; diff --git a/test/prop_misc.erl b/test/prop_misc.erl new file mode 100644 index 0000000..9cb463c --- /dev/null +++ b/test/prop_misc.erl @@ -0,0 +1,47 @@ +-module(prop_misc). + +-export([prop_liveness_convert/0, prop_liveness_compare/0]). + +-include_lib("kernel/include/logger.hrl"). +-include_lib("proper/include/proper.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("stdlib/include/assert.hrl"). +-include("classy.hrl"). + +prop_liveness_convert() -> + ?FORALL( + {NRes, Self, IsUp}, + {non_neg_integer(), boolean(), boolean()}, + ?TRAPEXIT( + begin + Liveness = classy_membership:to_liveness(NRes, Self, IsUp), + Result = classy_membership:from_liveness(Liveness), + ?WHENFAIL(io:format("NR=~p, S=~p, U=~p~nLiveness=~p result=~p~n", + [NRes, Self, IsUp, Liveness, Result]), + Result =:= {NRes, Self, IsUp}) + end)). + +prop_liveness_compare() -> + ?FORALL( + {NRes1, NRes2, Self1, Self2, IsUp1, IsUp2}, + {non_neg_integer(), non_neg_integer(), boolean(), boolean(), boolean(), boolean()}, + ?TRAPEXIT( + begin + Liveness1 = classy_membership:to_liveness(NRes1, Self1, IsUp1), + Liveness2 = classy_membership:to_liveness(NRes2, Self2, IsUp2), + ?WHENFAIL(io:format("l1 = ~p l2 = ~p", [Liveness1, Liveness2]), + if NRes1 =:= NRes2, Self1 =:= Self2 -> + congruent(IsUp1, IsUp2, Liveness1, Liveness2); + NRes1 =:= NRes2 -> + %% Note: swapping here is intentional: + congruent(Self2, Self1, Liveness1, Liveness2); + true -> + congruent(NRes1, NRes2, Liveness1, Liveness2) + end) + end)). + +congruent(A1, A2, B1, B2) -> + if A1 > A2 -> B1 > B2; + A1 < A2 -> B1 < B2; + A1 =:= A2 -> B1 =:= B2 + end.