diff --git a/include/fabric.hrl b/include/fabric.hrl
index c8de0f8..e56ff71 100644
--- a/include/fabric.hrl
+++ b/include/fabric.hrl
@@ -31,7 +31,9 @@
reducer,
lang,
sorted,
- user_acc
+ user_acc,
+ seqs = [],
+ seqs_sent = false
}).
-record(view_row, {key, id, value, doc, worker}).
diff --git a/src/fabric.erl b/src/fabric.erl
index fe6bf6c..219a729 100644
--- a/src/fabric.erl
+++ b/src/fabric.erl
@@ -32,6 +32,8 @@
% Views
-export([all_docs/4, changes/4, query_view/3, query_view/4, query_view/6,
get_view_group_info/2]).
+-export([all_docs/5]).
+-export([query_view/7]).
% miscellany
-export([design_docs/1, reset_validation_funs/1, cleanup_index_files/0,
@@ -192,18 +194,18 @@ get_missing_revs(DbName, IdsRevs, Options) when is_list(IdsRevs) ->
{ok, any()} | any().
update_doc(DbName, Doc, Options) ->
case update_docs(DbName, [Doc], opts(Options)) of
- {ok, [{ok, NewRev}]} ->
- {ok, NewRev};
- {accepted, [{accepted, NewRev}]} ->
- {accepted, NewRev};
- {ok, [{{_Id, _Rev}, Error}]} ->
+ {ok, [{ok, NewRev}], EncShards} ->
+ {ok, NewRev, EncShards};
+ {accepted, [{accepted, NewRev}], EncShards} ->
+ {accepted, NewRev, EncShards};
+ {ok, [{{_Id, _Rev}, Error}], _} ->
throw(Error);
- {ok, [Error]} ->
+ {ok, [Error], _} ->
throw(Error);
- {ok, []} ->
+ {ok, [], EncShards} ->
% replication success
#doc{revs = {Pos, [RevId | _]}} = doc(Doc),
- {ok, {Pos, RevId}}
+ {ok, {Pos, RevId}, EncShards}
end.
%% @doc update a list of docs
@@ -212,10 +214,10 @@ update_doc(DbName, Doc, Options) ->
update_docs(DbName, Docs, Options) ->
try
fabric_doc_update:go(dbname(DbName), docs(Docs), opts(Options)) of
- {ok, Results} ->
- {ok, Results};
- {accepted, Results} ->
- {accepted, Results};
+ {ok, Results, EncShards} ->
+ {ok, Results, EncShards};
+ {accepted, Results, EncShards} ->
+ {accepted, Results, EncShards};
Error ->
throw(Error)
catch {aborted, PreCommitFailures} ->
@@ -234,21 +236,24 @@ purge_docs(_DbName, _IdsRevs) ->
att_receiver(Req, Length) ->
fabric_doc_attachments:receiver(Req, Length).
+all_docs(DbName, Callback, Acc0, QueryArgs) ->
+ all_docs(DbName, Callback, Acc0, QueryArgs, []).
+
%% @doc retrieves all docs. Additional query parameters, such as `limit',
%% `start_key' and `end_key', `descending', and `include_docs', can
%% also be passed to further constrain the query. See
%% all_docs for details
--spec all_docs(dbname(), callback(), [] | tuple(), #view_query_args{}) ->
+-spec all_docs(dbname(), callback(), [] | tuple(), #view_query_args{}, list()) ->
{ok, [any()]}.
-all_docs(DbName, Callback, Acc0, #view_query_args{} = QueryArgs) when
+all_docs(DbName, Callback, Acc0, #view_query_args{} = QueryArgs, EncShards) when
is_function(Callback, 2) ->
- fabric_view_all_docs:go(dbname(DbName), QueryArgs, Callback, Acc0);
+ fabric_view_all_docs:go(dbname(DbName), QueryArgs, Callback, Acc0, EncShards);
%% @doc convenience function that takes a keylist rather than a record
-%% @equiv all_docs(DbName, Callback, Acc0, kl_to_query_args(QueryArgs))
+%% @equiv all_docs(DbName, Callback, Acc0, kl_to_query_args(QueryArgs), EncShards)
-all_docs(DbName, Callback, Acc0, QueryArgs) ->
- all_docs(DbName, Callback, Acc0, kl_to_query_args(QueryArgs)).
+all_docs(DbName, Callback, Acc0, QueryArgs, EncShards) ->
+ all_docs(DbName, Callback, Acc0, kl_to_query_args(QueryArgs), EncShards).
-spec changes(dbname(), callback(), any(), #changes_args{} | [{atom(),any()}]) ->
@@ -270,16 +275,19 @@ query_view(DbName, DesignName, ViewName) ->
%% ViewName, fun default_callback/2, [], QueryArgs)
query_view(DbName, DesignName, ViewName, QueryArgs) ->
Callback = fun default_callback/2,
- query_view(DbName, DesignName, ViewName, Callback, [], QueryArgs).
+    query_view(DbName, DesignName, ViewName, Callback, [], QueryArgs, []).
+
+query_view(DbName, DesignName, ViewName, Callback, Acc0, QueryArgs) ->
+ query_view(DbName, DesignName, ViewName, Callback, Acc0, QueryArgs, []).
%% @doc execute a given view.
%% There are many additional query args that can be passed to a view,
%% see
%% query args for details.
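+%% EncShards is an opaque list of packed shard/seq tokens, as produced by
+%% fabric_util:pack/1; pass [] (or use query_view/6) when there is no preference.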
-spec query_view(dbname(), #doc{} | binary(), iodata(), callback(), any(),
- #view_query_args{}) ->
+ #view_query_args{}, any()) ->
any().
-query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs) ->
+query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs, EncShards) ->
Db = dbname(DbName), View = name(ViewName),
case is_reduce_view(Db, Design, View, QueryArgs) of
true ->
@@ -287,7 +295,7 @@ query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs) ->
false ->
Mod = fabric_view_map
end,
- Mod:go(Db, Design, View, QueryArgs, Callback, Acc0).
+ Mod:go(Db, Design, View, QueryArgs, Callback, Acc0, EncShards).
%% @doc retrieve info about a view group, disk size, language, whether compaction
%% is running and so forth
@@ -328,7 +336,7 @@ design_docs(DbName) ->
({error, Reason}, _Acc) ->
{error, Reason}
end,
- fabric:all_docs(dbname(DbName), Callback, [], QueryArgs).
+    fabric:all_docs(dbname(DbName), Callback, [], QueryArgs, []).
%% @doc forces a reload of validation functions, this is performed after
%% design docs are update
diff --git a/src/fabric_doc_update.erl b/src/fabric_doc_update.erl
index dd6f2ca..2cd8291 100644
--- a/src/fabric_doc_update.erl
+++ b/src/fabric_doc_update.erl
@@ -26,23 +26,23 @@ go(DbName, AllDocs, Opts) ->
validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)),
Options = lists:delete(all_or_nothing, Opts),
GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) ->
- Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
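+        % update_docs_seq replies {Reply, DbSeq} so the coordinator can track
+        % each shard's post-update sequence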
+ Ref = rexi:cast(Node, {fabric_rpc, update_docs_seq, [Name, Docs, Options]}),
{Shard#shard{ref=Ref}, Docs}
end, group_docs_by_shard(DbName, AllDocs)),
{Workers, _} = lists:unzip(GroupedDocs),
RexiMon = fabric_util:create_monitors(Workers),
W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))),
Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs,
- dict:from_list([{Doc,[]} || Doc <- AllDocs])},
+ dict:from_list([{Doc,[]} || Doc <- AllDocs]), []},
Timeout = fabric_util:request_timeout(),
     try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of
-    {ok, {Health, Results}} when Health =:= ok; Health =:= accepted ->
-        {Health, [R || R <- couch_util:reorder_results(AllDocs, Results), R =/= noreply]};
+ {ok, {Health, Results, Responders}} when Health =:= ok; Health =:= accepted ->
+ {Health, [R || R <- couch_util:reorder_results(AllDocs, Results), R =/= noreply], Responders};
{timeout, Acc} ->
- {_, _, W1, _, DocReplDict} = Acc,
+ {_, _, W1, _, DocReplDict, Responders} = Acc,
{Health, _, Resp} = dict:fold(fun force_reply/3, {ok, W1, []},
DocReplDict),
- {Health, [R || R <- couch_util:reorder_results(AllDocs, Resp), R =/= noreply]};
+        {Health, [R || R <- couch_util:reorder_results(AllDocs, Resp), R =/= noreply], fabric_util:pack(Responders)};
Else ->
Else
after
@@ -50,44 +50,49 @@ go(DbName, AllDocs, Opts) ->
end.
handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, Acc0) ->
- {_, LenDocs, W, GroupedDocs, DocReplyDict} = Acc0,
+ {_, LenDocs, W, GroupedDocs, DocReplyDict, Responders} = Acc0,
NewGrpDocs = [X || {#shard{node=N}, _} = X <- GroupedDocs, N =/= NodeRef],
- skip_message({length(NewGrpDocs), LenDocs, W, NewGrpDocs, DocReplyDict});
+ skip_message({length(NewGrpDocs), LenDocs, W, NewGrpDocs, DocReplyDict, Responders});
handle_message({rexi_EXIT, _}, Worker, Acc0) ->
- {WC,LenDocs,W,GrpDocs,DocReplyDict} = Acc0,
+ {WC,LenDocs,W,GrpDocs,DocReplyDict, Responders} = Acc0,
NewGrpDocs = lists:keydelete(Worker,1,GrpDocs),
- skip_message({WC-1,LenDocs,W,NewGrpDocs,DocReplyDict});
+ skip_message({WC-1,LenDocs,W,NewGrpDocs,DocReplyDict, Responders});
handle_message(internal_server_error, Worker, Acc0) ->
% happens when we fail to load validation functions in an RPC worker
- {WC,LenDocs,W,GrpDocs,DocReplyDict} = Acc0,
+ {WC,LenDocs,W,GrpDocs,DocReplyDict, Responders} = Acc0,
NewGrpDocs = lists:keydelete(Worker,1,GrpDocs),
- skip_message({WC-1,LenDocs,W,NewGrpDocs,DocReplyDict});
+ skip_message({WC-1,LenDocs,W,NewGrpDocs,DocReplyDict, Responders});
handle_message(attachment_chunk_received, _Worker, Acc0) ->
{ok, Acc0};
-handle_message({ok, Replies}, Worker, Acc0) ->
- {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0,
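+% a worker now replies {{ok, Replies}, DbSeq}; track each responding shard
+% copy and its reported seq so the final result can include a packed token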
+handle_message({{ok, Replies},DbSeq}, Worker, Acc0) ->
+ {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0, Responders} = Acc0,
+ NewResponders =
+ case lists:member({Worker,DbSeq}, Responders) of
+ true ->
+ Responders;
+ false ->
+ [{Worker,DbSeq} | Responders]
+ end,
{value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs),
DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
- case {WaitingCount, dict:size(DocReplyDict)} of
- {1, _} ->
+ if WaitingCount =:= 1 ->
% last message has arrived, we need to conclude things
{Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []},
- DocReplyDict),
- {stop, {Health, Reply}};
- {_, DocCount} ->
- % we've got at least one reply for each document, let's take a look
+ DocReplyDict),
+ {stop, {Health, Reply, fabric_util:pack(NewResponders)}};
+ true ->
case dict:fold(fun maybe_reply/3, {stop,W,[]}, DocReplyDict) of
continue ->
- {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}};
+ {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict, NewResponders}};
{stop, W, FinalReplies} ->
- {stop, {ok, FinalReplies}}
+ {stop, {ok, FinalReplies, fabric_util:pack(NewResponders)}}
end
end;
handle_message({missing_stub, Stub}, _, _) ->
throw({missing_stub, Stub});
handle_message({not_found, no_db_file} = X, Worker, Acc0) ->
- {_, _, _, GroupedDocs, _} = Acc0,
+ {_, _, _, GroupedDocs, _, _} = Acc0,
Docs = couch_util:get_value(Worker, GroupedDocs),
-    handle_message({ok, [X || _D <- Docs]}, Worker, Acc0).
+    handle_message({{ok, [X || _D <- Docs]}, 0}, Worker, Acc0).
@@ -160,9 +165,9 @@ append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) ->
% TODO what if the same document shows up twice in one update_docs call?
append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
-skip_message({0, _, W, _, DocReplyDict}) ->
+skip_message({0, _, W, _, DocReplyDict, Responders}) ->
{Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, DocReplyDict),
- {stop, {Health, Reply}};
+    {stop, {Health, Reply, fabric_util:pack(Responders)}};
skip_message(Acc0) ->
{ok, Acc0}.
@@ -195,30 +200,29 @@ doc_update1_test() ->
% test for W = 2
AccW2 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs,
- Dict},
+ Dict,[]},
- {ok,{WaitingCountW2_1,_,_,_,_}=AccW2_1} =
- handle_message({ok, [{ok, Doc1}]},hd(Shards),AccW2),
+ {ok,{WaitingCountW2_1,_,_,_,_,_}=AccW2_1} =
+ handle_message({{ok, [{ok, Doc1}]}, 0},hd(Shards),AccW2),
?assertEqual(WaitingCountW2_1,2),
- {stop, FinalReplyW2 } =
- handle_message({ok, [{ok, Doc1}]},lists:nth(2,Shards),AccW2_1),
- ?assertEqual({ok, [{Doc1, {ok,Doc1}}]},FinalReplyW2),
+
+ {stop, {ok, [{Doc1, {ok,Doc1}}], _Responders}} =
+ handle_message({{ok, [{ok, Doc1}]}, 0},lists:nth(2,Shards),AccW2_1),
% test for W = 3
AccW3 = {length(Shards), length(Docs), list_to_integer("3"), GroupedDocs,
- Dict},
+ Dict,[]},
- {ok,{WaitingCountW3_1,_,_,_,_}=AccW3_1} =
- handle_message({ok, [{ok, Doc1}]},hd(Shards),AccW3),
+ {ok,{WaitingCountW3_1,_,_,_,_,_}=AccW3_1} =
+ handle_message({{ok, [{ok, Doc1}]}, 0},hd(Shards),AccW3),
?assertEqual(WaitingCountW3_1,2),
- {ok,{WaitingCountW3_2,_,_,_,_}=AccW3_2} =
- handle_message({ok, [{ok, Doc1}]},lists:nth(2,Shards),AccW3_1),
+ {ok,{WaitingCountW3_2,_,_,_,_,_}=AccW3_2} =
+ handle_message({{ok, [{ok, Doc1}]}, 0},lists:nth(2,Shards),AccW3_1),
?assertEqual(WaitingCountW3_2,1),
- {stop, FinalReplyW3 } =
- handle_message({ok, [{ok, Doc1}]},lists:nth(3,Shards),AccW3_2),
- ?assertEqual({ok, [{Doc1, {ok,Doc1}}]},FinalReplyW3),
+ {stop, {ok, [{Doc1, {ok,Doc1}}], _Responders1}} =
+ handle_message({{ok, [{ok, Doc1}]}, 0},lists:nth(3,Shards),AccW3_2),
% test w quorum > # shards, which should fail immediately
@@ -226,9 +230,9 @@ doc_update1_test() ->
GroupedDocs2 = group_docs_by_shard_hack(<<"foo">>,Shards2,Docs),
AccW4 =
- {length(Shards2), length(Docs), list_to_integer("2"), GroupedDocs2, Dict},
+ {length(Shards2), length(Docs), list_to_integer("2"), GroupedDocs2, Dict, []},
Bool =
- case handle_message({ok, [{ok, Doc1}]},hd(Shards2),AccW4) of
+ case handle_message({{ok, [{ok, Doc1}]}, 0},hd(Shards2),AccW4) of
{stop, _Reply} ->
true;
_ -> false
@@ -241,16 +245,12 @@ doc_update1_test() ->
SA2 = #shard{node=a, range=2},
SB2 = #shard{node=b, range=2},
GroupedDocs3 = [{SA1,[Doc1]}, {SB1,[Doc1]}, {SA2,[Doc2]}, {SB2,[Doc2]}],
- StW5_0 = {length(GroupedDocs3), length(Docs2), 2, GroupedDocs3, Dict2},
- {ok, StW5_1} = handle_message({ok, [{ok, "A"}]}, SA1, StW5_0),
+ StW5_0 = {length(GroupedDocs3), length(Docs2), 2, GroupedDocs3, Dict2, []},
+ {ok, StW5_1} = handle_message({{ok, [{ok, "A"}]}, 0}, SA1, StW5_0),
{ok, StW5_2} = handle_message({rexi_EXIT, nil}, SB1, StW5_1),
{ok, StW5_3} = handle_message({rexi_EXIT, nil}, SA2, StW5_2),
- {stop, ReplyW5} = handle_message({rexi_EXIT, nil}, SB2, StW5_3),
- ?assertEqual(
- {error, [{Doc1,{accepted,"A"}},{Doc2,{error,internal_server_error}}]},
- ReplyW5
- ).
-
+ {stop, {error, [{Doc1,{accepted,"A"}},{Doc2,{error,internal_server_error}}], _Responders2}} =
+ handle_message({rexi_EXIT, nil}, SB2, StW5_3).
doc_update2_test() ->
Doc1 = #doc{revs = {1,[<<"foo">>]}},
@@ -258,23 +258,20 @@ doc_update2_test() ->
Docs = [Doc2, Doc1],
Shards =
mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
- GroupedDocs = group_docs_by_shard_hack(<<"foo">>,Shards,Docs),
+ GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs),
Acc0 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs,
- dict:from_list([{Doc,[]} || Doc <- Docs])},
+ dict:from_list([{Doc,[]} || Doc <- Docs]), []},
- {ok,{WaitingCount1,_,_,_,_}=Acc1} =
- handle_message({ok, [{ok, Doc1},{ok, Doc2}]},hd(Shards),Acc0),
+ {ok,{WaitingCount1,_,_,_,_,_}=Acc1} =
+ handle_message({{ok, [{ok, Doc1},{ok, Doc2}]}, 0},hd(Shards),Acc0),
?assertEqual(WaitingCount1,2),
- {ok,{WaitingCount2,_,_,_,_}=Acc2} =
+ {ok,{WaitingCount2,_,_,_,_,_}=Acc2} =
handle_message({rexi_EXIT, 1},lists:nth(2,Shards),Acc1),
?assertEqual(WaitingCount2,1),
- {stop, Reply} =
- handle_message({rexi_EXIT, 1},lists:nth(3,Shards),Acc2),
-
- ?assertEqual({accepted, [{Doc1,{accepted,Doc2}}, {Doc2,{accepted,Doc1}}]},
- Reply).
+ {stop, {accepted, [{Doc1,{accepted,Doc2}}, {Doc2,{accepted,Doc1}}], _Responders}} =
+ handle_message({rexi_EXIT, 1},lists:nth(3,Shards),Acc2).
doc_update3_test() ->
Doc1 = #doc{revs = {1,[<<"foo">>]}},
@@ -284,20 +281,18 @@ doc_update3_test() ->
mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
GroupedDocs = group_docs_by_shard_hack(<<"foo">>,Shards,Docs),
Acc0 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs,
- dict:from_list([{Doc,[]} || Doc <- Docs])},
+ dict:from_list([{Doc,[]} || Doc <- Docs]),[]},
- {ok,{WaitingCount1,_,_,_,_}=Acc1} =
- handle_message({ok, [{ok, Doc1},{ok, Doc2}]},hd(Shards),Acc0),
+ {ok,{WaitingCount1,_,_,_,_,_}=Acc1} =
+ handle_message({{ok, [{ok, Doc1},{ok, Doc2}]}, 0},hd(Shards),Acc0),
?assertEqual(WaitingCount1,2),
- {ok,{WaitingCount2,_,_,_,_}=Acc2} =
+ {ok,{WaitingCount2,_,_,_,_,_}=Acc2} =
handle_message({rexi_EXIT, 1},lists:nth(2,Shards),Acc1),
?assertEqual(WaitingCount2,1),
- {stop, Reply} =
- handle_message({ok, [{ok, Doc1},{ok, Doc2}]},lists:nth(3,Shards),Acc2),
-
- ?assertEqual({ok, [{Doc1, {ok, Doc2}},{Doc2, {ok,Doc1}}]},Reply).
+ {stop, {ok, [{Doc1, {ok, Doc2}},{Doc2, {ok,Doc1}}], _Responders}} =
+ handle_message({{ok, [{ok, Doc1},{ok, Doc2}]}, 0},lists:nth(3,Shards),Acc2).
% needed for testing to avoid having to start the mem3 application
group_docs_by_shard_hack(_DbName, Shards, Docs) ->
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 2a79583..440b144 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -16,7 +16,7 @@
-export([get_db_info/1, get_doc_count/1, get_update_seq/1]).
-export([open_doc/3, open_revs/4, get_missing_revs/2, get_missing_revs/3,
- update_docs/3]).
+ update_docs/3, update_docs_seq/3]).
-export([all_docs/2, changes/3, map_view/4, reduce_view/4, group_info/2]).
-export([create_db/1, delete_db/1, reset_validation_funs/1, set_security/3,
set_revs_limit/3, create_shard_db_doc/2, delete_shard_db_doc/2]).
@@ -34,7 +34,9 @@
offset = nil,
total_rows,
reduce_fun = fun couch_db:enum_docs_reduce_to_count/1,
- group_level = 0
+ group_level = 0,
+ % send_seq is used to determine if db sequence is part of the message
+ send_seq = false
}).
%% rpc endpoints
@@ -70,7 +72,7 @@ all_docs(DbName, #view_query_args{keys=nil} = QueryArgs) ->
{EndKeyType, if is_binary(EndKey) -> EndKey; true -> EndDocId end}
],
{ok, _, Acc} = couch_db:enum_docs(Db, fun view_fold/3, Acc0, Options),
- final_response(Total, Acc#view_acc.offset).
+ final_response(Total, Acc).
changes(DbName, #changes_args{} = Args, StartSeq) ->
changes(DbName, [Args], StartSeq);
@@ -104,11 +106,12 @@ map_view(DbName, DDoc, ViewName, QueryArgs) ->
view_type = ViewType,
extra = Extra
} = QueryArgs,
+ DbSeq = proplists:get_value(curr_seq, Extra, 0),
set_io_priority(DbName, Extra),
{LastSeq, MinSeq} = calculate_seqs(Db, Stale),
Group0 = couch_view_group:design_doc_to_view_group(DDoc),
{ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
- {ok, Group} = couch_view_group:request_group(Pid, MinSeq),
+ {ok, Group} = couch_view_group:request_group(Pid, compute_min(MinSeq,DbSeq)),
maybe_update_view_group(Pid, LastSeq, Stale),
erlang:monitor(process, couch_view_group:get_fd(Group)),
Views = couch_view_group:get_views(Group),
@@ -120,7 +123,8 @@ map_view(DbName, DDoc, ViewName, QueryArgs) ->
conflicts = proplists:get_value(conflicts, Extra, false),
limit = Limit+Skip,
total_rows = Total,
- reduce_fun = fun couch_view:reduce_to_count/1
+ reduce_fun = fun couch_view:reduce_to_count/1,
+ send_seq = true
},
case Keys of
nil ->
@@ -135,7 +139,8 @@ map_view(DbName, DDoc, ViewName, QueryArgs) ->
Out
end, Acc0, Keys)
end,
- final_response(Total, Acc#view_acc.offset).
+ final_response(Total, Acc).
+
reduce_view(DbName, #doc{} = DDoc, ViewName, QueryArgs) ->
Group = couch_view_group:design_doc_to_view_group(DDoc),
@@ -151,18 +156,20 @@ reduce_view(DbName, Group0, ViewName, QueryArgs) ->
stale = Stale,
extra = Extra
} = QueryArgs,
+ DbSeq = proplists:get_value(curr_seq, Extra, 0),
set_io_priority(DbName, Extra),
GroupFun = group_rows_fun(GroupLevel),
{LastSeq, MinSeq} = calculate_seqs(Db, Stale),
{ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
- {ok, Group} = couch_view_group:request_group(Pid, MinSeq),
+ {ok, Group} = couch_view_group:request_group(Pid, compute_min(MinSeq, DbSeq)),
maybe_update_view_group(Pid, LastSeq, Stale),
Lang = couch_view_group:get_language(Group),
Views = couch_view_group:get_views(Group),
erlang:monitor(process, couch_view_group:get_fd(Group)),
{NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce),
ReduceView = {reduce, NthRed, Lang, View},
- Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip},
+ Acc0 = #view_acc{db=Db, group_level = GroupLevel,
+ limit = Limit+Skip, send_seq = true},
case Keys of
nil ->
Options0 = couch_httpd_view:make_key_options(QueryArgs),
@@ -187,6 +194,13 @@ calculate_seqs(Db, Stale) ->
{LastSeq, LastSeq}
end.
+% If no decoded db sequence was supplied (DbSeq =:= 0), fall back to the MinSeq
+% computed from the stale option and the current update sequence.
+compute_min(MinSeq,0) ->
+ MinSeq;
+compute_min(MinSeq, DbSeq) ->
+ erlang:min(MinSeq, DbSeq).
+
maybe_update_view_group(GroupPid, LastSeq, update_after) ->
couch_view_group:trigger_group_update(GroupPid, LastSeq);
maybe_update_view_group(_, _, _) ->
@@ -255,6 +269,36 @@ get_missing_revs(DbName, IdRevsList, Options) ->
Error
end).
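+%% Like update_docs/3 below, but replies {Reply, SeqAfter} so the coordinator
+%% can record each shard's update sequence after the write.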
+update_docs_seq(DbName, Docs0, Options) ->
+ case proplists:get_value(replicated_changes, Options) of
+ true ->
+ X = replicated_changes;
+ _ ->
+ X = interactive_edit
+ end,
+ Docs = make_att_readers(Docs0),
+    %% with_db is inlined here so we can reply {Reply, SeqAfter} without
+    %% changing the return shape for every other caller of with_db
+ set_io_priority(DbName, Options),
+ case get_or_create_db(DbName, Options) of
+ {ok, Db} ->
+ rexi:reply(try
+ Reply = couch_db:update_docs(Db, Docs, Options, X),
+ %% open db again to get updated sequence
+ {ok, DbNew} = get_or_create_db(DbName, Options),
+ SeqAfter = couch_db:get_update_seq(DbNew),
+ {Reply, SeqAfter}
+ catch Exception ->
+ Exception;
+ error:Reason ->
+ twig:log(error, "rpc ~p:~p/~p ~p ~p", [couch_db, update_docs, 4, Reason,
+ clean_stack()]),
+ {error, Reason}
+ end);
+ Error ->
+ rexi:reply(Error)
+ end.
+
update_docs(DbName, Docs0, Options) ->
case proplists:get_value(replicated_changes, Options) of
true ->
@@ -265,6 +309,7 @@ update_docs(DbName, Docs0, Options) ->
Docs = make_att_readers(Docs0),
with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}).
+
group_info(DbName, Group0) ->
{ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
rexi:reply(couch_view_group:request_group_info(Pid)).
@@ -316,11 +361,19 @@ view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) ->
#doc_info{revs=[#rev_info{deleted=true}|_]} ->
{ok, Acc}
end;
-view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) ->
+view_fold(KV, OffsetReds, #view_acc{db=Db, offset=nil,
+ total_rows=Total, send_seq=SendSeq} = Acc) ->
% calculates the offset for this shard
#view_acc{reduce_fun=Reduce} = Acc,
Offset = Reduce(OffsetReds),
- case rexi:sync_reply({total_and_offset, Total, Offset}) of
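+    % when send_seq is set, include this shard's current update seq in the
+    % total_and_offset message for the coordinator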
+ case SendSeq of
+ true ->
+ DbSeq = couch_db:get_update_seq(Db),
+ Msg = {total_and_offset, Total, Offset, DbSeq};
+ false ->
+ Msg = {total_and_offset, Total, Offset}
+ end,
+ case rexi:sync_reply(Msg) of
ok ->
view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset});
stop ->
@@ -338,7 +391,8 @@ view_fold({{Key,Id}, Value}, _Offset, Acc) ->
doc_info = DocInfo,
limit = Limit,
conflicts = Conflicts,
- include_docs = IncludeDocs
+ include_docs = IncludeDocs,
+ send_seq = SendSeq
} = Acc,
case Value of {Props} ->
LinkedDocs = (couch_util:get_value(<<"_id">>, Props) =/= undefined);
@@ -362,22 +416,35 @@ view_fold({{Key,Id}, Value}, _Offset, Acc) ->
true ->
Doc = undefined
end,
- case rexi:stream(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of
+ case SendSeq of
+ true ->
+ DbSeq = couch_db:get_update_seq(Db),
+ Msg = {#view_row{key=Key, id=Id, value=Value, doc=Doc}, DbSeq};
+ false ->
+ Msg = #view_row{key=Key, id=Id, value=Value, doc=Doc}
+ end,
+ case rexi:stream(Msg) of
ok ->
{ok, Acc#view_acc{limit=Limit-1}};
timeout ->
exit(timeout)
end.
-final_response(Total, nil) ->
- case rexi:sync_reply({total_and_offset, Total, Total}) of ok ->
+final_response(Total, #view_acc{db=Db, offset=nil, send_seq=SendSeq}) ->
+ case SendSeq of
+ true ->
+ Msg = {total_and_offset, Total, Total, couch_db:get_update_seq(Db)};
+ false ->
+ Msg = {total_and_offset, Total, Total}
+ end,
+ case rexi:sync_reply(Msg) of ok ->
rexi:reply(complete);
stop ->
ok;
timeout ->
exit(timeout)
end;
-final_response(_Total, _Offset) ->
+final_response(_,_) ->
rexi:reply(complete).
%% TODO: handle case of bogus group level
@@ -404,10 +471,11 @@ reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0 ->
send(K, Red, Acc).
-send(Key, Value, #view_acc{limit=Limit} = Acc) ->
+send(Key, Value, #view_acc{db=Db, limit=Limit} = Acc) ->
case put(fabric_sent_first_row, true) of
undefined ->
- case rexi:sync_reply(#view_row{key=Key, value=Value}) of
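+        % attach the shard's current update seq so the coordinator can track it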
+ Msg = {#view_row{key=Key, value=Value}, couch_db:get_update_seq(Db)},
+ case rexi:sync_reply(Msg) of
ok ->
{ok, Acc#view_acc{limit=Limit-1}};
stop ->
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index 0ff1735..c7db7ea 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -16,7 +16,9 @@
-export([submit_jobs/3, submit_jobs/4, cleanup/1, recv/4, get_db/1, get_db/2, error_info/1,
update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
- remove_down_workers/2]).
+ update_seqs/3,
+ remove_down_workers/2, remove_non_preferred_shards/2,
+ pack/1, process_enc_shards/2]).
-export([request_timeout/0]).
-include("fabric.hrl").
@@ -143,6 +145,84 @@ create_monitors(Shards) ->
MonRefs = lists:usort([{rexi_server, N} || #shard{node=N} <- Shards]),
rexi_monitor:start(MonRefs).
+remove_non_preferred_shards(PreferredShards, Shards) ->
+    %% PreferredShards are the copies the client already holds seqs for. Drop
+    %% any other copy of the same range, keep exact matches with their seq,
+    %% and keep ranges with no preference at seq 0.
+ lists:foldl(fun(Shard, Acc) ->
+ case check_shard(Shard, PreferredShards) of
+ -1 ->
+ Acc;
+ DbSeq ->
+ [{Shard, DbSeq} | Acc]
+ end
+ end, [], Shards).
+
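+%% Returns the preferred DbSeq for an exact name/node match, -1 when another
+%% copy of the same range is preferred, and 0 when no preference applies.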
+check_shard(_, []) ->
+ 0;
+check_shard(#shard{name=Name, node=Node}=Shard, [{#shard{name=Name2, node=Node2}, DbSeq} | Rest]) ->
+ case Name =:= Name2 of
+ true ->
+ case Node =:= Node2 of
+ true ->
+ DbSeq;
+ false ->
+ -1
+ end;
+ _ ->
+ check_shard(Shard, Rest)
+ end.
+
+
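+%% EncShards is a list of packed shard/seq tokens (see pack/1); an empty list
+%% means no shard preference was supplied.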
+process_enc_shards(EncShards, DbName) ->
+ case EncShards of
+ [] -> [];
+ _ ->
+ unpack_seqs(EncShards, DbName)
+ end.
+
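+%% Encode a list of {#shard{}, DbSeq} pairs as one URL-safe base64 token.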
+pack(Responders) ->
+ ShardList = [{N,R,DbSeq} || {#shard{node=N, range=R}, DbSeq} <- Responders],
+ couch_util:encodeBase64Url(term_to_binary(ShardList, [compressed])).
+
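+%% Decode packed tokens back into {#shard{}, DbSeq} pairs for DbName.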
+unpack_seqs(Packed, DbName) ->
+ NodeRangeSeqs = unpack_and_merge(Packed),
+ lists:map(fun({Node, Range, DbSeq}) ->
+ {ok, Shard} = mem3:get_shard(DbName, Node, Range),
+ {Shard, DbSeq}
+ end,
+ NodeRangeSeqs).
+
+unpack_and_merge(PackedNRS) ->
+ NodeRangeSeqs =
+ lists:foldl(fun(NRQ, Acc) ->
+ binary_to_term(couch_util:decodeBase64Url(NRQ)) ++ Acc
+ end,[],PackedNRS),
+ MergedList =
+ lists:foldl(fun({Node,Range,Seq}, Acc) ->
+ [merge({Node,Range,Seq}, NodeRangeSeqs) | Acc]
+ end,[],NodeRangeSeqs),
+ lists:usort(MergedList).
+
+merge(NRS, []) ->
+    NRS;
+merge({Node, Range, Seq}, [{Node, Range, Seq2} | Rest]) ->
+    % same node and range: carry forward the highest seq and keep scanning
+    merge({Node, Range, erlang:max(Seq, Seq2)}, Rest);
+merge(NRS, [_ | Rest]) ->
+    merge(NRS, Rest).
+
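+%% Record a {Worker, DbSeq} pair in the collector's seqs, skipping duplicates.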
+update_seqs(DbSeq, Worker, #collector{seqs=Seqs}=State) ->
+ case lists:member({Worker, DbSeq}, Seqs) of
+ true ->
+ NewSeqs = Seqs;
+ false ->
+ NewSeqs = [{Worker, DbSeq} | Seqs]
+ end,
+ State#collector{seqs=NewSeqs}.
+
%% verify only id and rev are used in key.
update_counter_test() ->
Reply = {ok, #doc{id = <<"id">>, revs = <<"rev">>,
@@ -171,3 +251,185 @@ remove_ancestors_test() ->
%% test function
kv(Item, Count) ->
{make_key(Item), {Item,Count}}.
+
+remove_non_preferred_shards_test() ->
+ Shards =
+ [{shard,<<"shards/00000000-7fffffff/testdb1">>,
+ 'node1',<<"testdb1">>,
+ [0,536870911],
+ undefined},
+ {shard,<<"shards/80000000-ffffffff/testdb1">>,
+ 'node1',<<"testdb1">>,
+ [536870912,1073741823],
+ undefined},
+ {shard,<<"shards/00000000-7fffffff/testdb1">>,
+ 'node2',<<"testdb1">>,
+ [0,536870911],
+ undefined},
+ {shard,<<"shards/80000000-ffffffff/testdb1">>,
+ 'node2',<<"testdb1">>,
+ [536870912,1073741823],
+ undefined},
+ {shard,<<"shards/00000000-7fffffff/testdb1">>,
+ 'node3',<<"testdb1">>,
+ [0,536870911],
+ undefined},
+ {shard,<<"shards/80000000-ffffffff/testdb1">>,
+ 'node3',<<"testdb1">>,
+ [536870912,1073741823],
+ undefined}],
+ PreferredShards =
+ [{{shard,<<"shards/00000000-7fffffff/testdb1">>,
+ 'node1',<<"testdb1">>,
+ [0,536870911],
+ undefined},2},
+ {{shard,<<"shards/80000000-ffffffff/testdb1">>,
+ 'node2',<<"testdb1">>,
+ [536870912,1073741823],
+ undefined},3}],
+ NewShards = remove_non_preferred_shards(PreferredShards, Shards),
+ ?assertEqual(lists:reverse(NewShards),PreferredShards),
+ ok.
+
+remove_non_preferred_shards2_test() ->
+ Shards =
+ [{shard,<<"shards/00000000-3fffffff/testdb1">>,
+ 'node1',<<"testdb1">>,
+ [0,536870911],
+ undefined},
+ {shard,<<"shards/40000000-7fffffff/testdb1">>,
+ 'node1',<<"testdb1">>,
+ [1073741824,1610612735],
+ undefined},
+ {shard,<<"shards/80000000-bfffffff/testdb1">>,
+ 'node1',<<"testdb1">>,
+ [2147483648,2684354559],
+ undefined},
+ {shard,<<"shards/c0000000-ffffffff/testdb1">>,
+ 'node1',<<"testdb1">>,
+ [3221225472,3758096383],
+ undefined},
+ {shard,<<"shards/00000000-3fffffff/testdb1">>,
+ 'node2',<<"testdb1">>,
+ [0,536870911],
+ undefined},
+ {shard,<<"shards/40000000-7fffffff/testdb1">>,
+ 'node2',<<"testdb1">>,
+ [1073741824,1610612735],
+ undefined},
+ {shard,<<"shards/80000000-bfffffff/testdb1">>,
+ 'node2',<<"testdb1">>,
+ [2147483648,2684354559],
+ undefined},
+ {shard,<<"shards/c0000000-ffffffff/testdb1">>,
+ 'node2',<<"testdb1">>,
+ [3221225472,3758096383],
+ undefined},
+ {shard,<<"shards/00000000-3fffffff/testdb1">>,
+ 'node3',<<"testdb1">>,
+ [0,536870911],
+ undefined},
+ {shard,<<"shards/40000000-7fffffff/testdb1">>,
+ 'node3',<<"testdb1">>,
+ [1073741824,1610612735],
+ undefined},
+ {shard,<<"shards/80000000-bfffffff/testdb1">>,
+ 'node3',<<"testdb1">>,
+ [2147483648,2684354559],
+ undefined},
+ {shard,<<"shards/c0000000-ffffffff/testdb1">>,
+ 'node3',<<"testdb1">>,
+ [3221225472,3758096383],
+ undefined}],
+ PreferredShards =
+ [{{shard,<<"shards/00000000-3fffffff/testdb1">>,
+ 'node1',<<"testdb1">>,
+ [0,536870911],
+ undefined},2},
+ {{shard,<<"shards/80000000-bfffffff/testdb1">>,
+ 'node2',<<"testdb1">>,
+ [536870912,1073741823],
+ undefined},3}],
+ ExpectedShards =
+ [{{shard,<<"shards/00000000-3fffffff/testdb1">>,
+ 'node1',<<"testdb1">>,
+ [0,536870911],
+ undefined},2},
+ {{shard,<<"shards/40000000-7fffffff/testdb1">>,
+ 'node1',<<"testdb1">>,
+ [1073741824,1610612735],
+ undefined},0},
+ {{shard,<<"shards/c0000000-ffffffff/testdb1">>,
+ 'node1',<<"testdb1">>,
+ [3221225472,3758096383],
+ undefined},0},
+ {{shard,<<"shards/40000000-7fffffff/testdb1">>,
+ 'node2',<<"testdb1">>,
+ [1073741824,1610612735],
+ undefined},0},
+ {{shard,<<"shards/80000000-bfffffff/testdb1">>,
+ 'node2',<<"testdb1">>,
+ [2147483648,2684354559],
+ undefined},3},
+ {{shard,<<"shards/c0000000-ffffffff/testdb1">>,
+ 'node2',<<"testdb1">>,
+ [3221225472,3758096383],
+ undefined},0},
+ {{shard,<<"shards/40000000-7fffffff/testdb1">>,
+ 'node3',<<"testdb1">>,
+ [1073741824,1610612735],
+ undefined},0},
+ {{shard,<<"shards/c0000000-ffffffff/testdb1">>,
+ 'node3',<<"testdb1">>,
+ [3221225472,3758096383],
+ undefined},0}],
+ NewShards = remove_non_preferred_shards(PreferredShards, Shards),
+ ?assertEqual(lists:reverse(NewShards),ExpectedShards),
+ ok.
+
+unpack_and_merge_test() ->
+ Shards =
+ [{shard,<<"shards/00000000-1fffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [0,536870911],
+ undefined},
+ {shard,<<"shards/20000000-3fffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [536870912,1073741823],
+ undefined},
+ {shard,<<"shards/40000000-5fffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [1073741824,1610612735],
+ undefined},
+ {shard,<<"shards/60000000-7fffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [1610612736,2147483647],
+ undefined},
+ {shard,<<"shards/80000000-9fffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [2147483648,2684354559],
+ undefined},
+ {shard,<<"shards/a0000000-bfffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [2684354560,3221225471],
+ undefined},
+ {shard,<<"shards/c0000000-dfffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [3221225472,3758096383],
+ undefined},
+ {shard,<<"shards/e0000000-ffffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [3758096384,4294967295],
+ undefined}],
+ Responders1 = lists:map(fun(S) ->
+ {S, 1}
+ end, Shards),
+ Responders2 = lists:map(fun(S) ->
+ {S, 2}
+ end, Shards),
+ P1 = pack(Responders1),
+ P2 = pack(Responders2),
+ ShardList = [{N,R,DbSeq} || {#shard{node=N, range=R}, DbSeq} <- Responders2],
+ ?assertEqual(ShardList, unpack_and_merge([P1, P2])),
+ ok.
+
diff --git a/src/fabric_view.erl b/src/fabric_view.erl
index 4107e61..0d0f642 100644
--- a/src/fabric_view.erl
+++ b/src/fabric_view.erl
@@ -108,7 +108,9 @@ maybe_send_row(State) ->
counters = Counters,
skip = Skip,
limit = Limit,
- user_acc = AccIn
+ user_acc = AccIn,
+ seqs = Seqs,
+ seqs_sent = SeqsSent
} = State,
case fabric_dict:any(0, Counters) of
true ->
@@ -118,11 +120,18 @@ maybe_send_row(State) ->
{_, NewState} when Skip > 0 ->
maybe_send_row(NewState#collector{skip=Skip-1});
{Row, NewState} ->
- case Callback(transform_row(possibly_embed_doc(NewState,Row)), AccIn) of
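+            % attach the packed shard seqs to the first row only; once sent,
+            % seqs_sent is true and rows are emitted unchanged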
+ case SeqsSent of
+ true ->
+ Msg = transform_row(possibly_embed_doc(NewState,Row));
+ false ->
+ Msg = {transform_row(possibly_embed_doc(NewState,Row)),
+ fabric_util:pack(Seqs)}
+ end,
+ case Callback(Msg, AccIn) of
{stop, Acc} ->
- {stop, NewState#collector{user_acc=Acc, limit=Limit-1}};
+ {stop, NewState#collector{user_acc=Acc, limit=Limit-1, seqs_sent=true}};
{ok, Acc} ->
- maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1})
+ maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1, seqs_sent=true})
end
catch complete ->
{_, Acc} = Callback(complete, AccIn),
@@ -343,3 +352,4 @@ mk_shards(NoNodes,Range,Shards) ->
NodeName = list_to_atom("node-" ++ integer_to_list(NoNodes)),
mk_shards(NoNodes-1,Range,
[#shard{name=NodeName, node=NodeName, range=Range} | Shards]).
+
diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl
index 8a4ed84..7599934 100644
--- a/src/fabric_view_all_docs.erl
+++ b/src/fabric_view_all_docs.erl
@@ -14,15 +14,18 @@
-module(fabric_view_all_docs).
--export([go/4]).
+-export([go/4, go/5]).
-export([open_doc/3]). % exported for spawn
-include("fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) ->
- Workers = fabric_util:submit_jobs(mem3:shards(DbName),all_docs,[QueryArgs]),
+go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0, EncShards) ->
+ PreferredShards = fabric_util:process_enc_shards(EncShards, DbName),
+ Shards0 = fabric_util:remove_non_preferred_shards(PreferredShards, mem3:shards(DbName)),
+ {Shards, _Seqs} = lists:unzip(Shards0),
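+    % the per-shard seqs are not used for _all_docs; EncShards only restricts
+    % which shard copies are queried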
+ Workers = fabric_util:submit_jobs(Shards,all_docs,[QueryArgs]),
#view_query_args{limit = Limit, skip = Skip} = QueryArgs,
State = #collector{
query_args = QueryArgs,
@@ -30,7 +33,8 @@ go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) ->
counters = fabric_dict:init(Workers, 0),
skip = Skip,
limit = Limit,
- user_acc = Acc0
+ user_acc = Acc0,
+ seqs_sent = true
},
RexiMon = fabric_util:create_monitors(Workers),
try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
@@ -44,7 +48,7 @@ go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) ->
after
rexi_monitor:stop(RexiMon),
fabric_util:cleanup(Workers)
     end;
+%% keys-based requests open each doc directly, so shard preference is ignored
+go(DbName, QueryArgs, Callback, Acc0, _EncShards) ->
+    go(DbName, QueryArgs, Callback, Acc0).
go(DbName, QueryArgs, Callback, Acc0) ->
diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl
index 4f6a3f1..2ea2f0f 100644
--- a/src/fabric_view_map.erl
+++ b/src/fabric_view_map.erl
@@ -14,15 +14,32 @@
-module(fabric_view_map).
--export([go/6]).
+-export([go/7]).
-include("fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
+go(DbName, GroupId, View, Args, Callback, Acc0, EncShards) when is_binary(GroupId) ->
{ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
- go(DbName, DDoc, View, Args, Callback, Acc0);
+ go(DbName, DDoc, View, Args, Callback, Acc0, EncShards);
+
+go(DbName, DDoc, View, #view_query_args{stale=Stale, extra=Extra}=Args, Callback, Acc0, EncShards) ->
+ AllShards = fabric_view:get_shards(DbName, Args),
+ PrefShardsSeqs =
+ if (Stale == ok orelse Stale == update_after) ->
+ % stale trumps as we use primary shards
+ [];
+ true ->
+ fabric_util:process_enc_shards(EncShards, DbName)
+ end,
+ Shards = fabric_util:remove_non_preferred_shards(PrefShardsSeqs, AllShards),
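+    % each worker receives its preferred seq via the curr_seq extra; fabric_rpc
+    % uses it (compute_min/2) to decide how far the view group must catch up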
+ Workers =
+ lists:map(fun({#shard{node=Node, name=ShardName} = Shard, DbSeq}) ->
+ NewExtra = lists:keystore(curr_seq, 1, Extra, {curr_seq, DbSeq}),
+ Ref = rexi:cast(Node, {fabric_rpc, map_view, [ShardName, DDoc, View, Args#view_query_args{extra=NewExtra}]}),
+ Shard#shard{ref = Ref}
+ end, Shards),
-go(DbName, DDoc, View, Args, Callback, Acc0) ->
-    Shards = fabric_view:get_shards(DbName, Args),
-    Workers = fabric_util:submit_jobs(Shards, map_view, [DDoc, View, Args]),
@@ -67,56 +84,79 @@ handle_message({rexi_EXIT, Reason}, Worker, State) ->
{error, Resp}
end;
-handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) ->
+handle_message({total_and_offset, Tot, Off, DbSeq}, {Worker, From}, State0) ->
#collector{
callback = Callback,
counters = Counters0,
total_rows = Total0,
offset = Offset0,
- user_acc = AccIn
- } = State,
+ user_acc = AccIn,
+ seqs_sent = SeqsSent
+ } = State0,
case fabric_dict:lookup_element(Worker, Counters0) of
undefined ->
% this worker lost the race with other partition copies, terminate
gen_server:reply(From, stop),
- {ok, State};
- 0 ->
+ {ok, State0};
+ _ ->
gen_server:reply(From, ok),
Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1),
+ State1 = fabric_util:update_seqs(DbSeq, Worker, State0),
Total = Total0 + Tot,
Offset = Offset0 + Off,
case fabric_dict:any(0, Counters2) of
true ->
- {ok, State#collector{
+ {ok, State1#collector{
counters = Counters2,
total_rows = Total,
offset = Offset
}};
false ->
- FinalOffset = erlang:min(Total, Offset+State#collector.skip),
- {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn),
- {Go, State#collector{
+ FinalOffset = erlang:min(Total, Offset+State1#collector.skip),
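+            % pack the collected seqs into the first message handed to the
+            % callback; seqs_sent is set below so they are only sent once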
+ case SeqsSent of
+ true ->
+ Msg = {total_and_offset,
+ Total, FinalOffset};
+ false ->
+ Msg = {total_and_offset, Total,
+ FinalOffset, fabric_util:pack(State1#collector.seqs)}
+ end,
+ {Go, Acc} = Callback(Msg, AccIn),
+ {Go, State1#collector{
counters = fabric_dict:decrement_all(Counters2),
total_rows = Total,
offset = FinalOffset,
- user_acc = Acc
+ user_acc = Acc,
+ seqs_sent = true
}}
end
end;
-handle_message(#view_row{}, {_, _}, #collector{limit=0} = State) ->
+handle_message({#view_row{}, _DbSeq}, {_, _}, #collector{limit=0} = State) ->
#collector{callback=Callback} = State,
{_, Acc} = Callback(complete, State#collector.user_acc),
{stop, State#collector{user_acc=Acc}};
-handle_message(#view_row{} = Row, {_,From}, #collector{sorted=false} = St) ->
- #collector{callback=Callback, user_acc=AccIn, limit=Limit} = St,
- {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn),
+handle_message({#view_row{}=Row, DbSeq}, {Worker,From}, #collector{sorted=false}=St) ->
+    #collector{
+        callback = Callback,
+        user_acc = AccIn,
+        limit = Limit,
+        seqs_sent = Sent
+    } = St,
+    St1 = fabric_util:update_seqs(DbSeq, Worker, St),
+    case Sent of
+        true ->
+            {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn);
+        false ->
+            {Go, Acc} = Callback({fabric_view:transform_row(Row),
+                fabric_util:pack(St1#collector.seqs)}, AccIn)
+    end,
     rexi:stream_ack(From),
-    {Go, St#collector{user_acc=Acc, limit=Limit-1}};
+    {Go, St1#collector{user_acc=Acc, limit=Limit-1, seqs_sent=true}};
-handle_message(#view_row{} = Row, {Worker, From}, State) ->
+handle_message({#view_row{} = Row, DbSeq}, {Worker, From}, State) ->
#collector{
query_args = #view_query_args{direction=Dir},
counters = Counters0,
@@ -126,7 +166,8 @@ handle_message(#view_row{} = Row, {Worker, From}, State) ->
Rows = merge_row(Dir, KeyDict, Row#view_row{worker={Worker, From}}, Rows0),
Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
State1 = State#collector{rows=Rows, counters=Counters1},
- fabric_view:maybe_send_row(State1);
+ State2 = fabric_util:update_seqs(DbSeq, Worker, State1),
+ fabric_view:maybe_send_row(State2);
handle_message(complete, Worker, State) ->
Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index a55e8cf..edfa50a 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -14,26 +14,41 @@
-module(fabric_view_reduce).
--export([go/6]).
+-export([go/7]).
-include("fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
+go(DbName, GroupId, View, Args, Callback, Acc0, EncShards) when is_binary(GroupId) ->
{ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
- go(DbName, DDoc, View, Args, Callback, Acc0);
+ go(DbName, DDoc, View, Args, Callback, Acc0, EncShards);
-go(DbName, DDoc, VName, Args, Callback, Acc0) ->
+go(DbName, DDoc, VName, Args, Callback, Acc0, EncShards) ->
+ #view_query_args{
+ stale = Stale,
+ extra = Extra
+    } = Args,
+ AllShards = fabric_view:get_shards(DbName, Args),
+ PreferredShards = if (Stale == ok orelse Stale == update_after) ->
+ % stale trumps as we use primary shards
+ [];
+ true ->
+ fabric_util:process_enc_shards(EncShards, DbName)
+ end,
+ Shards = fabric_util:remove_non_preferred_shards(
+ PreferredShards, AllShards),
Group = couch_view_group:design_doc_to_view_group(DDoc),
Lang = couch_view_group:get_language(Group),
Views = couch_view_group:get_views(Group),
{NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce),
{VName, RedSrc} = lists:nth(NthRed, View#view.reduce_funs),
- Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
- Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,DDoc,VName,Args]}),
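+    % as in fabric_view_map, each shard's DbSeq is passed to the rpc through
+    % the curr_seq extra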
+ Workers = lists:map(fun({#shard{name=Name, node=N} = Shard, DbSeq}) ->
+ NewExtra = lists:keystore(curr_seq, 1, Extra, {curr_seq, DbSeq}),
+ Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,DDoc,VName,
+ Args#view_query_args{extra=NewExtra}]}),
Shard#shard{ref = Ref}
- end, fabric_view:get_shards(DbName, Args)),
+ end, Shards),
RexiMon = fabric_util:create_monitors(Workers),
#view_query_args{limit = Limit, skip = Skip} = Args,
OsProc = case os_proc_needed(RedSrc) of
@@ -85,7 +100,7 @@ handle_message({rexi_EXIT, Reason}, Worker, State) ->
{error, Resp}
end;
-handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
+handle_message({#view_row{key=Key} = Row, DbSeq}, {Worker, From}, State) ->
#collector{counters = Counters0, rows = Rows0} = State,
case fabric_dict:lookup_element(Worker, Counters0) of
undefined ->
@@ -98,7 +113,8 @@ handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
% TODO time this call, if slow don't do it every time
C2 = fabric_view:remove_overlapping_shards(Worker, C1),
State1 = State#collector{rows=Rows, counters=C2},
- fabric_view:maybe_send_row(State1)
+ State2 = fabric_util:update_seqs(DbSeq, Worker, State1),
+ fabric_view:maybe_send_row(State2)
end;
handle_message(complete, Worker, #collector{counters = Counters0} = State) ->