Skip to content
This repository was archived by the owner on Sep 19, 2019. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions src/fabric_db_create.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ generate_shard_map(DbName, Options) ->
% the DB already exists, and may have a different Suffix
ok;
{not_found, _} ->
Doc = make_document(Shards, Suffix)
HashFun = proplists:get_value(hash, Options),
Srs = proplists:get_value(srs, Options),
Size = proplists:get_value(size, Options),
Doc = make_document(Shards, Suffix, HashFun, Srs, Size)
end,
{Shards, Doc}.

Expand Down Expand Up @@ -143,19 +146,22 @@ maybe_stop(W, Counters) ->
end
end.

make_document([#shard{dbname=DbName}|_] = Shards, Suffix) ->
make_document([#shard{dbname=DbName}|_] = Shards, Suffix, HashModule, Srs, Size) ->
{RawOut, ByNodeOut, ByRangeOut} =
lists:foldl(fun(#shard{node=N, range=[B,E]}, {Raw, ByNode, ByRange}) ->
Range = ?l2b([couch_util:to_hex(<<B:32/integer>>), "-",
couch_util:to_hex(<<E:32/integer>>)]),
Range = ?l2b([couch_util:to_hex(<<B:Size/integer>>), "-",
couch_util:to_hex(<<E:Size/integer>>)]),
Node = couch_util:to_binary(N),
{[[<<"add">>, Range, Node] | Raw], orddict:append(Node, Range, ByNode),
orddict:append(Range, Node, ByRange)}
end, {[], [], []}, Shards),
HashFun = {<<"hash_fun">>, <<HashModule/binary,":mem3_hash">>},
#doc{id=DbName, body = {[
{<<"shard_suffix">>, Suffix},
{<<"changelog">>, lists:sort(RawOut)},
{<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}},
{<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}}
{<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}},
{<<"hash_info">>, {[HashFun, {<<"ring_top">>, 1 bsl Size}]}},
{<<"srs">>, Srs}
]}}.

3 changes: 3 additions & 0 deletions src/fabric_doc_open.erl
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ choose_reply(Docs) ->

format_reply({ok, #doc{deleted=true}}, true) ->
{not_found, deleted};
format_reply({ok, #doc{}=Doc}, _) ->
couch_doc_security:filter(Doc);

format_reply(Else, _) ->
Else.

Expand Down
16 changes: 13 additions & 3 deletions src/fabric_doc_open_revs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
-include("fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("eunit/include/eunit.hrl").

-record(state, {
dbname,
Expand Down Expand Up @@ -48,8 +48,10 @@ go(DbName, Id, Revs, Options) ->
RexiMon = fabric_util:create_monitors(Workers),
try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, State) of
{ok, {ok, Reply}} ->
{ok, Reply};
Else ->
{ok, format_reply(Reply)};
{ok, Reply} ->
{ok, format_reply(Reply)};
Else ->
Else
after
rexi_monitor:stop(RexiMon)
Expand Down Expand Up @@ -139,6 +141,14 @@ maybe_reply(DbName, ReplyDict, Complete, RepairDocs, R) ->
noreply
end.

format_reply(DocList) ->
MapFun = fun
({ok, #doc{}=Doc}) -> couch_doc_security:filter(Doc);
(Else) -> Else
end,
lists:map(MapFun, DocList).


extract_replies(Replies) ->
lists:map(fun({_,{Reply,_}}) -> Reply end, Replies).

Expand Down
14 changes: 9 additions & 5 deletions src/fabric_doc_update.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,23 @@ go(DbName, AllDocs0, Opts) ->
Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name,Docs1,Options]}),
{Shard#shard{ref=Ref}, Docs}
end, group_docs_by_shard(DbName, AllDocs)),
{Workers, _} = lists:unzip(GroupedDocs),

{Workers, ResultDocs} = lists:unzip(GroupedDocs),
NewAllDocs = lists:usort(lists:flatten(ResultDocs)),

RexiMon = fabric_util:create_monitors(Workers),
W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))),
Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs,
dict:new()},
Timeout = fabric_util:request_timeout(),
try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of
{ok, {Health, Results}} when Health =:= ok; Health =:= accepted ->
{Health, [R || R <- couch_util:reorder_results(AllDocs, Results), R =/= noreply]};
{Health, [R || R <- couch_util:reorder_results(NewAllDocs, Results), R =/= noreply]};
{timeout, Acc} ->
{_, _, W1, _, DocReplDict} = Acc,
{Health, _, Resp} = dict:fold(fun force_reply/3, {ok, W1, []},
DocReplDict),
{Health, [R || R <- couch_util:reorder_results(AllDocs, Resp), R =/= noreply]};
{Health, [R || R <- couch_util:reorder_results(NewAllDocs, Resp), R =/= noreply]};
Else ->
Else
after
Expand Down Expand Up @@ -166,10 +169,11 @@ good_reply(_) ->

-spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}].
group_docs_by_shard(DbName, Docs) ->
dict:to_list(lists:foldl(fun(#doc{id=Id} = Doc, D0) ->
dict:to_list(lists:foldl(fun(Doc, D0) ->
Shards = mem3:shards(DbName, Doc),
lists:foldl(fun(Shard, D1) ->
dict:append(Shard, Doc, D1)
end, D0, mem3:shards(DbName,Id))
end, D0, Shards)
end, dict:new(), Docs)).

append_update_replies([], [], DocReplyDict) ->
Expand Down
11 changes: 7 additions & 4 deletions src/fabric_view.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ handle_worker_exit(Collector, _Worker, Reason) ->
-spec is_progress_possible([{#shard{}, term()}]) -> boolean().
is_progress_possible([]) ->
false;
is_progress_possible(Counters) ->
is_progress_possible([{Shard1, _} | _] = Counters) ->
Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end,
[], Counters),
[{Start, Tail0} | Rest] = lists:ukeysort(1, Ranges),
% Ringtop = mem3_util:ringtop(Shard1#shard.dbname),
Ringtop = mem3_util:ringtop(Shard1#shard.dbname),
Result = lists:foldl(fun
(_, fail) ->
% we've already declared failure
Expand All @@ -66,13 +68,13 @@ is_progress_possible(Counters) ->
fail;
({_,Y}, Tail) ->
case erlang:max(Tail, Y) of
End when (End+1) =:= (2 bsl 31) ->
End when (End+1) =:= Ringtop ->
complete;
Else ->
% the normal condition, adding to the tail
Else
end
end, if (Tail0+1) =:= (2 bsl 31) -> complete; true -> Tail0 end, Rest),
end, if (Tail0+1) =:= Ringtop -> complete; true -> Tail0 end, Rest),
(Start =:= 0) andalso (Result =:= complete).

-spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) ->
Expand Down Expand Up @@ -256,7 +258,8 @@ transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) ->
{row, {[{id,Id}, {key,Key}, {value,Value}]}};
transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) ->
{row, {[{id,Id}, {key,Key}, {value,Value}, {error,Reason}]}};
transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) ->
transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc0}) ->
{ok, Doc} = couch_doc_security:filter(Doc0),
{row, {[{id,Id}, {key,Key}, {value,Value}, {doc,Doc}]}}.


Expand Down
3 changes: 3 additions & 0 deletions src/fabric_view_changes.erl
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,9 @@ changes_row(Props0, IncludeDocs) ->
{true, {error, Reason}} ->
% Transform {doc, {error, Reason}} to {error, Reason} for JSON
lists:keyreplace(doc, 1, Props0, {error, Reason});
{true, Doc0} when Doc0 /= undefined ->
{ok, Doc} = couch_doc_security:filter(Doc0),
lists:keyreplace(doc, 1, Props0, {doc, Doc});
{false, _} ->
lists:keydelete(doc, 1, Props0);
_ ->
Expand Down
2 changes: 1 addition & 1 deletion src/fabric_view_map.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
go(DbName, DDoc, View, Args, Callback, Acc) ->
Shards = fabric_view:get_shards(DbName, Args),
Repls = fabric_view:get_shard_replacements(DbName, Shards),
RPCArgs = [DDoc, View, Args],
RPCArgs = [fabric_util:doc_id_and_rev(DDoc), View, Args],
StartFun = fun(Shard) ->
hd(fabric_util:submit_jobs([Shard], fabric_rpc2, map_view, RPCArgs))
end,
Expand Down
2 changes: 1 addition & 1 deletion src/fabric_view_reduce.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ go(DbName, DDoc, VName, Args, Callback, Acc) ->
Views = couch_view_group:get_views(Group),
{NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce),
{VName, RedSrc} = lists:nth(NthRed, View#view.reduce_funs),
RPCArgs = [DDoc, VName, Args],
RPCArgs = [fabric_util:doc_id_and_rev(DDoc), VName, Args],
Shards = fabric_view:get_shards(DbName, Args),
Repls = fabric_view:get_shard_replacements(DbName, Shards),
StartFun = fun(Shard) ->
Expand Down