diff --git a/src/fabric_db_create.erl b/src/fabric_db_create.erl index b83430a..9fa1b6f 100644 --- a/src/fabric_db_create.erl +++ b/src/fabric_db_create.erl @@ -65,7 +65,10 @@ generate_shard_map(DbName, Options) -> % the DB already exists, and may have a different Suffix ok; {not_found, _} -> - Doc = make_document(Shards, Suffix) + HashFun = proplists:get_value(hash, Options), + Srs = proplists:get_value(srs, Options), + Size = proplists:get_value(size, Options), + Doc = make_document(Shards, Suffix, HashFun, Srs, Size) end, {Shards, Doc}. @@ -143,19 +146,22 @@ maybe_stop(W, Counters) -> end end. -make_document([#shard{dbname=DbName}|_] = Shards, Suffix) -> +make_document([#shard{dbname=DbName}|_] = Shards, Suffix, HashModule, Srs, Size) -> {RawOut, ByNodeOut, ByRangeOut} = lists:foldl(fun(#shard{node=N, range=[B,E]}, {Raw, ByNode, ByRange}) -> - Range = ?l2b([couch_util:to_hex(<>), "-", - couch_util:to_hex(<>)]), + Range = ?l2b([couch_util:to_hex(<>), "-", + couch_util:to_hex(<>)]), Node = couch_util:to_binary(N), {[[<<"add">>, Range, Node] | Raw], orddict:append(Node, Range, ByNode), orddict:append(Range, Node, ByRange)} end, {[], [], []}, Shards), + HashFun = {<<"hash_fun">>, <>}, #doc{id=DbName, body = {[ {<<"shard_suffix">>, Suffix}, {<<"changelog">>, lists:sort(RawOut)}, {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}}, - {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}} + {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}}, + {<<"hash_info">>, {[HashFun, {<<"ring_top">>, 1 bsl Size}]}}, + {<<"srs">>, Srs} ]}}. diff --git a/src/fabric_doc_open.erl b/src/fabric_doc_open.erl index c9f3f0a..dc992a5 100644 --- a/src/fabric_doc_open.erl +++ b/src/fabric_doc_open.erl @@ -158,6 +158,9 @@ choose_reply(Docs) -> format_reply({ok, #doc{deleted=true}}, true) -> {not_found, deleted}; +format_reply({ok, #doc{}=Doc}, _) -> + couch_doc_security:filter(Doc); + format_reply(Else, _) -> Else. diff --git a/src/fabric_doc_open_revs.erl b/src/fabric_doc_open_revs.erl index 7539f51..93731fb 100644 --- a/src/fabric_doc_open_revs.erl +++ b/src/fabric_doc_open_revs.erl @@ -19,7 +19,7 @@ -include("fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). --include_lib("eunit/include/eunit.hrl"). +-include_lib("eunit/include/eunit.hrl"). -record(state, { dbname, @@ -48,8 +48,10 @@ go(DbName, Id, Revs, Options) -> RexiMon = fabric_util:create_monitors(Workers), try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, State) of {ok, {ok, Reply}} -> - {ok, Reply}; - Else -> + {ok, format_reply(Reply)}; + {ok, Reply} -> + {ok, format_reply(Reply)}; + Else -> Else after rexi_monitor:stop(RexiMon) @@ -139,6 +141,14 @@ maybe_reply(DbName, ReplyDict, Complete, RepairDocs, R) -> noreply end. +format_reply(DocList) -> + MapFun = fun + ({ok, #doc{}=Doc}) -> couch_doc_security:filter(Doc); + (Else) -> Else + end, + lists:map(MapFun, DocList). + + extract_replies(Replies) -> lists:map(fun({_,{Reply,_}}) -> Reply end, Replies). diff --git a/src/fabric_doc_update.erl b/src/fabric_doc_update.erl index 2bbe4d2..fd4d156 100644 --- a/src/fabric_doc_update.erl +++ b/src/fabric_doc_update.erl @@ -31,7 +31,10 @@ go(DbName, AllDocs0, Opts) -> Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name,Docs1,Options]}), {Shard#shard{ref=Ref}, Docs} end, group_docs_by_shard(DbName, AllDocs)), - {Workers, _} = lists:unzip(GroupedDocs), + + {Workers, ResultDocs} = lists:unzip(GroupedDocs), + NewAllDocs = lists:usort(lists:flatten(ResultDocs)), + RexiMon = fabric_util:create_monitors(Workers), W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))), Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs, @@ -39,12 +42,12 @@ go(DbName, AllDocs0, Opts) -> Timeout = fabric_util:request_timeout(), try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of {ok, {Health, Results}} when Health =:= ok; Health =:= accepted -> - {Health, [R || R <- couch_util:reorder_results(AllDocs, Results), R =/= noreply]}; + {Health, [R || R <- couch_util:reorder_results(NewAllDocs, Results), R =/= noreply]}; {timeout, Acc} -> {_, _, W1, _, DocReplDict} = Acc, {Health, _, Resp} = dict:fold(fun force_reply/3, {ok, W1, []}, DocReplDict), - {Health, [R || R <- couch_util:reorder_results(AllDocs, Resp), R =/= noreply]}; + {Health, [R || R <- couch_util:reorder_results(NewAllDocs, Resp), R =/= noreply]}; Else -> Else after @@ -166,10 +169,11 @@ good_reply(_) -> -spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}]. group_docs_by_shard(DbName, Docs) -> - dict:to_list(lists:foldl(fun(#doc{id=Id} = Doc, D0) -> + dict:to_list(lists:foldl(fun(Doc, D0) -> + Shards = mem3:shards(DbName, Doc), lists:foldl(fun(Shard, D1) -> dict:append(Shard, Doc, D1) - end, D0, mem3:shards(DbName,Id)) + end, D0, Shards) end, dict:new(), Docs)). append_update_replies([], [], DocReplyDict) -> diff --git a/src/fabric_view.erl b/src/fabric_view.erl index 220386d..e35e28f 100644 --- a/src/fabric_view.erl +++ b/src/fabric_view.erl @@ -50,10 +50,12 @@ handle_worker_exit(Collector, _Worker, Reason) -> -spec is_progress_possible([{#shard{}, term()}]) -> boolean(). is_progress_possible([]) -> false; -is_progress_possible(Counters) -> +is_progress_possible([{Shard1, _} | _] = Counters) -> Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end, [], Counters), [{Start, Tail0} | Rest] = lists:ukeysort(1, Ranges), + % Ringtop = mem3_util:ringtop(Shard1#shard.dbname), + Ringtop = mem3_util:ringtop(Shard1#shard.dbname), Result = lists:foldl(fun (_, fail) -> % we've already declared failure @@ -66,13 +68,13 @@ is_progress_possible(Counters) -> fail; ({_,Y}, Tail) -> case erlang:max(Tail, Y) of - End when (End+1) =:= (2 bsl 31) -> + End when (End+1) =:= Ringtop -> complete; Else -> % the normal condition, adding to the tail Else end - end, if (Tail0+1) =:= (2 bsl 31) -> complete; true -> Tail0 end, Rest), + end, if (Tail0+1) =:= Ringtop -> complete; true -> Tail0 end, Rest), (Start =:= 0) andalso (Result =:= complete). -spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) -> @@ -256,7 +258,8 @@ transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) -> {row, {[{id,Id}, {key,Key}, {value,Value}]}}; transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) -> {row, {[{id,Id}, {key,Key}, {value,Value}, {error,Reason}]}}; -transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) -> +transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc0}) -> + {ok, Doc} = couch_doc_security:filter(Doc0), {row, {[{id,Id}, {key,Key}, {value,Value}, {doc,Doc}]}}. diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl index 7a375fd..9d88e42 100644 --- a/src/fabric_view_changes.erl +++ b/src/fabric_view_changes.erl @@ -371,6 +371,9 @@ changes_row(Props0, IncludeDocs) -> {true, {error, Reason}} -> % Transform {doc, {error, Reason}} to {error, Reason} for JSON lists:keyreplace(doc, 1, Props0, {error, Reason}); + {true, Doc0} when Doc0 /= undefined -> + {ok, Doc} = couch_doc_security:filter(Doc0), + lists:keyreplace(doc, 1, Props0, {doc, Doc}); {false, _} -> lists:keydelete(doc, 1, Props0); _ -> diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl index 8ed3294..d90299e 100644 --- a/src/fabric_view_map.erl +++ b/src/fabric_view_map.erl @@ -27,7 +27,7 @@ go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) -> go(DbName, DDoc, View, Args, Callback, Acc) -> Shards = fabric_view:get_shards(DbName, Args), Repls = fabric_view:get_shard_replacements(DbName, Shards), - RPCArgs = [DDoc, View, Args], + RPCArgs = [fabric_util:doc_id_and_rev(DDoc), View, Args], StartFun = fun(Shard) -> hd(fabric_util:submit_jobs([Shard], fabric_rpc2, map_view, RPCArgs)) end, diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl index ac25f78..ade84eb 100644 --- a/src/fabric_view_reduce.erl +++ b/src/fabric_view_reduce.erl @@ -30,7 +30,7 @@ go(DbName, DDoc, VName, Args, Callback, Acc) -> Views = couch_view_group:get_views(Group), {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce), {VName, RedSrc} = lists:nth(NthRed, View#view.reduce_funs), - RPCArgs = [DDoc, VName, Args], + RPCArgs = [fabric_util:doc_id_and_rev(DDoc), VName, Args], Shards = fabric_view:get_shards(DbName, Args), Repls = fabric_view:get_shard_replacements(DbName, Shards), StartFun = fun(Shard) ->