Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pulsar-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,7 @@ jobs:

- name: Run Trivy container scan
id: trivy_scan
uses: aquasecurity/trivy-action@0.26.0
uses: aquasecurity/trivy-action@0.35.0
if: ${{ github.repository == 'apache/pulsar' && github.event_name != 'pull_request' }}
continue-on-error: true
with:
Expand Down
3 changes: 3 additions & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,9 @@ Protocol Buffers License
CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
* Java Annotations API
- com.sun.activation-jakarta.activation-1.2.2.jar
- javax.activation-javax.activation-api-1.2.0.jar
* Java Architecture for XML Binding (JAXB) API
- javax.xml.bind-jaxb-api-2.3.1.jar
* Java Servlet API -- javax.servlet-javax.servlet-api-3.1.0.jar
* WebSocket Server API -- javax.websocket-javax.websocket-client-api-1.0.jar
* HK2 - Dependency Injection Kernel
Expand Down
3 changes: 3 additions & 0 deletions distribution/shell/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,9 @@ MIT License
CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
* Java Annotations API
- jakarta.activation-1.2.2.jar
- javax.activation-api-1.2.0.jar
* Java Architecture for XML Binding (JAXB) API
- jaxb-api-2.3.1.jar
* WebSocket Server API -- javax.websocket-client-api-1.0.jar
* HK2 - Dependency Injection Kernel
- hk2-api-2.6.1.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,4 +770,8 @@ default long getLastAddEntryTime() {
default long getMetadataCreationTimestamp() {
return 0;
}

default void addLedgerEventListener(ManagedLedgerEventListener listener) {
// No-op by default
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NoArgsConstructor;
import lombok.Value;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;

public interface ManagedLedgerEventListener {
enum LedgerRollReason {
FULL, // Ledger is full based on size or time
INACTIVE, // No writes for a while
APPEND_FAIL, ConcurrentModification, // Failed to append to the ledger
}

@AllArgsConstructor
@NoArgsConstructor(force = true)
@Builder
@Value
class LedgerRollEvent {
long ledgerId;
LedgerRollReason reason;
}

void onLedgerRoll(LedgerRollEvent event);

void onLedgerDelete(LedgerInfo... ledgerInfos);
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -102,6 +103,9 @@
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerAttributes;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerEventListener;
import org.apache.bookkeeper.mledger.ManagedLedgerEventListener.LedgerRollEvent;
import org.apache.bookkeeper.mledger.ManagedLedgerEventListener.LedgerRollReason;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorNotFoundException;
Expand Down Expand Up @@ -1800,7 +1804,7 @@ synchronized void addEntryFailedDueToConcurrentlyModified(final LedgerHandle cur
+ " The last add confirmed position in memory is {}, and the value"
+ " stored in metadata store is {}.", name, lh.getId(), currentLedger.getLastAddConfirmed(),
lh.getLastAddConfirmed());
ledgerClosed(currentLedger, lh.getLastAddConfirmed());
ledgerClosed(currentLedger, lh.getLastAddConfirmed(), LedgerRollReason.ConcurrentModification);
} else {
log.error("[{}] Fencing the topic to ensure durability and consistency(the current ledger was"
+ " concurrent modified by a other bookie client, which is not expected)."
Expand All @@ -1815,14 +1819,15 @@ synchronized void addEntryFailedDueToConcurrentlyModified(final LedgerHandle cur
}, null, true);
}

synchronized void ledgerClosed(final LedgerHandle lh) {
ledgerClosed(lh, null);
@VisibleForTesting
public synchronized void ledgerClosedWithReason(final LedgerHandle lh, LedgerRollReason ledgerRollReason) {
ledgerClosed(lh, null, ledgerRollReason);
}

// //////////////////////////////////////////////////////////////////////
// Private helpers

synchronized void ledgerClosed(final LedgerHandle lh, Long lastAddConfirmed) {
synchronized void ledgerClosed(final LedgerHandle lh, Long lastAddConfirmed, LedgerRollReason ledgerRollReason) {
final State state = STATE_UPDATER.get(this);
LedgerHandle currentLedger = this.currentLedger;
if (currentLedger == lh && (state == State.ClosingLedger || state == State.LedgerOpened)) {
Expand Down Expand Up @@ -1855,7 +1860,7 @@ synchronized void ledgerClosed(final LedgerHandle lh, Long lastAddConfirmed) {

maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);

createLedgerAfterClosed();
createLedgerAfterClosed(ledgerRollReason);
}

@Override
Expand All @@ -1865,12 +1870,17 @@ public void skipNonRecoverableLedger(long ledgerId){
}
}

synchronized void createLedgerAfterClosed() {
@VisibleForTesting
public synchronized void createLedgerAfterClosed(LedgerRollReason ledgerRollReason) {
if (isNeededCreateNewLedgerAfterCloseLedger()) {
log.info("[{}] Creating a new ledger after closed {}", name,
currentLedger == null ? "null" : currentLedger.getId());
STATE_UPDATER.set(this, State.CreatingLedger);
this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
notifyRollLedgerEvent(LedgerRollEvent.builder()
.ledgerId(currentLedger == null ? -1 : currentLedger.getId())
.reason(ledgerRollReason)
.build());
mbean.startDataLedgerCreateOp();
// Use the executor here is to avoid use the Zookeeper thread to create the ledger which will lead
// to deadlock at the zookeeper client, details to see https://github.com/apache/pulsar/issues/13736
Expand Down Expand Up @@ -1909,7 +1919,7 @@ public void closeComplete(int rc, LedgerHandle lh, Object o) {
name, lh.getId(), BKException.getMessage(rc));
}

ledgerClosed(lh);
ledgerClosedWithReason(lh, LedgerRollReason.FULL);
}
}, null);
}
Expand Down Expand Up @@ -3045,10 +3055,13 @@ public void operationComplete(Void result, Stat stat) {
metadataMutex.unlock();
trimmerMutex.unlock();

notifyDeleteLedgerEvent(ledgersToDelete.toArray(new LedgerInfo[0]));
for (LedgerInfo ls : ledgersToDelete) {
log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize());
asyncDeleteLedger(ls.getLedgerId(), ls);
}

notifyDeleteLedgerEvent(offloadedLedgersToDelete.toArray(new LedgerInfo[0]));
for (LedgerInfo ls : offloadedLedgersToDelete) {
log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(),
ls.getSize());
Expand Down Expand Up @@ -4719,14 +4732,15 @@ public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
}, null);
futures.add(future);
}
CompletableFuture<Void> future = new CompletableFuture();
CompletableFuture<List<LedgerInfo>> future = new CompletableFuture();
FutureUtil.waitForAll(futures).thenAccept(p -> {
internalTrimLedgers(true, future);
}).exceptionally(e -> {
future.completeExceptionally(e);
return null;
});
return future;
return future.thenRun(() -> {
});
}

@Override
Expand Down Expand Up @@ -4907,7 +4921,7 @@ public boolean checkInactiveLedgerAndRollOver() {
name, lh.getId(), BKException.getMessage(rc));
}

ledgerClosed(lh);
ledgerClosedWithReason(lh, LedgerRollReason.INACTIVE);
// we do not create ledger here, since topic is inactive for a long time.
}, null);
return true;
Expand Down Expand Up @@ -4983,4 +4997,32 @@ public long getLastAddEntryTime() {
public long getMetadataCreationTimestamp() {
return ledgersStat != null ? ledgersStat.getCreationTimestamp() : 0;
}

private final List<ManagedLedgerEventListener> ledgerEventListeners = new CopyOnWriteArrayList<>();

@Override
public void addLedgerEventListener(ManagedLedgerEventListener listener) {
Objects.requireNonNull(listener);
ledgerEventListeners.add(listener);
}

private void notifyRollLedgerEvent(LedgerRollEvent event) {
for (ManagedLedgerEventListener listener : ledgerEventListeners) {
try {
listener.onLedgerRoll(event);
} catch (Exception e) {
log.warn("Exception in ledger rolled listener for ledger {}", event, e);
}
}
}

private void notifyDeleteLedgerEvent(LedgerInfo... ledgerInfos) {
for (ManagedLedgerEventListener listener : ledgerEventListeners) {
try {
listener.onLedgerDelete(ledgerInfos);
} catch (Exception e) {
log.warn("Exception in ledger delete listener", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerEventListener.LedgerRollReason;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
Expand Down Expand Up @@ -304,7 +305,7 @@ public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
log.warn("Error when closing ledger {}. Status={}", lh.getId(), BKException.getMessage(rc));
}

ml.ledgerClosed(lh);
ml.ledgerClosedWithReason(lh, LedgerRollReason.FULL);
updateLatency();

AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
Expand Down Expand Up @@ -360,7 +361,7 @@ void handleAddFailure(final LedgerHandle lh, Integer rc) {
|| rc.intValue() == BKException.Code.LedgerFencedException)) {
finalMl.addEntryFailedDueToConcurrentlyModified(lh, rc);
} else {
finalMl.ledgerClosed(lh);
finalMl.ledgerClosedWithReason(lh, LedgerRollReason.APPEND_FAIL);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerEventListener.LedgerRollReason;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
Expand Down Expand Up @@ -5636,13 +5637,13 @@ public void testEstimateEntryCountBySize() throws Exception {
}
long ledger1 = ml.getCurrentLedger().getId();
ml.getCurrentLedger().close();
ml.ledgerClosed(ml.getCurrentLedger());
ml.ledgerClosedWithReason(ml.getCurrentLedger(), LedgerRollReason.FULL);
for (int i = 0; i < 100; i++) {
ml.addEntry(new byte[]{1, 2});
}
long ledger2 = ml.getCurrentLedger().getId();
ml.getCurrentLedger().close();
ml.ledgerClosed(ml.getCurrentLedger());
ml.ledgerClosedWithReason(ml.getCurrentLedger(), LedgerRollReason.FULL);
for (int i = 0; i < 100; i++) {
ml.addEntry(new byte[]{1, 2, 3, 4});
}
Expand Down
Loading
Loading