diff --git a/include/fabric.hrl b/include/fabric.hrl index c8de0f8..e56ff71 100644 --- a/include/fabric.hrl +++ b/include/fabric.hrl @@ -31,7 +31,9 @@ reducer, lang, sorted, - user_acc + user_acc, + seqs = [], + seqs_sent = false }). -record(view_row, {key, id, value, doc, worker}). diff --git a/src/fabric.erl b/src/fabric.erl index fe6bf6c..219a729 100644 --- a/src/fabric.erl +++ b/src/fabric.erl @@ -32,6 +32,8 @@ % Views -export([all_docs/4, changes/4, query_view/3, query_view/4, query_view/6, get_view_group_info/2]). +-export([all_docs/5]). +-export([query_view/7]). % miscellany -export([design_docs/1, reset_validation_funs/1, cleanup_index_files/0, @@ -192,18 +194,18 @@ get_missing_revs(DbName, IdsRevs, Options) when is_list(IdsRevs) -> {ok, any()} | any(). update_doc(DbName, Doc, Options) -> case update_docs(DbName, [Doc], opts(Options)) of - {ok, [{ok, NewRev}]} -> - {ok, NewRev}; - {accepted, [{accepted, NewRev}]} -> - {accepted, NewRev}; - {ok, [{{_Id, _Rev}, Error}]} -> + {ok, [{ok, NewRev}], EncShards} -> + {ok, NewRev, EncShards}; + {accepted, [{accepted, NewRev}], EncShards} -> + {accepted, NewRev, EncShards}; + {ok, [{{_Id, _Rev}, Error}], _} -> throw(Error); - {ok, [Error]} -> + {ok, [Error], _} -> throw(Error); - {ok, []} -> + {ok, [], EncShards} -> % replication success #doc{revs = {Pos, [RevId | _]}} = doc(Doc), - {ok, {Pos, RevId}} + {ok, {Pos, RevId}, EncShards} end. %% @doc update a list of docs @@ -212,10 +214,10 @@ update_doc(DbName, Doc, Options) -> update_docs(DbName, Docs, Options) -> try fabric_doc_update:go(dbname(DbName), docs(Docs), opts(Options)) of - {ok, Results} -> - {ok, Results}; - {accepted, Results} -> - {accepted, Results}; + {ok, Results, EncShards} -> + {ok, Results, EncShards}; + {accepted, Results, EncShards} -> + {accepted, Results, EncShards}; Error -> throw(Error) catch {aborted, PreCommitFailures} -> @@ -234,21 +236,24 @@ purge_docs(_DbName, _IdsRevs) -> att_receiver(Req, Length) -> fabric_doc_attachments:receiver(Req, Length). +all_docs(DbName, Callback, Acc0, QueryArgs) -> + all_docs(DbName, Callback, Acc0, QueryArgs, []). + %% @doc retrieves all docs. Additional query parameters, such as `limit', %% `start_key' and `end_key', `descending', and `include_docs', can %% also be passed to further constrain the query. See %% all_docs for details --spec all_docs(dbname(), callback(), [] | tuple(), #view_query_args{}) -> +-spec all_docs(dbname(), callback(), [] | tuple(), #view_query_args{}, []) -> {ok, [any()]}. -all_docs(DbName, Callback, Acc0, #view_query_args{} = QueryArgs) when +all_docs(DbName, Callback, Acc0, #view_query_args{} = QueryArgs, EncShards) when is_function(Callback, 2) -> - fabric_view_all_docs:go(dbname(DbName), QueryArgs, Callback, Acc0); + fabric_view_all_docs:go(dbname(DbName), QueryArgs, Callback, Acc0, EncShards); %% @doc convenience function that takes a keylist rather than a record %% @equiv all_docs(DbName, Callback, Acc0, kl_to_query_args(QueryArgs)) -all_docs(DbName, Callback, Acc0, QueryArgs) -> - all_docs(DbName, Callback, Acc0, kl_to_query_args(QueryArgs)). +all_docs(DbName, Callback, Acc0, QueryArgs, EncShards) -> + all_docs(DbName, Callback, Acc0, kl_to_query_args(QueryArgs), EncShards). 
 -spec changes(dbname(), callback(), any(), #changes_args{} | [{atom(),any()}]) ->
@@ -270,16 +275,19 @@ query_view(DbName, DesignName, ViewName) ->
 %% ViewName, fun default_callback/2, [], QueryArgs)
 query_view(DbName, DesignName, ViewName, QueryArgs) ->
     Callback = fun default_callback/2,
-    query_view(DbName, DesignName, ViewName, Callback, [], QueryArgs).
+    query_view(DbName, DesignName, ViewName, Callback, [], QueryArgs, "").
+
+query_view(DbName, DesignName, ViewName, Callback, Acc0, QueryArgs) ->
+    query_view(DbName, DesignName, ViewName, Callback, Acc0, QueryArgs, []).
 
 %% @doc execute a given view.
 %% There are many additional query args that can be passed to a view,
 %% see
 %% query args for details.
 -spec query_view(dbname(), #doc{} | binary(), iodata(), callback(), any(),
-        #view_query_args{}) ->
+        #view_query_args{}, any()) ->
     any().
-query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs) ->
+query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs, EncShards) ->
     Db = dbname(DbName), View = name(ViewName),
     case is_reduce_view(Db, Design, View, QueryArgs) of
     true ->
@@ -287,7 +295,7 @@ query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs) ->
     false ->
         Mod = fabric_view_map
     end,
-    Mod:go(Db, Design, View, QueryArgs, Callback, Acc0).
+    Mod:go(Db, Design, View, QueryArgs, Callback, Acc0, EncShards).
 
 %% @doc retrieve info about a view group, disk size, language, whether compaction
 %% is running and so forth
@@ -328,7 +336,7 @@ design_docs(DbName) ->
     ({error, Reason}, _Acc) ->
         {error, Reason}
     end,
-    fabric:all_docs(dbname(DbName), Callback, [], QueryArgs).
+    fabric:all_docs(dbname(DbName), Callback, [], QueryArgs, "").
 
 %% @doc forces a reload of validation functions, this is performed after
 %% design docs are update
diff --git a/src/fabric_doc_update.erl b/src/fabric_doc_update.erl
index dd6f2ca..2cd8291 100644
--- a/src/fabric_doc_update.erl
+++ b/src/fabric_doc_update.erl
@@ -26,23 +26,23 @@ go(DbName, AllDocs, Opts) ->
     validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)),
     Options = lists:delete(all_or_nothing, Opts),
     GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) ->
-        Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
+        Ref = rexi:cast(Node, {fabric_rpc, update_docs_seq, [Name, Docs, Options]}),
         {Shard#shard{ref=Ref}, Docs}
     end, group_docs_by_shard(DbName, AllDocs)),
     {Workers, _} = lists:unzip(GroupedDocs),
     RexiMon = fabric_util:create_monitors(Workers),
     W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))),
     Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs,
-        dict:from_list([{Doc,[]} || Doc <- AllDocs])},
+        dict:from_list([{Doc,[]} || Doc <- AllDocs]), []},
     Timeout = fabric_util:request_timeout(),
     try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of
-    {ok, {Health, Results}} when Health =:= ok; Health =:= accepted ->
-        {Health, [R || R <- couch_util:reorder_results(AllDocs, Results), R =/= noreply]};
+    {ok, {Health, Results, Responders}} when Health =:= ok; Health =:= accepted ->
+        {Health, [R || R <- couch_util:reorder_results(AllDocs, Results), R =/= noreply], Responders};
     {timeout, Acc} ->
-        {_, _, W1, _, DocReplDict} = Acc,
+        {_, _, W1, _, DocReplDict, Responders} = Acc,
         {Health, _, Resp} = dict:fold(fun force_reply/3, {ok, W1, []}, DocReplDict),
-        {Health, [R || R <- couch_util:reorder_results(AllDocs, Resp),
-            R =/= noreply]};
+        {Health, [R || R <- couch_util:reorder_results(AllDocs, Resp), R =/= noreply], fabric_util:pack(Responders)};
     Else ->
         Else
     after
@@ -50,44 +50,49 @@ go(DbName, AllDocs, Opts) ->
     end.
 
 handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, Acc0) ->
-    {_, LenDocs, W, GroupedDocs, DocReplyDict} = Acc0,
+    {_, LenDocs, W, GroupedDocs, DocReplyDict, Responders} = Acc0,
     NewGrpDocs = [X || {#shard{node=N}, _} = X <- GroupedDocs, N =/= NodeRef],
-    skip_message({length(NewGrpDocs), LenDocs, W, NewGrpDocs, DocReplyDict});
+    skip_message({length(NewGrpDocs), LenDocs, W, NewGrpDocs, DocReplyDict, Responders});
 
 handle_message({rexi_EXIT, _}, Worker, Acc0) ->
-    {WC,LenDocs,W,GrpDocs,DocReplyDict} = Acc0,
+    {WC,LenDocs,W,GrpDocs,DocReplyDict, Responders} = Acc0,
     NewGrpDocs = lists:keydelete(Worker,1,GrpDocs),
-    skip_message({WC-1,LenDocs,W,NewGrpDocs,DocReplyDict});
+    skip_message({WC-1,LenDocs,W,NewGrpDocs,DocReplyDict, Responders});
 
 handle_message(internal_server_error, Worker, Acc0) ->
     % happens when we fail to load validation functions in an RPC worker
-    {WC,LenDocs,W,GrpDocs,DocReplyDict} = Acc0,
+    {WC,LenDocs,W,GrpDocs,DocReplyDict, Responders} = Acc0,
     NewGrpDocs = lists:keydelete(Worker,1,GrpDocs),
-    skip_message({WC-1,LenDocs,W,NewGrpDocs,DocReplyDict});
+    skip_message({WC-1,LenDocs,W,NewGrpDocs,DocReplyDict, Responders});
 
 handle_message(attachment_chunk_received, _Worker, Acc0) ->
     {ok, Acc0};
 
-handle_message({ok, Replies}, Worker, Acc0) ->
-    {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0,
+handle_message({{ok, Replies}, DbSeq}, Worker, Acc0) ->
+    {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0, Responders} = Acc0,
+    NewResponders =
+    case lists:member({Worker,DbSeq}, Responders) of
+    true ->
+        Responders;
+    false ->
+        [{Worker,DbSeq} | Responders]
+    end,
     {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs),
     DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
-    case {WaitingCount, dict:size(DocReplyDict)} of
-    {1, _} ->
+    if WaitingCount =:= 1 ->
         % last message has arrived, we need to conclude things
         {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []},
-            DocReplyDict),
-        {stop, {Health, Reply}};
-    {_, DocCount} ->
-        % we've got at least one reply for each document, let's take a look
+            DocReplyDict),
+        {stop, {Health, Reply, fabric_util:pack(NewResponders)}};
+    true ->
        case dict:fold(fun maybe_reply/3, {stop,W,[]}, DocReplyDict) of
        continue ->
-            {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}};
+            {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict, NewResponders}};
        {stop, W, FinalReplies} ->
-            {stop, {ok, FinalReplies}}
+            {stop, {ok, FinalReplies, fabric_util:pack(NewResponders)}}
        end
     end;
 
 handle_message({missing_stub, Stub}, _, _) ->
     throw({missing_stub, Stub});
 
 handle_message({not_found, no_db_file} = X, Worker, Acc0) ->
-    {_, _, _, GroupedDocs, _} = Acc0,
+    {_, _, _, GroupedDocs, _, _} = Acc0,
     Docs = couch_util:get_value(Worker, GroupedDocs),
-    handle_message({ok, [X || _D <- Docs]}, Worker, Acc0).
+    handle_message({{ok, [X || _D <- Docs]}, 0}, Worker, Acc0).
@@ -160,9 +165,9 @@ append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) ->
     % TODO what if the same document shows up twice in one update_docs call?
     append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
 
-skip_message({0, _, W, _, DocReplyDict}) ->
+skip_message({0, _, W, _, DocReplyDict, Responders}) ->
     {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, DocReplyDict),
-    {stop, {Health, Reply}};
+    {stop, {Health, Reply, fabric_util:pack(Responders)}};
 skip_message(Acc0) ->
     {ok, Acc0}.
@@ -195,30 +200,29 @@ doc_update1_test() -> % test for W = 2 AccW2 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs, - Dict}, + Dict,[]}, - {ok,{WaitingCountW2_1,_,_,_,_}=AccW2_1} = - handle_message({ok, [{ok, Doc1}]},hd(Shards),AccW2), + {ok,{WaitingCountW2_1,_,_,_,_,_}=AccW2_1} = + handle_message({{ok, [{ok, Doc1}]}, 0},hd(Shards),AccW2), ?assertEqual(WaitingCountW2_1,2), - {stop, FinalReplyW2 } = - handle_message({ok, [{ok, Doc1}]},lists:nth(2,Shards),AccW2_1), - ?assertEqual({ok, [{Doc1, {ok,Doc1}}]},FinalReplyW2), + + {stop, {ok, [{Doc1, {ok,Doc1}}], _Responders}} = + handle_message({{ok, [{ok, Doc1}]}, 0},lists:nth(2,Shards),AccW2_1), % test for W = 3 AccW3 = {length(Shards), length(Docs), list_to_integer("3"), GroupedDocs, - Dict}, + Dict,[]}, - {ok,{WaitingCountW3_1,_,_,_,_}=AccW3_1} = - handle_message({ok, [{ok, Doc1}]},hd(Shards),AccW3), + {ok,{WaitingCountW3_1,_,_,_,_,_}=AccW3_1} = + handle_message({{ok, [{ok, Doc1}]}, 0},hd(Shards),AccW3), ?assertEqual(WaitingCountW3_1,2), - {ok,{WaitingCountW3_2,_,_,_,_}=AccW3_2} = - handle_message({ok, [{ok, Doc1}]},lists:nth(2,Shards),AccW3_1), + {ok,{WaitingCountW3_2,_,_,_,_,_}=AccW3_2} = + handle_message({{ok, [{ok, Doc1}]}, 0},lists:nth(2,Shards),AccW3_1), ?assertEqual(WaitingCountW3_2,1), - {stop, FinalReplyW3 } = - handle_message({ok, [{ok, Doc1}]},lists:nth(3,Shards),AccW3_2), - ?assertEqual({ok, [{Doc1, {ok,Doc1}}]},FinalReplyW3), + {stop, {ok, [{Doc1, {ok,Doc1}}], _Responders1}} = + handle_message({{ok, [{ok, Doc1}]}, 0},lists:nth(3,Shards),AccW3_2), % test w quorum > # shards, which should fail immediately @@ -226,9 +230,9 @@ doc_update1_test() -> GroupedDocs2 = group_docs_by_shard_hack(<<"foo">>,Shards2,Docs), AccW4 = - {length(Shards2), length(Docs), list_to_integer("2"), GroupedDocs2, Dict}, + {length(Shards2), length(Docs), list_to_integer("2"), GroupedDocs2, Dict, []}, Bool = - case handle_message({ok, [{ok, Doc1}]},hd(Shards2),AccW4) of + case handle_message({{ok, [{ok, Doc1}]}, 0},hd(Shards2),AccW4) of {stop, _Reply} -> true; _ -> false @@ -241,16 +245,12 @@ doc_update1_test() -> SA2 = #shard{node=a, range=2}, SB2 = #shard{node=b, range=2}, GroupedDocs3 = [{SA1,[Doc1]}, {SB1,[Doc1]}, {SA2,[Doc2]}, {SB2,[Doc2]}], - StW5_0 = {length(GroupedDocs3), length(Docs2), 2, GroupedDocs3, Dict2}, - {ok, StW5_1} = handle_message({ok, [{ok, "A"}]}, SA1, StW5_0), + StW5_0 = {length(GroupedDocs3), length(Docs2), 2, GroupedDocs3, Dict2, []}, + {ok, StW5_1} = handle_message({{ok, [{ok, "A"}]}, 0}, SA1, StW5_0), {ok, StW5_2} = handle_message({rexi_EXIT, nil}, SB1, StW5_1), {ok, StW5_3} = handle_message({rexi_EXIT, nil}, SA2, StW5_2), - {stop, ReplyW5} = handle_message({rexi_EXIT, nil}, SB2, StW5_3), - ?assertEqual( - {error, [{Doc1,{accepted,"A"}},{Doc2,{error,internal_server_error}}]}, - ReplyW5 - ). - + {stop, {error, [{Doc1,{accepted,"A"}},{Doc2,{error,internal_server_error}}], _Responders2}} = + handle_message({rexi_EXIT, nil}, SB2, StW5_3). 
doc_update2_test() -> Doc1 = #doc{revs = {1,[<<"foo">>]}}, @@ -258,23 +258,20 @@ doc_update2_test() -> Docs = [Doc2, Doc1], Shards = mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]), - GroupedDocs = group_docs_by_shard_hack(<<"foo">>,Shards,Docs), + GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), Acc0 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs, - dict:from_list([{Doc,[]} || Doc <- Docs])}, + dict:from_list([{Doc,[]} || Doc <- Docs]), []}, - {ok,{WaitingCount1,_,_,_,_}=Acc1} = - handle_message({ok, [{ok, Doc1},{ok, Doc2}]},hd(Shards),Acc0), + {ok,{WaitingCount1,_,_,_,_,_}=Acc1} = + handle_message({{ok, [{ok, Doc1},{ok, Doc2}]}, 0},hd(Shards),Acc0), ?assertEqual(WaitingCount1,2), - {ok,{WaitingCount2,_,_,_,_}=Acc2} = + {ok,{WaitingCount2,_,_,_,_,_}=Acc2} = handle_message({rexi_EXIT, 1},lists:nth(2,Shards),Acc1), ?assertEqual(WaitingCount2,1), - {stop, Reply} = - handle_message({rexi_EXIT, 1},lists:nth(3,Shards),Acc2), - - ?assertEqual({accepted, [{Doc1,{accepted,Doc2}}, {Doc2,{accepted,Doc1}}]}, - Reply). + {stop, {accepted, [{Doc1,{accepted,Doc2}}, {Doc2,{accepted,Doc1}}], _Responders}} = + handle_message({rexi_EXIT, 1},lists:nth(3,Shards),Acc2). doc_update3_test() -> Doc1 = #doc{revs = {1,[<<"foo">>]}}, @@ -284,20 +281,18 @@ doc_update3_test() -> mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]), GroupedDocs = group_docs_by_shard_hack(<<"foo">>,Shards,Docs), Acc0 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs, - dict:from_list([{Doc,[]} || Doc <- Docs])}, + dict:from_list([{Doc,[]} || Doc <- Docs]),[]}, - {ok,{WaitingCount1,_,_,_,_}=Acc1} = - handle_message({ok, [{ok, Doc1},{ok, Doc2}]},hd(Shards),Acc0), + {ok,{WaitingCount1,_,_,_,_,_}=Acc1} = + handle_message({{ok, [{ok, Doc1},{ok, Doc2}]}, 0},hd(Shards),Acc0), ?assertEqual(WaitingCount1,2), - {ok,{WaitingCount2,_,_,_,_}=Acc2} = + {ok,{WaitingCount2,_,_,_,_,_}=Acc2} = handle_message({rexi_EXIT, 1},lists:nth(2,Shards),Acc1), ?assertEqual(WaitingCount2,1), - {stop, Reply} = - handle_message({ok, [{ok, Doc1},{ok, Doc2}]},lists:nth(3,Shards),Acc2), - - ?assertEqual({ok, [{Doc1, {ok, Doc2}},{Doc2, {ok,Doc1}}]},Reply). + {stop, {ok, [{Doc1, {ok, Doc2}},{Doc2, {ok,Doc1}}], _Responders}} = + handle_message({{ok, [{ok, Doc1},{ok, Doc2}]}, 0},lists:nth(3,Shards),Acc2). % needed for testing to avoid having to start the mem3 application group_docs_by_shard_hack(_DbName, Shards, Docs) -> diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl index 2a79583..440b144 100644 --- a/src/fabric_rpc.erl +++ b/src/fabric_rpc.erl @@ -16,7 +16,7 @@ -export([get_db_info/1, get_doc_count/1, get_update_seq/1]). -export([open_doc/3, open_revs/4, get_missing_revs/2, get_missing_revs/3, - update_docs/3]). + update_docs/3, update_docs_seq/3]). -export([all_docs/2, changes/3, map_view/4, reduce_view/4, group_info/2]). -export([create_db/1, delete_db/1, reset_validation_funs/1, set_security/3, set_revs_limit/3, create_shard_db_doc/2, delete_shard_db_doc/2]). @@ -34,7 +34,9 @@ offset = nil, total_rows, reduce_fun = fun couch_db:enum_docs_reduce_to_count/1, - group_level = 0 + group_level = 0, + % send_seq is used to determine if db sequence is part of the message + send_seq = false }). %% rpc endpoints @@ -70,7 +72,7 @@ all_docs(DbName, #view_query_args{keys=nil} = QueryArgs) -> {EndKeyType, if is_binary(EndKey) -> EndKey; true -> EndDocId end} ], {ok, _, Acc} = couch_db:enum_docs(Db, fun view_fold/3, Acc0, Options), - final_response(Total, Acc#view_acc.offset). 
+ final_response(Total, Acc). changes(DbName, #changes_args{} = Args, StartSeq) -> changes(DbName, [Args], StartSeq); @@ -104,11 +106,12 @@ map_view(DbName, DDoc, ViewName, QueryArgs) -> view_type = ViewType, extra = Extra } = QueryArgs, + DbSeq = proplists:get_value(curr_seq, Extra, 0), set_io_priority(DbName, Extra), {LastSeq, MinSeq} = calculate_seqs(Db, Stale), Group0 = couch_view_group:design_doc_to_view_group(DDoc), {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), - {ok, Group} = couch_view_group:request_group(Pid, MinSeq), + {ok, Group} = couch_view_group:request_group(Pid, compute_min(MinSeq,DbSeq)), maybe_update_view_group(Pid, LastSeq, Stale), erlang:monitor(process, couch_view_group:get_fd(Group)), Views = couch_view_group:get_views(Group), @@ -120,7 +123,8 @@ map_view(DbName, DDoc, ViewName, QueryArgs) -> conflicts = proplists:get_value(conflicts, Extra, false), limit = Limit+Skip, total_rows = Total, - reduce_fun = fun couch_view:reduce_to_count/1 + reduce_fun = fun couch_view:reduce_to_count/1, + send_seq = true }, case Keys of nil -> @@ -135,7 +139,8 @@ map_view(DbName, DDoc, ViewName, QueryArgs) -> Out end, Acc0, Keys) end, - final_response(Total, Acc#view_acc.offset). + final_response(Total, Acc). + reduce_view(DbName, #doc{} = DDoc, ViewName, QueryArgs) -> Group = couch_view_group:design_doc_to_view_group(DDoc), @@ -151,18 +156,20 @@ reduce_view(DbName, Group0, ViewName, QueryArgs) -> stale = Stale, extra = Extra } = QueryArgs, + DbSeq = proplists:get_value(curr_seq, Extra, 0), set_io_priority(DbName, Extra), GroupFun = group_rows_fun(GroupLevel), {LastSeq, MinSeq} = calculate_seqs(Db, Stale), {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), - {ok, Group} = couch_view_group:request_group(Pid, MinSeq), + {ok, Group} = couch_view_group:request_group(Pid, compute_min(MinSeq, DbSeq)), maybe_update_view_group(Pid, LastSeq, Stale), Lang = couch_view_group:get_language(Group), Views = couch_view_group:get_views(Group), erlang:monitor(process, couch_view_group:get_fd(Group)), {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce), ReduceView = {reduce, NthRed, Lang, View}, - Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip}, + Acc0 = #view_acc{db=Db, group_level = GroupLevel, + limit = Limit+Skip, send_seq = true}, case Keys of nil -> Options0 = couch_httpd_view:make_key_options(QueryArgs), @@ -187,6 +194,13 @@ calculate_seqs(Db, Stale) -> {LastSeq, LastSeq} end. +% If Db decoded sequence wasn't used then go with the MinSeq computed +% from the Stale args and update sequence +compute_min(MinSeq,0) -> + MinSeq; +compute_min(MinSeq, DbSeq) -> + erlang:min(MinSeq, DbSeq). + maybe_update_view_group(GroupPid, LastSeq, update_after) -> couch_view_group:trigger_group_update(GroupPid, LastSeq); maybe_update_view_group(_, _, _) -> @@ -255,6 +269,36 @@ get_missing_revs(DbName, IdRevsList, Options) -> Error end). 
+update_docs_seq(DbName, Docs0, Options) -> + case proplists:get_value(replicated_changes, Options) of + true -> + X = replicated_changes; + _ -> + X = interactive_edit + end, + Docs = make_att_readers(Docs0), + %% inlining with_db here to return a tuple instead and not + %% have to contend with all the other callers of with_db + set_io_priority(DbName, Options), + case get_or_create_db(DbName, Options) of + {ok, Db} -> + rexi:reply(try + Reply = couch_db:update_docs(Db, Docs, Options, X), + %% open db again to get updated sequence + {ok, DbNew} = get_or_create_db(DbName, Options), + SeqAfter = couch_db:get_update_seq(DbNew), + {Reply, SeqAfter} + catch Exception -> + Exception; + error:Reason -> + twig:log(error, "rpc ~p:~p/~p ~p ~p", [couch_db, update_docs, 4, Reason, + clean_stack()]), + {error, Reason} + end); + Error -> + rexi:reply(Error) + end. + update_docs(DbName, Docs0, Options) -> case proplists:get_value(replicated_changes, Options) of true -> @@ -265,6 +309,7 @@ update_docs(DbName, Docs0, Options) -> Docs = make_att_readers(Docs0), with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}). + group_info(DbName, Group0) -> {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), rexi:reply(couch_view_group:request_group_info(Pid)). @@ -316,11 +361,19 @@ view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) -> #doc_info{revs=[#rev_info{deleted=true}|_]} -> {ok, Acc} end; -view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) -> +view_fold(KV, OffsetReds, #view_acc{db=Db, offset=nil, + total_rows=Total, send_seq=SendSeq} = Acc) -> % calculates the offset for this shard #view_acc{reduce_fun=Reduce} = Acc, Offset = Reduce(OffsetReds), - case rexi:sync_reply({total_and_offset, Total, Offset}) of + case SendSeq of + true -> + DbSeq = couch_db:get_update_seq(Db), + Msg = {total_and_offset, Total, Offset, DbSeq}; + false -> + Msg = {total_and_offset, Total, Offset} + end, + case rexi:sync_reply(Msg) of ok -> view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset}); stop -> @@ -338,7 +391,8 @@ view_fold({{Key,Id}, Value}, _Offset, Acc) -> doc_info = DocInfo, limit = Limit, conflicts = Conflicts, - include_docs = IncludeDocs + include_docs = IncludeDocs, + send_seq = SendSeq } = Acc, case Value of {Props} -> LinkedDocs = (couch_util:get_value(<<"_id">>, Props) =/= undefined); @@ -362,22 +416,35 @@ view_fold({{Key,Id}, Value}, _Offset, Acc) -> true -> Doc = undefined end, - case rexi:stream(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of + case SendSeq of + true -> + DbSeq = couch_db:get_update_seq(Db), + Msg = {#view_row{key=Key, id=Id, value=Value, doc=Doc}, DbSeq}; + false -> + Msg = #view_row{key=Key, id=Id, value=Value, doc=Doc} + end, + case rexi:stream(Msg) of ok -> {ok, Acc#view_acc{limit=Limit-1}}; timeout -> exit(timeout) end. -final_response(Total, nil) -> - case rexi:sync_reply({total_and_offset, Total, Total}) of ok -> +final_response(Total, #view_acc{db=Db, offset=nil, send_seq=SendSeq}) -> + case SendSeq of + true -> + Msg = {total_and_offset, Total, Total, couch_db:get_update_seq(Db)}; + false -> + Msg = {total_and_offset, Total, Total} + end, + case rexi:sync_reply(Msg) of ok -> rexi:reply(complete); stop -> ok; timeout -> exit(timeout) end; -final_response(_Total, _Offset) -> +final_response(_,_) -> rexi:reply(complete). %% TODO: handle case of bogus group level @@ -404,10 +471,11 @@ reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0 -> send(K, Red, Acc). 
-send(Key, Value, #view_acc{limit=Limit} = Acc) -> +send(Key, Value, #view_acc{db=Db, limit=Limit} = Acc) -> case put(fabric_sent_first_row, true) of undefined -> - case rexi:sync_reply(#view_row{key=Key, value=Value}) of + Msg = {#view_row{key=Key, value=Value}, couch_db:get_update_seq(Db)}, + case rexi:sync_reply(Msg) of ok -> {ok, Acc#view_acc{limit=Limit-1}}; stop -> diff --git a/src/fabric_util.erl b/src/fabric_util.erl index 0ff1735..c7db7ea 100644 --- a/src/fabric_util.erl +++ b/src/fabric_util.erl @@ -16,7 +16,9 @@ -export([submit_jobs/3, submit_jobs/4, cleanup/1, recv/4, get_db/1, get_db/2, error_info/1, update_counter/3, remove_ancestors/2, create_monitors/1, kv/2, - remove_down_workers/2]). + update_seqs/3, + remove_down_workers/2, remove_non_preferred_shards/2, + pack/1, process_enc_shards/2]). -export([request_timeout/0]). -include("fabric.hrl"). @@ -143,6 +145,84 @@ create_monitors(Shards) -> MonRefs = lists:usort([{rexi_server, N} || #shard{node=N} <- Shards]), rexi_monitor:start(MonRefs). +remove_non_preferred_shards(PreferredShards, Shards) -> + %% the preferred shards are the good ones. Remove shards that + %% are in the same range but different nodes + lists:foldl(fun(Shard, Acc) -> + case check_shard(Shard, PreferredShards) of + -1 -> + Acc; + DbSeq -> + [{Shard, DbSeq} | Acc] + end + end, [], Shards). + +check_shard(_, []) -> + 0; +check_shard(#shard{name=Name, node=Node}=Shard, [{#shard{name=Name2, node=Node2}, DbSeq} | Rest]) -> + case Name =:= Name2 of + true -> + case Node =:= Node2 of + true -> + DbSeq; + false -> + -1 + end; + _ -> + check_shard(Shard, Rest) + end. + + +process_enc_shards(EncShards, DbName) -> + case EncShards of + [] -> []; + _ -> + unpack_seqs(EncShards, DbName) + end. + +pack(Responders) -> + ShardList = [{N,R,DbSeq} || {#shard{node=N, range=R}, DbSeq} <- Responders], + couch_util:encodeBase64Url(term_to_binary(ShardList, [compressed])). + +unpack_seqs(Packed, DbName) -> + NodeRangeSeqs = unpack_and_merge(Packed), + lists:map(fun({Node, Range, DbSeq}) -> + {ok, Shard} = mem3:get_shard(DbName, Node, Range), + {Shard, DbSeq} + end, + NodeRangeSeqs). + +unpack_and_merge(PackedNRS) -> + NodeRangeSeqs = + lists:foldl(fun(NRQ, Acc) -> + binary_to_term(couch_util:decodeBase64Url(NRQ)) ++ Acc + end,[],PackedNRS), + MergedList = + lists:foldl(fun({Node,Range,Seq}, Acc) -> + [merge({Node,Range,Seq}, NodeRangeSeqs) | Acc] + end,[],NodeRangeSeqs), + lists:usort(MergedList). + +merge(NRS, []) -> + NRS; + +merge({Node, Range, Seq}, [{Node2, Range2, Seq2} | Rest]) -> + case Node =:= Node2 andalso Range =:= Range2 of + true -> + {Node, Range, erlang:max(Seq,Seq2)}; + false -> + merge({Node, Range, Seq}, Rest) + end. + +update_seqs(DbSeq, Worker, #collector{seqs=Seqs}=State) -> + case lists:member({Worker, DbSeq}, Seqs) of + true -> + NewSeqs = Seqs; + false -> + NewSeqs = [{Worker, DbSeq} | Seqs] + end, + State#collector{seqs=NewSeqs}. + %% verify only id and rev are used in key. update_counter_test() -> Reply = {ok, #doc{id = <<"id">>, revs = <<"rev">>, @@ -171,3 +251,185 @@ remove_ancestors_test() -> %% test function kv(Item, Count) -> {make_key(Item), {Item,Count}}. 
+ +remove_non_preferred_shards_test() -> + Shards = + [{shard,<<"shards/00000000-7fffffff/testdb1">>, + 'node1',<<"testdb1">>, + [0,536870911], + undefined}, + {shard,<<"shards/80000000-ffffffff/testdb1">>, + 'node1',<<"testdb1">>, + [536870912,1073741823], + undefined}, + {shard,<<"shards/00000000-7fffffff/testdb1">>, + 'node2',<<"testdb1">>, + [0,536870911], + undefined}, + {shard,<<"shards/80000000-ffffffff/testdb1">>, + 'node2',<<"testdb1">>, + [536870912,1073741823], + undefined}, + {shard,<<"shards/00000000-7fffffff/testdb1">>, + 'node3',<<"testdb1">>, + [0,536870911], + undefined}, + {shard,<<"shards/80000000-ffffffff/testdb1">>, + 'node3',<<"testdb1">>, + [536870912,1073741823], + undefined}], + PreferredShards = + [{{shard,<<"shards/00000000-7fffffff/testdb1">>, + 'node1',<<"testdb1">>, + [0,536870911], + undefined},2}, + {{shard,<<"shards/80000000-ffffffff/testdb1">>, + 'node2',<<"testdb1">>, + [536870912,1073741823], + undefined},3}], + NewShards = remove_non_preferred_shards(PreferredShards, Shards), + ?assertEqual(lists:reverse(NewShards),PreferredShards), + ok. + +remove_non_preferred_shards2_test() -> + Shards = + [{shard,<<"shards/00000000-3fffffff/testdb1">>, + 'node1',<<"testdb1">>, + [0,536870911], + undefined}, + {shard,<<"shards/40000000-7fffffff/testdb1">>, + 'node1',<<"testdb1">>, + [1073741824,1610612735], + undefined}, + {shard,<<"shards/80000000-bfffffff/testdb1">>, + 'node1',<<"testdb1">>, + [2147483648,2684354559], + undefined}, + {shard,<<"shards/c0000000-ffffffff/testdb1">>, + 'node1',<<"testdb1">>, + [3221225472,3758096383], + undefined}, + {shard,<<"shards/00000000-3fffffff/testdb1">>, + 'node2',<<"testdb1">>, + [0,536870911], + undefined}, + {shard,<<"shards/40000000-7fffffff/testdb1">>, + 'node2',<<"testdb1">>, + [1073741824,1610612735], + undefined}, + {shard,<<"shards/80000000-bfffffff/testdb1">>, + 'node2',<<"testdb1">>, + [2147483648,2684354559], + undefined}, + {shard,<<"shards/c0000000-ffffffff/testdb1">>, + 'node2',<<"testdb1">>, + [3221225472,3758096383], + undefined}, + {shard,<<"shards/00000000-3fffffff/testdb1">>, + 'node3',<<"testdb1">>, + [0,536870911], + undefined}, + {shard,<<"shards/40000000-7fffffff/testdb1">>, + 'node3',<<"testdb1">>, + [1073741824,1610612735], + undefined}, + {shard,<<"shards/80000000-bfffffff/testdb1">>, + 'node3',<<"testdb1">>, + [2147483648,2684354559], + undefined}, + {shard,<<"shards/c0000000-ffffffff/testdb1">>, + 'node3',<<"testdb1">>, + [3221225472,3758096383], + undefined}], + PreferredShards = + [{{shard,<<"shards/00000000-3fffffff/testdb1">>, + 'node1',<<"testdb1">>, + [0,536870911], + undefined},2}, + {{shard,<<"shards/80000000-bfffffff/testdb1">>, + 'node2',<<"testdb1">>, + [536870912,1073741823], + undefined},3}], + ExpectedShards = + [{{shard,<<"shards/00000000-3fffffff/testdb1">>, + 'node1',<<"testdb1">>, + [0,536870911], + undefined},2}, + {{shard,<<"shards/40000000-7fffffff/testdb1">>, + 'node1',<<"testdb1">>, + [1073741824,1610612735], + undefined},0}, + {{shard,<<"shards/c0000000-ffffffff/testdb1">>, + 'node1',<<"testdb1">>, + [3221225472,3758096383], + undefined},0}, + {{shard,<<"shards/40000000-7fffffff/testdb1">>, + 'node2',<<"testdb1">>, + [1073741824,1610612735], + undefined},0}, + {{shard,<<"shards/80000000-bfffffff/testdb1">>, + 'node2',<<"testdb1">>, + [2147483648,2684354559], + undefined},3}, + {{shard,<<"shards/c0000000-ffffffff/testdb1">>, + 'node2',<<"testdb1">>, + [3221225472,3758096383], + undefined},0}, + {{shard,<<"shards/40000000-7fffffff/testdb1">>, + 'node3',<<"testdb1">>, + 
[1073741824,1610612735], + undefined},0}, + {{shard,<<"shards/c0000000-ffffffff/testdb1">>, + 'node3',<<"testdb1">>, + [3221225472,3758096383], + undefined},0}], + NewShards = remove_non_preferred_shards(PreferredShards, Shards), + ?assertEqual(lists:reverse(NewShards),ExpectedShards), + ok. + +unpack_and_merge_test() -> + Shards = + [{shard,<<"shards/00000000-1fffffff/testdb1">>, + 'bigcouch@node.local',<<"testdb1">>, + [0,536870911], + undefined}, + {shard,<<"shards/20000000-3fffffff/testdb1">>, + 'bigcouch@node.local',<<"testdb1">>, + [536870912,1073741823], + undefined}, + {shard,<<"shards/40000000-5fffffff/testdb1">>, + 'bigcouch@node.local',<<"testdb1">>, + [1073741824,1610612735], + undefined}, + {shard,<<"shards/60000000-7fffffff/testdb1">>, + 'bigcouch@node.local',<<"testdb1">>, + [1610612736,2147483647], + undefined}, + {shard,<<"shards/80000000-9fffffff/testdb1">>, + 'bigcouch@node.local',<<"testdb1">>, + [2147483648,2684354559], + undefined}, + {shard,<<"shards/a0000000-bfffffff/testdb1">>, + 'bigcouch@node.local',<<"testdb1">>, + [2684354560,3221225471], + undefined}, + {shard,<<"shards/c0000000-dfffffff/testdb1">>, + 'bigcouch@node.local',<<"testdb1">>, + [3221225472,3758096383], + undefined}, + {shard,<<"shards/e0000000-ffffffff/testdb1">>, + 'bigcouch@node.local',<<"testdb1">>, + [3758096384,4294967295], + undefined}], + Responders1 = lists:map(fun(S) -> + {S, 1} + end, Shards), + Responders2 = lists:map(fun(S) -> + {S, 2} + end, Shards), + P1 = pack(Responders1), + P2 = pack(Responders2), + ShardList = [{N,R,DbSeq} || {#shard{node=N, range=R}, DbSeq} <- Responders2], + ?assertEqual(ShardList, unpack_and_merge([P1, P2])), + ok. + diff --git a/src/fabric_view.erl b/src/fabric_view.erl index 4107e61..0d0f642 100644 --- a/src/fabric_view.erl +++ b/src/fabric_view.erl @@ -108,7 +108,9 @@ maybe_send_row(State) -> counters = Counters, skip = Skip, limit = Limit, - user_acc = AccIn + user_acc = AccIn, + seqs = Seqs, + seqs_sent = SeqsSent } = State, case fabric_dict:any(0, Counters) of true -> @@ -118,11 +120,18 @@ maybe_send_row(State) -> {_, NewState} when Skip > 0 -> maybe_send_row(NewState#collector{skip=Skip-1}); {Row, NewState} -> - case Callback(transform_row(possibly_embed_doc(NewState,Row)), AccIn) of + case SeqsSent of + true -> + Msg = transform_row(possibly_embed_doc(NewState,Row)); + false -> + Msg = {transform_row(possibly_embed_doc(NewState,Row)), + fabric_util:pack(Seqs)} + end, + case Callback(Msg, AccIn) of {stop, Acc} -> - {stop, NewState#collector{user_acc=Acc, limit=Limit-1}}; + {stop, NewState#collector{user_acc=Acc, limit=Limit-1, seqs_sent=true}}; {ok, Acc} -> - maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1}) + maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1, seqs_sent=true}) end catch complete -> {_, Acc} = Callback(complete, AccIn), @@ -343,3 +352,4 @@ mk_shards(NoNodes,Range,Shards) -> NodeName = list_to_atom("node-" ++ integer_to_list(NoNodes)), mk_shards(NoNodes-1,Range, [#shard{name=NodeName, node=NodeName, range=Range} | Shards]). + diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl index 8a4ed84..7599934 100644 --- a/src/fabric_view_all_docs.erl +++ b/src/fabric_view_all_docs.erl @@ -14,15 +14,18 @@ -module(fabric_view_all_docs). --export([go/4]). +-export([go/4, go/5]). -export([open_doc/3]). % exported for spawn -include("fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). 
-go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) -> - Workers = fabric_util:submit_jobs(mem3:shards(DbName),all_docs,[QueryArgs]), +go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0, EncShards) -> + PreferredShards = fabric_util:process_enc_shards(EncShards, DbName), + Shards0 = fabric_util:remove_non_preferred_shards(PreferredShards, mem3:shards(DbName)), + {Shards, _Seqs} = lists:unzip(Shards0), + Workers = fabric_util:submit_jobs(Shards,all_docs,[QueryArgs]), #view_query_args{limit = Limit, skip = Skip} = QueryArgs, State = #collector{ query_args = QueryArgs, @@ -30,7 +33,8 @@ go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) -> counters = fabric_dict:init(Workers, 0), skip = Skip, limit = Limit, - user_acc = Acc0 + user_acc = Acc0, + seqs_sent = true }, RexiMon = fabric_util:create_monitors(Workers), try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, @@ -44,7 +48,7 @@ go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) -> after rexi_monitor:stop(RexiMon), fabric_util:cleanup(Workers) - end; + end. go(DbName, QueryArgs, Callback, Acc0) -> diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl index 4f6a3f1..2ea2f0f 100644 --- a/src/fabric_view_map.erl +++ b/src/fabric_view_map.erl @@ -14,15 +14,32 @@ -module(fabric_view_map). --export([go/6]). +-export([go/7]). -include("fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). -go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) -> +go(DbName, GroupId, View, Args, Callback, Acc0, EncShards) when is_binary(GroupId) -> {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []), - go(DbName, DDoc, View, Args, Callback, Acc0); + go(DbName, DDoc, View, Args, Callback, Acc0, EncShards); + +go(DbName, DDoc, View, #view_query_args{stale=Stale, extra=Extra}=Args, Callback, Acc0, EncShards) -> + AllShards = fabric_view:get_shards(DbName, Args), + PrefShardsSeqs = + if (Stale == ok orelse Stale == update_after) -> + % stale trumps as we use primary shards + []; + true -> + fabric_util:process_enc_shards(EncShards, DbName) + end, + Shards = fabric_util:remove_non_preferred_shards(PrefShardsSeqs, AllShards), + Workers = + lists:map(fun({#shard{node=Node, name=ShardName} = Shard, DbSeq}) -> + NewExtra = lists:keystore(curr_seq, 1, Extra, {curr_seq, DbSeq}), + Ref = rexi:cast(Node, {fabric_rpc, map_view, [ShardName, DDoc, View, Args#view_query_args{extra=NewExtra}]}), + Shard#shard{ref = Ref} + end, Shards), go(DbName, DDoc, View, Args, Callback, Acc0) -> Shards = fabric_view:get_shards(DbName, Args), @@ -67,56 +84,79 @@ handle_message({rexi_EXIT, Reason}, Worker, State) -> {error, Resp} end; -handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) -> +handle_message({total_and_offset, Tot, Off, DbSeq}, {Worker, From}, State0) -> #collector{ callback = Callback, counters = Counters0, total_rows = Total0, offset = Offset0, - user_acc = AccIn - } = State, + user_acc = AccIn, + seqs_sent = SeqsSent + } = State0, case fabric_dict:lookup_element(Worker, Counters0) of undefined -> % this worker lost the race with other partition copies, terminate gen_server:reply(From, stop), - {ok, State}; - 0 -> + {ok, State0}; + _ -> gen_server:reply(From, ok), Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1), + State1 = fabric_util:update_seqs(DbSeq, Worker, State0), Total = Total0 + Tot, Offset = Offset0 + Off, case 
fabric_dict:any(0, Counters2) of
         true ->
-            {ok, State#collector{
+            {ok, State1#collector{
                 counters = Counters2,
                 total_rows = Total,
                 offset = Offset
             }};
         false ->
-            FinalOffset = erlang:min(Total, Offset+State#collector.skip),
-            {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn),
-            {Go, State#collector{
+            FinalOffset = erlang:min(Total, Offset+State1#collector.skip),
+            case SeqsSent of
+            true ->
+                Msg = {total_and_offset,
+                    Total, FinalOffset};
+            false ->
+                Msg = {total_and_offset, Total,
+                    FinalOffset, fabric_util:pack(State1#collector.seqs)}
+            end,
+            {Go, Acc} = Callback(Msg, AccIn),
+            {Go, State1#collector{
                 counters = fabric_dict:decrement_all(Counters2),
                 total_rows = Total,
                 offset = FinalOffset,
-                user_acc = Acc
+                user_acc = Acc,
+                seqs_sent = true
             }}
         end
     end;
 
-handle_message(#view_row{}, {_, _}, #collector{limit=0} = State) ->
+handle_message({#view_row{}, _DbSeq}, {_, _}, #collector{limit=0} = State) ->
     #collector{callback=Callback} = State,
     {_, Acc} = Callback(complete, State#collector.user_acc),
     {stop, State#collector{user_acc=Acc}};
 
-handle_message(#view_row{} = Row, {_,From}, #collector{sorted=false} = St) ->
-    #collector{callback=Callback, user_acc=AccIn, limit=Limit} = St,
-    {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn),
+handle_message({#view_row{}=Row, DbSeq}, {Worker,From}, #collector{sorted=false}=St) ->
+    #collector{
+        callback = Callback,
+        user_acc = AccIn,
+        limit = Limit,
+        seqs_sent = Sent
+    } = St,
+    case Sent of
+    true ->
+        {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn);
+    false ->
+        St2 = fabric_util:update_seqs(DbSeq, Worker, St),
+        {Go, Acc} = Callback({fabric_view:transform_row(Row),
+            fabric_util:pack(St2#collector.seqs)}, AccIn)
+    end,
     rexi:stream_ack(From),
-    {Go, St#collector{user_acc=Acc, limit=Limit-1}};
+    {Go, St#collector{user_acc=Acc, limit=Limit-1, seqs_sent=true}};
 
-handle_message(#view_row{} = Row, {Worker, From}, State) ->
+handle_message({#view_row{} = Row, DbSeq}, {Worker, From}, State) ->
     #collector{
         query_args = #view_query_args{direction=Dir},
         counters = Counters0,
@@ -126,7 +166,8 @@ handle_message(#view_row{} = Row, {Worker, From}, State) ->
     Rows = merge_row(Dir, KeyDict, Row#view_row{worker={Worker, From}}, Rows0),
     Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
     State1 = State#collector{rows=Rows, counters=Counters1},
-    fabric_view:maybe_send_row(State1);
+    State2 = fabric_util:update_seqs(DbSeq, Worker, State1),
+    fabric_view:maybe_send_row(State2);
 
 handle_message(complete, Worker, State) ->
     Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index a55e8cf..edfa50a 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -14,26 +14,41 @@
 -module(fabric_view_reduce).
 
--export([go/6]).
+-export([go/7]).
 
 -include("fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
-go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
+go(DbName, GroupId, View, Args, Callback, Acc0, EncShards) when is_binary(GroupId) ->
     {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
-    go(DbName, DDoc, View, Args, Callback, Acc0);
+    go(DbName, DDoc, View, Args, Callback, Acc0, EncShards);
 
-go(DbName, DDoc, VName, Args, Callback, Acc0) ->
+go(DbName, DDoc, VName, Args, Callback, Acc0, EncShards) ->
+    #view_query_args{
+        stale = Stale,
+        extra = Extra
+    } = Args,
+    AllShards = fabric_view:get_shards(DbName, Args),
+    PreferredShards = if (Stale == ok orelse Stale == update_after) ->
+        % stale trumps as we use primary shards
+        [];
+    true ->
+        fabric_util:process_enc_shards(EncShards, DbName)
+    end,
+    Shards = fabric_util:remove_non_preferred_shards(
+        PreferredShards, AllShards),
     Group = couch_view_group:design_doc_to_view_group(DDoc),
     Lang = couch_view_group:get_language(Group),
     Views = couch_view_group:get_views(Group),
     {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce),
     {VName, RedSrc} = lists:nth(NthRed, View#view.reduce_funs),
-    Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
-        Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,DDoc,VName,Args]}),
+    Workers = lists:map(fun({#shard{name=Name, node=N} = Shard, DbSeq}) ->
+        NewExtra = lists:keystore(curr_seq, 1, Extra, {curr_seq, DbSeq}),
+        Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,DDoc,VName,
+            Args#view_query_args{extra=NewExtra}]}),
         Shard#shard{ref = Ref}
-    end, fabric_view:get_shards(DbName, Args)),
+    end, Shards),
     RexiMon = fabric_util:create_monitors(Workers),
     #view_query_args{limit = Limit, skip = Skip} = Args,
     OsProc = case os_proc_needed(RedSrc) of
@@ -85,7 +100,7 @@ handle_message({rexi_EXIT, Reason}, Worker, State) ->
         {error, Resp}
     end;
 
-handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
+handle_message({#view_row{key=Key} = Row, DbSeq}, {Worker, From}, State) ->
     #collector{counters = Counters0, rows = Rows0} = State,
     case fabric_dict:lookup_element(Worker, Counters0) of
     undefined ->
@@ -98,7 +113,8 @@ handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
         % TODO time this call, if slow don't do it every time
         C2 = fabric_view:remove_overlapping_shards(Worker, C1),
         State1 = State#collector{rows=Rows, counters=C2},
-        fabric_view:maybe_send_row(State1)
+        State2 = fabric_util:update_seqs(DbSeq, Worker, State1),
+        fabric_view:maybe_send_row(State2)
     end;
 
 handle_message(complete, Worker, #collector{counters = Counters0} = State) ->