From 9236cc6f658888c29952ba47afff37a5b9373167 Mon Sep 17 00:00:00 2001 From: Robert Newson Date: Sun, 5 Jul 2015 18:08:00 +0100 Subject: [PATCH] Allow caller to specify changes epoch directly BugzID: 43896 --- src/fabric.erl | 5 ++++- src/fabric_view_changes.erl | 32 ++++++++++++++++++++++---------- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/src/fabric.erl b/src/fabric.erl index 69cd7c3..343b88c 100644 --- a/src/fabric.erl +++ b/src/fabric.erl @@ -32,7 +32,7 @@ % Views -export([all_docs/4, changes/4, query_view/3, query_view/4, query_view/6, - get_view_group_info/2, end_changes/0]). + get_view_group_info/2, end_changes/0, set_changes_epoch/1]). % miscellany -export([design_docs/1, reset_validation_funs/1, cleanup_index_files/0, @@ -338,6 +338,9 @@ get_view_group_info(DbName, DesignId) -> end_changes() -> fabric_view_changes:increment_changes_epoch(). +set_changes_epoch(Epoch) when is_integer(Epoch) -> + fabric_view_changes:set_changes_epoch(Epoch). + %% @doc retrieve all the design docs from a database -spec design_docs(dbname()) -> {ok, [json_obj()]}. design_docs(DbName) -> diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl index 85a5b71..89c38b4 100644 --- a/src/fabric_view_changes.erl +++ b/src/fabric_view_changes.erl @@ -15,7 +15,7 @@ -module(fabric_view_changes). -export([go/5, pack_seqs/1, unpack_seqs/2]). --export([increment_changes_epoch/0]). +-export([increment_changes_epoch/0, set_changes_epoch/1]). %% exported for upgrade purposes. -export([keep_sending_changes/8]). @@ -90,7 +90,7 @@ keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0) } = Collector, LastSeq = pack_seqs(NewSeqs), MaintenanceMode = config:get("cloudant", "maintenance_mode"), - NewEpoch = get_changes_epoch() > erlang:get(changes_epoch), + NewEpoch = newer_epoch(erlang:get(changes_epoch), get_changes_epoch()), if Limit > Limit2, Feed == "longpoll"; MaintenanceMode == "true"; MaintenanceMode == "nolb"; NewEpoch -> Callback({stop, LastSeq, pending_count(Offset)}, AccOut); @@ -465,16 +465,28 @@ validate_start_seq(DbName, Seq) -> end. get_changes_epoch() -> - case application:get_env(fabric, changes_epoch) of - undefined -> - increment_changes_epoch(), - get_changes_epoch(); - {ok, Epoch} -> - Epoch - end. + mochiglobal:get(fabric_changes_epoch). increment_changes_epoch() -> - application:set_env(fabric, changes_epoch, os:timestamp()). + {MegaSecs, Secs, _MicroSecs} = os:timestamp(), + Epoch = MegaSecs * 1000000 + Secs, + set_changes_epoch(Epoch). + +set_changes_epoch(Epoch) when is_integer(Epoch) -> + Current = get_changes_epoch(), + set_changes_epoch(Current, Epoch). + +set_changes_epoch(undefined, To) -> + mochiglobal:put(fabric_changes_epoch, To); +set_changes_epoch(From, To) when is_integer(From), is_integer(To), To > From -> + mochiglobal:put(fabric_changes_epoch, To); +set_changes_epoch(_From, _To) -> + ignored. + +newer_epoch(undefined, _) -> + true; +newer_epoch(From, To) when is_integer(From), is_integer(To) -> + To > From. unpack_seqs_test() -> meck:new(mem3),