Skip to content
79 changes: 63 additions & 16 deletions Sources/Scribe/LogManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,18 @@ public struct LogConfiguration: Sendable {
public static var `default`: LogConfiguration { LogConfiguration() }
}

// MARK: - SinkID
// MARK: - LogSubscription

/// Unique identifier for a registered log sink, used for removal.
public struct SinkID: Hashable, Sendable {
/// Unique identifier for a registered log subscription.
///
/// Used for both sink callbacks and async stream listeners.
public struct LogSubscription: Hashable, Sendable {
fileprivate let id: UUID
fileprivate let categories: Set<LogCategory>?

fileprivate init() {
fileprivate init(categories: Set<LogCategory>?) {
id = UUID()
self.categories = categories
}
}

Expand Down Expand Up @@ -153,7 +157,8 @@ public final class LogManager: @unchecked Sendable {
// State protected by logQueue
private var _minimumLevel: LogLevel
private var _configuration: LogConfiguration
private var sinks: [(id: SinkID, handler: @Sendable (String) -> ())] = []
private var sinks: [LogSubscription: @Sendable (String) -> ()] = [:]
private var streamContinuations: [LogSubscription: AsyncStream<String>.Continuation] = [:]

// Serial queue for thread-safe state access
private let logQueue: DispatchQueue
Expand Down Expand Up @@ -242,23 +247,31 @@ public final class LogManager: @unchecked Sendable {
///
/// Sinks are useful for capturing logs in tests, writing to files, or forwarding to remote services.
///
/// - Parameter sink: Closure called with each formatted log line.
/// - Returns: A `SinkID` that can be used to remove this specific sink later.
/// - Parameters:
/// - categories: Specific categories of log messages to receive. If `nil`, the callback is triggered for all
/// formatted messages.
/// - callback: Closure called with each formatted log line.
/// - Returns: A `LogSubscription` that can be used to remove this specific sink later.
@discardableResult
public func addSink(_ sink: @Sendable @escaping (String) -> ()) -> SinkID {
let sinkID = SinkID()
public func addSink(
categories: Set<LogCategory>? = nil,
_ callback: @Sendable @escaping (String) -> ()
) -> LogSubscription {
let subscription = LogSubscription(categories: categories)

logQueue.async {
self.sinks.append((id: sinkID, handler: sink))
self.sinks[subscription] = callback
}
return sinkID

return subscription
}

/// Removes a specific sink by its identifier.
///
/// - Parameter id: The `SinkID` returned when the sink was added.
public func removeSink(_ id: SinkID) {
public func removeSink(_ id: LogSubscription) {
logQueue.async {
self.sinks.removeAll { $0.id == id }
self.sinks[id] = nil
}
}

Expand All @@ -274,6 +287,31 @@ public final class LogManager: @unchecked Sendable {
logQueue.sync { sinks.count }
}

// MARK: - Streaming

/// Creates and returns a stream to recieve formatted log messages.
///
/// Similar to sinks, these are useful for capturing logs in tests, writing to files, or forwarding to remote
/// services.
/// - Parameter categories: Specific categories of log messages to receive. If `nil`, the continuation is triggered
/// for all formatted messages.
/// - Returns: An AsyncStream that will stream formatted log messages.
public func stream(categories: Set<LogCategory>? = nil) -> AsyncStream<String> {
AsyncStream { continuation in
let id = LogSubscription(categories: categories)

logQueue.async {
self.streamContinuations[id] = continuation
}

continuation.onTermination = { @Sendable _ in
self.logQueue.async {
self.streamContinuations.removeValue(forKey: id)
}
}
}
}

/// The number of cached `Logger` instances keyed by category.
public var loggerCacheCount: Int {
logQueue.sync { loggersByCategory.count + autoLoggerKeys.count }
Expand Down Expand Up @@ -367,9 +405,18 @@ public final class LogManager: @unchecked Sendable {
logger.log("\(formatted, privacy: .public)")
}

for (_, handler) in self.sinks {
handler(formatted)
}
self.dispatch(formatted, category: category)
}
}

private func dispatch(_ message: String, category: LogCategory) {
for (subscription, callback) in sinks where subscription.categories?.contains(category) ?? true {
callback(message)
}

for (subscription, continuation) in streamContinuations
where subscription.categories?.contains(category) ?? true {
continuation.yield(message)
}
}

Expand Down
92 changes: 92 additions & 0 deletions Tests/Scribe/ScribeTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,98 @@ final class ScribeTests: XCTestCase {
XCTAssertTrue(capture.message?.contains("Sink test message") ?? false)
}

func testSinksWithCategoryFilter() throws {
let allowedExpectation = XCTestExpectation(description: "Allowed category logged")
let blockedExpectation = XCTestExpectation(description: "Blocked category not logged")
blockedExpectation.isInverted = true

final class MessageCapture: @unchecked Sendable {
var message: String?
}
let capture = MessageCapture()

Log.logger.addSink(categories: [.testAllowed]) { message in
capture.message = message

if message.contains("[AllowedCategory]") {
allowedExpectation.fulfill()
}
if message.contains("[BlockedCategory]") {
blockedExpectation.fulfill()
}
}

Log.info("Sink test message allowed", category: .testAllowed)
Log.info("Sink test message blocked", category: .testBlocked)

wait(for: [allowedExpectation, blockedExpectation], timeout: 2.0)
XCTAssertNotNil(capture.message)
XCTAssertTrue(capture.message?.contains("Sink test message allowed") ?? false)
}

func testStreams() throws {
let expectation = XCTestExpectation(description: "Sink received log")

final class MessageCapture: @unchecked Sendable {
var message: String?
}
let capture = MessageCapture()

Task {
for await message in Log.logger.stream() {
capture.message = message
expectation.fulfill()
}
}

Task {
// Wait for the stream to start
try? await Task.sleep(for: .milliseconds(500))

Log.info("Sink test message")
}

wait(for: [expectation], timeout: 2.0)
XCTAssertNotNil(capture.message)
XCTAssertTrue(capture.message?.contains("Sink test message") ?? false)
}

func testStreamsWithCategoryFilter() throws {
let allowedExpectation = XCTestExpectation(description: "Allowed category logged")
let blockedExpectation = XCTestExpectation(description: "Blocked category not logged")
blockedExpectation.isInverted = true

final class MessageCapture: @unchecked Sendable {
var message: String?
}
let capture = MessageCapture()

Task {
for await message in Log.logger.stream(categories: [.testAllowed]) {
capture.message = message

if message.contains("[AllowedCategory]") {
allowedExpectation.fulfill()
}
if message.contains("[BlockedCategory]") {
blockedExpectation.fulfill()
}
}
}

Task {
// Wait for the stream to start
try? await Task.sleep(for: .milliseconds(500))

Log.info("Sink test message allowed", category: .testAllowed)
Log.info("Sink test message blocked", category: .testBlocked)
}

wait(for: [allowedExpectation, blockedExpectation], timeout: 2.0)
XCTAssertNotNil(capture.message)
XCTAssertTrue(capture.message?.contains("Sink test message allowed") ?? false)
}

func testCategoryFiltering() throws {
let allowedExpectation = XCTestExpectation(description: "Allowed category logged")
let blockedExpectation = XCTestExpectation(description: "Blocked category not logged")
Expand Down