Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -773,9 +773,23 @@ private IgniteInternalFuture<SnapshotOperationResponse> initLocalSnapshotStartSt
"Another snapshot operation in progress [req=" + req + ", curr=" + curSnpOp + ']'));
}

// Let's keep the metrics on any node.
if (clusterSnpFut == null) {
clusterSnpFut = new ClusterSnapshotFuture(req.reqId, req.snpName, req.incremental() ? req.incrementIndex() : null);

if (req.incremental())
lastSeenIncSnpFut = clusterSnpFut;
else
lastSeenSnpFut = clusterSnpFut;

log.error("TEST | set lastSeenSnpFut on " + cctx.localNode().order());
}

SnapshotOperation snpOp = new SnapshotOperation(req,
new SnapshotFileTree(cctx.kernalContext(), req.snapshotName(), req.snapshotPath()));

log.error("TEST | initLocalSnapshotStartStage() on " + cctx.localNode().order());

curSnpOp = snpOp;

if (req.incremental())
Expand Down Expand Up @@ -1116,7 +1130,9 @@ private void processLocalSnapshotStartStageResult(UUID id, Map<UUID, SnapshotOpe

if (snpReq == null || !snpReq.requestId().equals(id)) {
synchronized (snpOpMux) {
if (clusterSnpFut != null && clusterSnpFut.rqId.equals(id)) {
assert clusterSnpFut != null;

if (clusterSnpFut.rqId.equals(id)) {
if (cancelled) {
clusterSnpFut.onDone(new IgniteFutureCancelledCheckedException("Execution of snapshot tasks " +
"has been cancelled by external process [err=" + err + ", snpReq=" + snpReq + ']'));
Expand Down Expand Up @@ -1377,45 +1393,49 @@ private void processLocalSnapshotEndStageResult(UUID id, Map<UUID, SnapshotOpera

incSnpId = null;

if (clusterSnpFut != null && endFail.isEmpty() && snpOp.error() == null)
assert clusterSnpFut != null;

if (endFail.isEmpty() && snpOp.error() == null)
warnAtomicCachesInIncrementalSnapshot(snpReq.snapshotName(), snpReq.incrementIndex(), snpReq.groups());
}

curSnpOp = null;

synchronized (snpOpMux) {
if (clusterSnpFut != null) {
if (endFail.isEmpty() && snpOp.error() == null) {
if (!F.isEmpty(snpOp.warnings())) {
String wrnsLst = U.nl() + "\t- " + String.join(U.nl() + "\t- ", snpOp.warnings());
assert clusterSnpFut != null;

SnapshotWarningException wrn = new SnapshotWarningException(
"Snapshot create operation completed with warnings [name=" + snpReq.snapshotName() +
(snpReq.requestId() != null ? ", id=" + snpReq.requestId() : "") + "]:" + wrnsLst);
log.error("TEST | finishing clusterSnpFut on " + cctx.localNode().order());

clusterSnpFut.onDone(wrn);
if (endFail.isEmpty() && snpOp.error() == null) {
if (!F.isEmpty(snpOp.warnings())) {
String wrnsLst = U.nl() + "\t- " + String.join(U.nl() + "\t- ", snpOp.warnings());

log.warning(SNAPSHOT_FINISHED_WRN_MSG + snpReq + ". Warnings:" + wrnsLst);
}
else {
clusterSnpFut.onDone();
SnapshotWarningException wrn = new SnapshotWarningException(
"Snapshot create operation completed with warnings [name=" + snpReq.snapshotName() +
(snpReq.requestId() != null ? ", id=" + snpReq.requestId() : "") + "]:" + wrnsLst);

if (log.isInfoEnabled())
log.info(SNAPSHOT_FINISHED_MSG + snpReq);
}
clusterSnpFut.onDone(wrn);

log.warning(SNAPSHOT_FINISHED_WRN_MSG + snpReq + ". Warnings:" + wrnsLst);
}
else if (snpOp.error() == null) {
log.warning("Snapshot error: ", snpOp.error());
else {
clusterSnpFut.onDone();

clusterSnpFut.onDone(new IgniteCheckedException("Snapshot creation has been finished with an error. " +
"Local snapshot tasks may not finished completely or finalizing results fails " +
"[fail=" + endFail + ", err=" + err + ']'));
if (log.isInfoEnabled())
log.info(SNAPSHOT_FINISHED_MSG + snpReq);
}
else
clusterSnpFut.onDone(snpOp.error());
}
else if (snpOp.error() == null) {
log.warning("Snapshot error: ", snpOp.error());

clusterSnpFut = null;
clusterSnpFut.onDone(new IgniteCheckedException("Snapshot creation has been finished with an error. " +
"Local snapshot tasks may not finished completely or finalizing results fails " +
"[fail=" + endFail + ", err=" + err + ']'));
}
else
clusterSnpFut.onDone(snpOp.error());

clusterSnpFut = null;
}
}

Expand All @@ -1427,7 +1447,7 @@ public boolean isSnapshotCreating() {
return true;

synchronized (snpOpMux) {
return curSnpOp != null || clusterSnpFut != null;
return clusterSnpFut != null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -849,81 +849,70 @@ public void testClusterSnapshotWithExplicitPathError() throws Exception {
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotMetrics() throws Exception {
String newSnapshotName = SNAPSHOT_NAME + "_new";
CountDownLatch deltaApply = new CountDownLatch(1);
CountDownLatch deltaBlock = new CountDownLatch(1);
IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);

MetricRegistry mreg0 = ignite.context().metric().registry(SNAPSHOT_METRICS);

LongMetric startTime = mreg0.findMetric("LastSnapshotStartTime");
LongMetric endTime = mreg0.findMetric("LastSnapshotEndTime");
ObjectGauge<String> snpName = mreg0.findMetric("LastSnapshotName");
ObjectGauge<String> errMsg = mreg0.findMetric("LastSnapshotErrorMessage");
ObjectGauge<List<String>> snpList = mreg0.findMetric("LocalSnapshotNames");

// Snapshot process will be blocked when delta partition files processing starts.
snp(ignite).localSnapshotSenderFactory(
blockingLocalSnapshotSender(ignite, deltaApply, deltaBlock));

assertEquals("Snapshot start time must be undefined prior to snapshot operation started.",
0, startTime.value());
assertEquals("Snapshot end time must be undefined to snapshot operation started.",
0, endTime.value());
assertTrue("Snapshot name must not exist prior to snapshot operation started.", snpName.value().isEmpty());
assertTrue("Snapshot error message must null prior to snapshot operation started.", errMsg.value().isEmpty());
assertTrue("Snapshots on local node must not exist", snpList.value().isEmpty());
for (Ignite g : G.allGrids()) {
MetricRegistry mreg = ((IgniteEx)g).context().metric().registry(SNAPSHOT_METRICS);

LongMetric startTime = mreg.findMetric("LastSnapshotStartTime");
LongMetric endTime = mreg.findMetric("LastSnapshotEndTime");
ObjectGauge<String> snpName = mreg.findMetric("LastSnapshotName");
ObjectGauge<String> errMsg = mreg.findMetric("LastSnapshotErrorMessage");
ObjectGauge<List<String>> snpList = mreg.findMetric("LocalSnapshotNames");

assertEquals("Snapshot start time must be undefined prior to snapshot operation started.",
0, startTime.value());
assertEquals("Snapshot end time must be undefined to snapshot operation started.",
0, endTime.value());
assertTrue("Snapshot name must not exist prior to snapshot operation started.", snpName.value().isEmpty());
assertTrue("Snapshot error message must null prior to snapshot operation started.", errMsg.value().isEmpty());
assertTrue("Snapshots on local node must not exist", snpList.value().isEmpty());
}

long cutoffStartTime = U.currentTimeMillis();

IgniteFuture<Void> fut0 = snp(ignite).createSnapshot(SNAPSHOT_NAME, null, false, onlyPrimary);
IgniteFuture<Void> fut = snp(ignite).createSnapshot(SNAPSHOT_NAME, null, false, onlyPrimary);

U.await(deltaApply);

assertTrue("Snapshot start time must be set prior to snapshot operation started " +
"[startTime=" + startTime.value() + ", cutoffTime=" + cutoffStartTime + ']',
startTime.value() >= cutoffStartTime);
assertEquals("Snapshot end time must be zero prior to snapshot operation started.",
0, endTime.value());
assertEquals("Snapshot name must be set prior to snapshot operation started.",
SNAPSHOT_NAME, snpName.value());
assertTrue("Snapshot error message must null prior to snapshot operation started.",
errMsg.value().isEmpty());

IgniteFuture<Void> fut1 = snp(grid(1)).createSnapshot(newSnapshotName, null, false, onlyPrimary);

assertThrowsWithCause((Callable<Object>)fut1::get, IgniteException.class);
for (Ignite g : G.allGrids()) {
MetricRegistry mreg = ((IgniteEx)g).context().metric().registry(SNAPSHOT_METRICS);

LongMetric startTime = mreg.findMetric("LastSnapshotStartTime");
LongMetric endTime = mreg.findMetric("LastSnapshotEndTime");
ObjectGauge<String> snpName = mreg.findMetric("LastSnapshotName");
ObjectGauge<String> errMsg = mreg.findMetric("LastSnapshotErrorMessage");

assertTrue("Snapshot start time must be set prior to snapshot operation started " +
"[startTime=" + startTime.value() + ", cutoffTime=" + cutoffStartTime + ']',
startTime.value() >= cutoffStartTime);
assertEquals("Snapshot end time must be zero prior to snapshot operation started.",
0, endTime.value());
assertEquals("Snapshot name must be set prior to snapshot operation started.",
SNAPSHOT_NAME, snpName.value());
assertTrue("Snapshot error message must null prior to snapshot operation started.",
errMsg.value().isEmpty());
}

MetricRegistry mreg1 = grid(1).context().metric().registry(SNAPSHOT_METRICS);
deltaBlock.countDown();

LongMetric startTime1 = mreg1.findMetric("LastSnapshotStartTime");
LongMetric endTime1 = mreg1.findMetric("LastSnapshotEndTime");
ObjectGauge<String> snpName1 = mreg1.findMetric("LastSnapshotName");
ObjectGauge<String> errMsg1 = mreg1.findMetric("LastSnapshotErrorMessage");
fut.get();

assertTrue("Snapshot start time must be greater than zero for finished snapshot.",
startTime1.value() > 0);
assertEquals("Snapshot end time must zero for failed on start snapshots.",
0, endTime1.value());
assertEquals("Snapshot name must be set when snapshot operation already finished.",
newSnapshotName, snpName1.value());
assertNotNull("Concurrent snapshot operation must failed.",
errMsg1.value());
for (Ignite g : G.allGrids()) {
MetricRegistry mreg = ((IgniteEx)g).context().metric().registry(SNAPSHOT_METRICS);

deltaBlock.countDown();
LongMetric startTime = mreg.findMetric("LastSnapshotStartTime");
LongMetric endTime = mreg.findMetric("LastSnapshotEndTime");

fut0.get();

assertTrue("Snapshot start time must be greater than zero for finished snapshot.",
startTime.value() > 0);
assertTrue("Snapshot end time must be greater than zero for finished snapshot.",
endTime.value() > 0);
assertEquals("Snapshot name must be set when snapshot operation already finished.",
SNAPSHOT_NAME, snpName.value());
assertTrue("Concurrent snapshot operation must finished successfully.",
errMsg.value().isEmpty());
assertEquals("Only the first snapshot must be created and stored on disk.",
Collections.singletonList(SNAPSHOT_NAME), snpList.value());
waitForCondition(() -> endTime.value() != 0L && startTime.value() != 0 && endTime.value() > startTime.value(),
getTestTimeout());
}
}

/** @throws Exception If fails. */
Expand Down