Skip to content
This repository was archived by the owner on Sep 19, 2019. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 213 additions & 0 deletions src/fabric_quality_factor.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
% Copyright 2015 Cloudant
%
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(fabric_quality_factor).

-export([go/1]).
-export([shard_sync_quality_factor/1]).

-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl"). % user_ctx record definition
-define(ADMIN_CTX, {user_ctx, #user_ctx{roles = [<<"_admin">>]}}).

%% This module implements the calculation of a measure of replication/sync
%% quality, called the quality factor. It's an upper bound on the number of
%% missing updates in a system.
%%
%% For (simplified) example, imagine we have shard replicas A, B, and C.
%% - Scenario 1: A, B have all updates, C is missing one. QF: 2
%% - Scenario 2: A has all updates, B, C are missing one each. QF: 2
%% - Scenario 3: A, B, C all up to date. QF: 0
%%
%% One can see the QF is by nature an upper bound. In scenario 1, A and B are
%% "ahead" of C by one update, but we do not check whether the last updates to
%% A and B are identical, so the upper bound of missing updates for the system is 2.


%%%%%%%%%%%%
%% Module API
%%%%%%%%%%%%


%% go/1
%% Spawn a rexi worker on every shard replica of DbName; each worker runs
%% shard_sync_quality_factor/1 against its copy. handle_message/3 folds the
%% per-shard quality factors so fabric_util:recv returns {ok, DBQualityFactor},
%% the resultant quality factor for the whole database.
%%
%% Returns {ok, integer()} on success, {error, timeout} if any worker fails
%% to respond in time; other worker errors are thrown.
go(DbName) ->
    Shards = mem3:shards(DbName),
    Workers = fabric_util:submit_jobs(Shards, shard_sync_quality_factor, []),
    RexiMon = fabric_util:create_monitors(Shards),
    Fun = fun handle_message/3,
    %% The second element is the running integer sum of per-shard quality
    %% factors. It must start at 0, not []: handle_message/3 computes
    %% Acc + ShardQFactor, and [] + Int is a badarith crash.
    Acc0 = {fabric_dict:init(Workers, nil), 0},
    try
        case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of
            {ok, DBQualityFactor} ->
                {ok, DBQualityFactor};
            {timeout, {WorkersDict, _}} ->
                DefunctWorkers = fabric_util:remove_done_workers(
                    WorkersDict,
                    nil
                ),
                fabric_util:log_timeout(
                    DefunctWorkers,
                    "shard_sync_quality_factor"
                ),
                {error, timeout};
            {error, Error} ->
                throw(Error)
        end
    after
        %% Always tear down the node monitors, even on throw.
        rexi_monitor:stop(RexiMon)
    end.


%%%%%%%%%%%%%%%%%%%%%
%% rexi callbacks
%%%%%%%%%%%%%%%%%%%%%


%% TODO is this correct when we need a response from all healthy shards?
handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Shard, {Counters, Acc}) ->
    %% A node went down: drop its workers and keep going if we still have
    %% coverage, otherwise abort the whole request.
    case fabric_util:remove_down_workers(Counters, NodeRef) of
        {ok, Counters1} ->
            {ok, {Counters1, Acc}};
        error ->
            {error, {nodedown, <<"progress not possible">>}}
    end;

%% TODO is this correct when we need a response from all healthy shards?
handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) ->
    %% A single worker died: forget it and continue only while the
    %% remaining workers can still cover the key range.
    Counters1 = fabric_dict:erase(Shard, Counters),
    case fabric_view:is_progress_possible(Counters1) of
        true -> {ok, {Counters1, Acc}};
        false -> {error, Reason}
    end;

%% handle_message/3
%%
%% Coordinator callback: we need a response from every healthy shard, so the
%% stop tuple is returned only once all workers are accounted for.
%% Inputs:
%% - {ok, ShardQFactor}: integer per-shard quality factor for Shard.
%% - Shard: the shard record; #shard.ref keys the workers dict.
%% - {Counters, Acc}: Counters is the fabric_dict of per-shard response
%%   state (nil = still pending); Acc is the quality factor summed so far.
handle_message({ok, ShardQFactor}, Shard, {Counters, Acc}) ->
    %% Record the per-shard result in the workers dict, although only the
    %% running sum is needed for the final answer.
    Counters1 = fabric_dict:store(Shard, ShardQFactor, Counters),
    Total = Acc + ShardQFactor,
    case fabric_dict:any(nil, Counters1) of
        %% Some workers are still pending: keep accumulating.
        true -> {ok, {Counters1, Total}};
        %% All workers have reported: the sum is the database QF.
        false -> {stop, Total}
    end;
handle_message(_, _, Acc) ->
    %% Ignore any other message and keep waiting.
    {ok, Acc}.


%%%%%%%%%%%%%%%%%%%%%%
%% fabric_rpc callback
%%%%%%%%%%%%%%%%%%%%%%


%% Calculate the sync quality factor for the local copy of a shard.
%% The quality factor is an upper bound on the number of missing updates
%% between shard copies for all copies of a shard.
%% See: https://cloudant.fogbugz.com/f/cases/25615/Create-quality-factor-measurement-for-sequence#BugEvent.203899
%% ShardName - e.g. <<"shards/c0000000-ffffffff/ksnavely/motohacker.1368999314">>
%%
%% Called by fabric_rpc:shard_sync_quality_factor
shard_sync_quality_factor(ShardName) ->
    %% This RPC runs once on every node hosting a copy of ShardName, so each
    %% invocation must only measure the updates missing from the *local*
    %% copy; the coordinator (handle_message/3) sums the per-copy results.
    %% Iterating over all replicas here would multiply the pairwise sum by
    %% the replica count, double-counting the quality factor.
    {ok, shard_sync_quality_factor(ShardName, node())}.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I think I see what you're doing here, and if I'm reading it right you're double-counting. This function should only care about a specific copy of a specific shard; you don't need to do the multiple passes through nodes_for_shard (one here and one in shard_sync_quality_factor/2). You should only compute the QF for this particular #shard{node = node(), name = ShardName}



%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Internal functions for QF determination
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%


%% Calculate the shard sync quality factor for a single node/shard copy. This
%% is the number of updates to other shard copies not present on TargetNode.
%% ShardName - e.g. <<"shards/c0000000-ffffffff/ksnavely/motohacker.1368999314">>
%% TargetNode - e.g. 'dbcore@db11.julep.cloudant.net'
shard_sync_quality_factor(not_found, _TargetNode) ->
    %% Defensive clause; go/1 derives shard names via mem3:shards/1, so
    %% fabric should never reach here with not_found.
    not_found;
shard_sync_quality_factor(ShardName, TargetNode) ->
    DbName = mem3:dbname(ShardName),
    %% Every replica holder except the target itself is a potential source
    %% of updates the target has not yet seen.
    SourceNodes = [N || N <- nodes_for_shard(ShardName, DbName),
                        N =/= TargetNode],
    PerSource = [shard_missing_updates(Source, TargetNode, ShardName) ||
        Source <- SourceNodes],
    %% The quality factor is the sum of possible missing updates.
    lists:sum(PerSource).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can actually get fancier than summing the entries here and achieve a more precise bound, but this is a reasonable first pass.


%% Fetch a single replication checkpoint doc given target, source, and shard
%% name.
%% ShardName - e.g. <<"shards/c0000000-ffffffff/ksnavely/motohacker.1368999314">>
%% TargetNode - e.g. 'dbcore@db14.julep.cloudant.net'
%% SourceNode - e.g. 'dbcore@db14.julep.cloudant.net'
local_checkpoint(TargetNode, SourceNode, ShardName) ->
    %% NOTE(review): opening the remote db via rpc:call is a code smell;
    %% ideally each shard copy's RPC job gathers what it needs locally and
    %% reports back to the coordinator.
    {ok, SourceDb} =
        rpc:call(SourceNode, couch_db, open, [ShardName, [?ADMIN_CTX]]),
    SourceUUID = couch_db:get_uuid(SourceDb),
    {_LocalId, CheckpointDoc} =
        mem3_rpc:load_checkpoint(TargetNode, ShardName, SourceNode, SourceUUID),
    CheckpointDoc.

%% Find the upper bound of updates to ShardName on SourceNode that are missing
%% from TargetNode, which also holds a copy of ShardName.
%% SourceNode - e.g. 'dbcore@db9.julep.cloudant.net'
%% TargetNode - e.g. 'dbcore@db11.julep.cloudant.net'
%% ShardName - e.g. <<"shards/c0000000-ffffffff/ksnavely/motohacker.1368999314">>
shard_missing_updates(SourceNode, TargetNode, ShardName) ->
    SourceSeq = shard_seq(SourceNode, ShardName),
    CheckpointSeq = shard_checkpoint_seq(SourceNode, TargetNode, ShardName),
    %% Missing updates = gap between the source's current seq and the last
    %% seq checkpointed as replicated to the target.
    SourceSeq - CheckpointSeq.

%% Fetch the last replicated source seq from the replication checkpoint doc.
%% SourceNode - e.g. 'dbcore@db9.julep.cloudant.net'
%% TargetNode - e.g. 'dbcore@db14.julep.cloudant.net'
%% ShardName - e.g. <<"shards/c0000000-ffffffff/ksnavely/motohacker.1368999314">>
shard_checkpoint_seq(SourceNode, TargetNode, ShardName) ->
    CheckpointDoc = local_checkpoint(TargetNode, SourceNode, ShardName),
    %% Match on the #doc record (from couch_db.hrl) rather than a positional
    %% element/2 access, which silently breaks if the record layout changes.
    #doc{body = {Body}} = CheckpointDoc,
    couch_util:get_value(<<"seq">>, Body).

%% Get the update seq of the ShardName replica hosted on Node.
%% Node - e.g. 'dbcore@db14.julep.cloudant.net'
%% ShardName - e.g. <<"shards/c0000000-ffffffff/ksnavely/motohacker.1368999314">>
shard_seq(Node, ShardName) ->
    OpenArgs = [ShardName, [?ADMIN_CTX]],
    {ok, Db} = rpc:call(Node, couch_db, open, OpenArgs),
    couch_db:get_update_seq(Db).

%% Return the list of nodes that host a copy of the shard named ShardName
%% within DbName.
nodes_for_shard(ShardName, DbName) ->
    %% Pattern-match on the #shard record instead of element/2 positions so
    %% the code survives record-layout changes (per review feedback).
    [Node || #shard{name = Name, node = Node} <- mem3:shards(DbName),
             Name =:= ShardName].
5 changes: 5 additions & 0 deletions src/fabric_rpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,11 @@ open_shard(Name, Opts) ->
rexi:reply(Error)
end.

%% rexi RPC endpoint: compute this node's shard sync quality factor and
%% reply to the coordinator.
shard_sync_quality_factor(ShardName) ->
    Result = fabric_quality_factor:shard_sync_quality_factor(ShardName),
    rexi:reply(Result).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit unconventional to call back into the coordinating module but that's OK.

%%
%% internal
%%
Expand Down