From 62d8e22c76b7d80c7f74b3384583f9c156643d41 Mon Sep 17 00:00:00 2001 From: Norman Barker Date: Sun, 24 Nov 2013 17:17:31 -0700 Subject: [PATCH 1/4] merged hash code --- src/fabric_db_create.erl | 14 +++++++++----- src/fabric_doc_update.erl | 14 +++++++++----- src/fabric_view.erl | 7 ++++--- 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/fabric_db_create.erl b/src/fabric_db_create.erl index b83430a..9d7561c 100644 --- a/src/fabric_db_create.erl +++ b/src/fabric_db_create.erl @@ -65,7 +65,9 @@ generate_shard_map(DbName, Options) -> % the DB already exists, and may have a different Suffix ok; {not_found, _} -> - Doc = make_document(Shards, Suffix) + HashFun = proplists:get_value(hash, Options), + Size = proplists:get_value(size, Options), + Doc = make_document(Shards, Suffix, HashFun, Size) end, {Shards, Doc}. @@ -143,19 +145,21 @@ maybe_stop(W, Counters) -> end end. -make_document([#shard{dbname=DbName}|_] = Shards, Suffix) -> +make_document([#shard{dbname=DbName}|_] = Shards, Suffix, HashModule, Size) -> {RawOut, ByNodeOut, ByRangeOut} = lists:foldl(fun(#shard{node=N, range=[B,E]}, {Raw, ByNode, ByRange}) -> - Range = ?l2b([couch_util:to_hex(<>), "-", - couch_util:to_hex(<>)]), + Range = ?l2b([couch_util:to_hex(<>), "-", + couch_util:to_hex(<>)]), Node = couch_util:to_binary(N), {[[<<"add">>, Range, Node] | Raw], orddict:append(Node, Range, ByNode), orddict:append(Range, Node, ByRange)} end, {[], [], []}, Shards), + HashFun = {<<"hash_fun">>, <>}, #doc{id=DbName, body = {[ {<<"shard_suffix">>, Suffix}, {<<"changelog">>, lists:sort(RawOut)}, {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}}, - {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}} + {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}}, + {<<"hash_info">>, {[HashFun, {<<"ring_top">>, 1 bsl Size}]}} ]}}. diff --git a/src/fabric_doc_update.erl b/src/fabric_doc_update.erl index 2bbe4d2..ddbd19c 100644 --- a/src/fabric_doc_update.erl +++ b/src/fabric_doc_update.erl @@ -31,7 +31,10 @@ go(DbName, AllDocs0, Opts) -> Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name,Docs1,Options]}), {Shard#shard{ref=Ref}, Docs} end, group_docs_by_shard(DbName, AllDocs)), - {Workers, _} = lists:unzip(GroupedDocs), + + {Workers, ResultDocs} = lists:unzip(GroupedDocs), + NewAllDocs = lists:usort(lists:flatten(ResultDocs)), + RexiMon = fabric_util:create_monitors(Workers), W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))), Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs, @@ -39,12 +42,12 @@ go(DbName, AllDocs0, Opts) -> Timeout = fabric_util:request_timeout(), try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of {ok, {Health, Results}} when Health =:= ok; Health =:= accepted -> - {Health, [R || R <- couch_util:reorder_results(AllDocs, Results), R =/= noreply]}; + {Health, [R || R <- couch_util:reorder_results(NewAllDocs, Results), R =/= noreply]}; {timeout, Acc} -> {_, _, W1, _, DocReplDict} = Acc, {Health, _, Resp} = dict:fold(fun force_reply/3, {ok, W1, []}, DocReplDict), - {Health, [R || R <- couch_util:reorder_results(AllDocs, Resp), R =/= noreply]}; + {Health, [R || R <- couch_util:reorder_results(NewAllDocs, Resp), R =/= noreply]}; Else -> Else after @@ -166,10 +169,11 @@ good_reply(_) -> -spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}]. group_docs_by_shard(DbName, Docs) -> - dict:to_list(lists:foldl(fun(#doc{id=Id} = Doc, D0) -> + dict:to_list(lists:foldl(fun(Doc, D0) -> + {Shards, {_Type, HashId}} = mem3:shards(DbName, Doc), lists:foldl(fun(Shard, D1) -> dict:append(Shard, Doc, D1) - end, D0, mem3:shards(DbName,Id)) + end, D0, Shards) end, dict:new(), Docs)). append_update_replies([], [], DocReplyDict) -> diff --git a/src/fabric_view.erl b/src/fabric_view.erl index 220386d..889f138 100644 --- a/src/fabric_view.erl +++ b/src/fabric_view.erl @@ -50,10 +50,11 @@ handle_worker_exit(Collector, _Worker, Reason) -> -spec is_progress_possible([{#shard{}, term()}]) -> boolean(). is_progress_possible([]) -> false; -is_progress_possible(Counters) -> +is_progress_possible([{Shard1, _} | _] = Counters) -> Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end, [], Counters), [{Start, Tail0} | Rest] = lists:ukeysort(1, Ranges), + Ringtop = mem3_util:ringtop(Shard1#shard.dbname), Result = lists:foldl(fun (_, fail) -> % we've already declared failure @@ -66,13 +67,13 @@ is_progress_possible(Counters) -> fail; ({_,Y}, Tail) -> case erlang:max(Tail, Y) of - End when (End+1) =:= (2 bsl 31) -> + End when (End+1) =:= Ringtop -> complete; Else -> % the normal condition, adding to the tail Else end - end, if (Tail0+1) =:= (2 bsl 31) -> complete; true -> Tail0 end, Rest), + end, if (Tail0+1) =:= Ringtop -> complete; true -> Tail0 end, Rest), (Start =:= 0) andalso (Result =:= complete). -spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) -> From 67e25e6e63b32b562b1c0f5250f26ffa6a3854f9 Mon Sep 17 00:00:00 2001 From: Norman Barker Date: Sun, 24 Nov 2013 23:45:46 -0700 Subject: [PATCH 2/4] fixed hash merge --- src/fabric_doc_update.erl | 2 +- src/fabric_view.erl | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/fabric_doc_update.erl b/src/fabric_doc_update.erl index ddbd19c..fd4d156 100644 --- a/src/fabric_doc_update.erl +++ b/src/fabric_doc_update.erl @@ -170,7 +170,7 @@ good_reply(_) -> -spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}]. group_docs_by_shard(DbName, Docs) -> dict:to_list(lists:foldl(fun(Doc, D0) -> - {Shards, {_Type, HashId}} = mem3:shards(DbName, Doc), + Shards = mem3:shards(DbName, Doc), lists:foldl(fun(Shard, D1) -> dict:append(Shard, Doc, D1) end, D0, Shards) diff --git a/src/fabric_view.erl b/src/fabric_view.erl index 889f138..f671384 100644 --- a/src/fabric_view.erl +++ b/src/fabric_view.erl @@ -54,7 +54,8 @@ is_progress_possible([{Shard1, _} | _] = Counters) -> Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end, [], Counters), [{Start, Tail0} | Rest] = lists:ukeysort(1, Ranges), - Ringtop = mem3_util:ringtop(Shard1#shard.dbname), + % Ringtop = mem3_util:ringtop(Shard1#shard.dbname), + Ringtop = mem3_util:ringtop(Shard1#shard.dbname), Result = lists:foldl(fun (_, fail) -> % we've already declared failure From 7a807b7af532b627ea411352319703e0d69ea018 Mon Sep 17 00:00:00 2001 From: Norman Barker Date: Mon, 25 Nov 2013 00:52:32 -0700 Subject: [PATCH 3/4] merged doc-security into spatial-index --- src/fabric_doc_open.erl | 3 +++ src/fabric_doc_open_revs.erl | 16 +++++++++++++--- src/fabric_view.erl | 3 ++- src/fabric_view_changes.erl | 3 +++ src/fabric_view_map.erl | 2 +- src/fabric_view_reduce.erl | 2 +- 6 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/fabric_doc_open.erl b/src/fabric_doc_open.erl index c9f3f0a..dc992a5 100644 --- a/src/fabric_doc_open.erl +++ b/src/fabric_doc_open.erl @@ -158,6 +158,9 @@ choose_reply(Docs) -> format_reply({ok, #doc{deleted=true}}, true) -> {not_found, deleted}; +format_reply({ok, #doc{}=Doc}, _) -> + couch_doc_security:filter(Doc); + format_reply(Else, _) -> Else. diff --git a/src/fabric_doc_open_revs.erl b/src/fabric_doc_open_revs.erl index 7539f51..93731fb 100644 --- a/src/fabric_doc_open_revs.erl +++ b/src/fabric_doc_open_revs.erl @@ -19,7 +19,7 @@ -include("fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). --include_lib("eunit/include/eunit.hrl"). +-include_lib("eunit/include/eunit.hrl"). -record(state, { dbname, @@ -48,8 +48,10 @@ go(DbName, Id, Revs, Options) -> RexiMon = fabric_util:create_monitors(Workers), try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, State) of {ok, {ok, Reply}} -> - {ok, Reply}; - Else -> + {ok, format_reply(Reply)}; + {ok, Reply} -> + {ok, format_reply(Reply)}; + Else -> Else after rexi_monitor:stop(RexiMon) @@ -139,6 +141,14 @@ maybe_reply(DbName, ReplyDict, Complete, RepairDocs, R) -> noreply end. +format_reply(DocList) -> + MapFun = fun + ({ok, #doc{}=Doc}) -> couch_doc_security:filter(Doc); + (Else) -> Else + end, + lists:map(MapFun, DocList). + + extract_replies(Replies) -> lists:map(fun({_,{Reply,_}}) -> Reply end, Replies). diff --git a/src/fabric_view.erl b/src/fabric_view.erl index f671384..e35e28f 100644 --- a/src/fabric_view.erl +++ b/src/fabric_view.erl @@ -258,7 +258,8 @@ transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) -> {row, {[{id,Id}, {key,Key}, {value,Value}]}}; transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) -> {row, {[{id,Id}, {key,Key}, {value,Value}, {error,Reason}]}}; -transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) -> +transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc0}) -> + {ok, Doc} = couch_doc_security:filter(Doc0), {row, {[{id,Id}, {key,Key}, {value,Value}, {doc,Doc}]}}. diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl index 7a375fd..9d88e42 100644 --- a/src/fabric_view_changes.erl +++ b/src/fabric_view_changes.erl @@ -371,6 +371,9 @@ changes_row(Props0, IncludeDocs) -> {true, {error, Reason}} -> % Transform {doc, {error, Reason}} to {error, Reason} for JSON lists:keyreplace(doc, 1, Props0, {error, Reason}); + {true, Doc0} when Doc0 /= undefined -> + {ok, Doc} = couch_doc_security:filter(Doc0), + lists:keyreplace(doc, 1, Props0, {doc, Doc}); {false, _} -> lists:keydelete(doc, 1, Props0); _ -> diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl index 8ed3294..d90299e 100644 --- a/src/fabric_view_map.erl +++ b/src/fabric_view_map.erl @@ -27,7 +27,7 @@ go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) -> go(DbName, DDoc, View, Args, Callback, Acc) -> Shards = fabric_view:get_shards(DbName, Args), Repls = fabric_view:get_shard_replacements(DbName, Shards), - RPCArgs = [DDoc, View, Args], + RPCArgs = [fabric_util:doc_id_and_rev(DDoc), View, Args], StartFun = fun(Shard) -> hd(fabric_util:submit_jobs([Shard], fabric_rpc2, map_view, RPCArgs)) end, diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl index ac25f78..ade84eb 100644 --- a/src/fabric_view_reduce.erl +++ b/src/fabric_view_reduce.erl @@ -30,7 +30,7 @@ go(DbName, DDoc, VName, Args, Callback, Acc) -> Views = couch_view_group:get_views(Group), {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce), {VName, RedSrc} = lists:nth(NthRed, View#view.reduce_funs), - RPCArgs = [DDoc, VName, Args], + RPCArgs = [fabric_util:doc_id_and_rev(DDoc), VName, Args], Shards = fabric_view:get_shards(DbName, Args), Repls = fabric_view:get_shard_replacements(DbName, Shards), StartFun = fun(Shard) -> From ab6057c732afe60c110cb161306174f441d65516 Mon Sep 17 00:00:00 2001 From: Norman Barker Date: Wed, 18 Dec 2013 05:01:03 -0700 Subject: [PATCH 4/4] parsed srs for db creation --- src/fabric_db_create.erl | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/fabric_db_create.erl b/src/fabric_db_create.erl index 9d7561c..9fa1b6f 100644 --- a/src/fabric_db_create.erl +++ b/src/fabric_db_create.erl @@ -66,8 +66,9 @@ generate_shard_map(DbName, Options) -> ok; {not_found, _} -> HashFun = proplists:get_value(hash, Options), + Srs = proplists:get_value(srs, Options), Size = proplists:get_value(size, Options), - Doc = make_document(Shards, Suffix, HashFun, Size) + Doc = make_document(Shards, Suffix, HashFun, Srs, Size) end, {Shards, Doc}. @@ -145,7 +146,7 @@ maybe_stop(W, Counters) -> end end. -make_document([#shard{dbname=DbName}|_] = Shards, Suffix, HashModule, Size) -> +make_document([#shard{dbname=DbName}|_] = Shards, Suffix, HashModule, Srs, Size) -> {RawOut, ByNodeOut, ByRangeOut} = lists:foldl(fun(#shard{node=N, range=[B,E]}, {Raw, ByNode, ByRange}) -> Range = ?l2b([couch_util:to_hex(<>), "-", @@ -160,6 +161,7 @@ make_document([#shard{dbname=DbName}|_] = Shards, Suffix, HashModule, Size) -> {<<"changelog">>, lists:sort(RawOut)}, {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}}, {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}}, - {<<"hash_info">>, {[HashFun, {<<"ring_top">>, 1 bsl Size}]}} + {<<"hash_info">>, {[HashFun, {<<"ring_top">>, 1 bsl Size}]}}, + {<<"srs">>, Srs} ]}}.