diff --git a/src/main/java/com/todaysound/todaysound_server/domain/alarm/controller/InternalAlertController.java b/src/main/java/com/todaysound/todaysound_server/domain/alarm/controller/InternalAlertController.java index a4de172..176b8ad 100644 --- a/src/main/java/com/todaysound/todaysound_server/domain/alarm/controller/InternalAlertController.java +++ b/src/main/java/com/todaysound/todaysound_server/domain/alarm/controller/InternalAlertController.java @@ -1,19 +1,20 @@ package com.todaysound.todaysound_server.domain.alarm.controller; +import static com.todaysound.todaysound_server.global.utils.LogMarkers.BUSINESS; +import static net.logstash.logback.argument.StructuredArguments.kv; + import com.fasterxml.jackson.annotation.JsonProperty; +import com.todaysound.todaysound_server.domain.alarm.entity.NotificationOutbox; +import com.todaysound.todaysound_server.domain.alarm.repository.NotificationOutboxRepository; import com.todaysound.todaysound_server.domain.subscription.entity.Subscription; import com.todaysound.todaysound_server.domain.subscription.repository.SubscriptionRepository; import com.todaysound.todaysound_server.domain.summary.entity.Summary; import com.todaysound.todaysound_server.domain.summary.repository.SummaryRepository; -import com.todaysound.todaysound_server.domain.user.entity.User; -import static com.todaysound.todaysound_server.global.utils.LogMarkers.BUSINESS; -import static net.logstash.logback.argument.StructuredArguments.kv; - -import com.todaysound.todaysound_server.global.application.FCMService; import com.todaysound.todaysound_server.global.exception.BaseException; import com.todaysound.todaysound_server.global.exception.CommonErrorCode; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.transaction.annotation.Transactional; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; @@ -21,9 +22,12 @@ /** * 크롤러용 알림 생성 엔드포인트 - *

- * POST /internal/alerts { "user_id": 10, "subscription_id": 1, "site_post_id": "12345", "title": "게시글 제목", "url": - * "https://...", "content_raw": "...원문...", "content_summary": "...요약...", "keyword_matched": true } + * + * [Transactional Outbox 패턴 적용] + * - 기존: FCM 전송 → DB 저장 (순서 역전 + 원자성 없음) + * - 개선: DB 저장(Summary + NotificationOutbox)을 단일 트랜잭션으로 묶은 후, + * NotificationOutboxScheduler 가 주기적으로 outbox 를 읽어 FCM 전송을 담당한다. + * - 보장: FCM 과 DB 저장 중 하나만 성공하는 중간 상태가 발생하지 않는다. */ @Slf4j @RestController @@ -33,9 +37,10 @@ public class InternalAlertController implements InternalAlertApi { private final SubscriptionRepository subscriptionRepository; private final SummaryRepository summaryRepository; - private final FCMService fcmService; + private final NotificationOutboxRepository outboxRepository; @PostMapping("/alerts") + @Transactional public void createAlert(@RequestBody InternalAlertRequest request) { log.info(BUSINESS, "크롤러 알림 수신 {} {} {}", kv("userId", request.userId()), @@ -45,40 +50,36 @@ public void createAlert(@RequestBody InternalAlertRequest request) { Subscription subscription = subscriptionRepository.findById(request.subscriptionId()) .orElseThrow(() -> BaseException.type(CommonErrorCode.ENTITY_NOT_FOUND)); - // 간단한 소유자 검증 (userId 가 다르면 거부) if (!subscription.getUser().getId().equals(request.userId())) { throw BaseException.type(CommonErrorCode.FORBIDDEN); } - // 알림이 활성화된 구독에 대해서만 푸시 전송 - if (subscription.isAlarmEnabled()) { - User user = subscription.getUser(); - String prefix = "[" + request.siteAlias + "]"; - - fcmService.sendNotificationToUser( - user, - prefix + request.title(), - request.contentSummary() - ); - } - - // sitePostId 를 해시 키로 사용 + // ① Summary 저장 (트랜잭션 안) Summary summary = Summary.create( request.sitePostId(), request.title(), request.contentSummary(), request.url(), request.publishedAt(), - request.keywordMatched, + request.keywordMatched(), subscription ); - summaryRepository.save(summary); + // ② 알림이 활성화된 경우 outbox 에 기록 (같은 트랜잭션) + // FCM 을 직접 호출하지 않고 "전송할 것"을 DB 에 기록한다. + // Summary 저장과 outbox 저장이 하나의 트랜잭션이므로 둘 다 커밋되거나 둘 다 롤백된다. + if (subscription.isAlarmEnabled()) { + String title = "[" + request.siteAlias() + "] " + request.title(); + NotificationOutbox outbox = NotificationOutbox.create( + subscription.getUser().getId(), title, request.contentSummary()); + outboxRepository.save(outbox); + } + log.info(BUSINESS, "크롤러 알림 처리 완료 {} {} {}", kv("subscriptionId", request.subscriptionId()), kv("sitePostId", request.sitePostId()), - kv("alarmSent", subscription.isAlarmEnabled())); + kv("alarmQueued", subscription.isAlarmEnabled())); } public record InternalAlertRequest( @@ -95,5 +96,3 @@ public record InternalAlertRequest( ) { } } - - diff --git a/src/main/java/com/todaysound/todaysound_server/domain/alarm/entity/NotificationOutbox.java b/src/main/java/com/todaysound/todaysound_server/domain/alarm/entity/NotificationOutbox.java new file mode 100644 index 0000000..f8c0896 --- /dev/null +++ b/src/main/java/com/todaysound/todaysound_server/domain/alarm/entity/NotificationOutbox.java @@ -0,0 +1,77 @@ +package com.todaysound.todaysound_server.domain.alarm.entity; + +import com.todaysound.todaysound_server.global.entity.BaseEntity; +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.EnumType; +import jakarta.persistence.Enumerated; +import jakarta.persistence.Table; +import java.time.LocalDateTime; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NoArgsConstructor; + +/** + * Transactional Outbox 패턴 구현체 + * + * FCM 알림을 직접 전송하는 대신 이 테이블에 "전송할 것"을 기록한다. + * Summary 저장과 같은 트랜잭션 내에서 저장되므로 둘 다 커밋되거나 둘 다 롤백된다. + * NotificationOutboxScheduler 가 주기적으로 PENDING 건을 읽어 FCM 전송을 수행한다. + */ +@Entity +@Getter +@Table(name = "notification_outbox") +@NoArgsConstructor(access = AccessLevel.PROTECTED) +public class NotificationOutbox extends BaseEntity { + + @Column(nullable = false) + private Long userId; + + @Column(nullable = false) + private String title; + + @Column(nullable = false, columnDefinition = "TEXT") + private String body; + + @Enumerated(EnumType.STRING) + @Column(nullable = false, length = 20) + private OutboxStatus status; + + @Column(nullable = false) + private int retryCount; + + @Column(nullable = false) + private LocalDateTime createdAt; + + public static NotificationOutbox create(Long userId, String title, String body) { + NotificationOutbox outbox = new NotificationOutbox(); + outbox.userId = userId; + outbox.title = title; + outbox.body = body; + outbox.status = OutboxStatus.PENDING; + outbox.retryCount = 0; + outbox.createdAt = LocalDateTime.now(); + return outbox; + } + + public void markAsSent() { + this.status = OutboxStatus.SENT; + } + + /** + * 탈퇴 사용자 등 재시도 없이 즉시 실패 처리할 때 사용한다. + */ + public void markAsFailed() { + this.status = OutboxStatus.FAILED; + } + + /** + * 재시도 횟수를 증가시키고, maxRetry 초과 시 FAILED 로 전환한다. + */ + public void incrementRetry(int maxRetry) { + this.retryCount++; + if (this.retryCount >= maxRetry) { + this.status = OutboxStatus.FAILED; + } + } +} diff --git a/src/main/java/com/todaysound/todaysound_server/domain/alarm/entity/OutboxStatus.java b/src/main/java/com/todaysound/todaysound_server/domain/alarm/entity/OutboxStatus.java new file mode 100644 index 0000000..b97de9e --- /dev/null +++ b/src/main/java/com/todaysound/todaysound_server/domain/alarm/entity/OutboxStatus.java @@ -0,0 +1,7 @@ +package com.todaysound.todaysound_server.domain.alarm.entity; + +public enum OutboxStatus { + PENDING, // 전송 대기 + SENT, // 전송 완료 + FAILED // 최대 재시도 횟수 초과 +} diff --git a/src/main/java/com/todaysound/todaysound_server/domain/alarm/repository/NotificationOutboxRepository.java b/src/main/java/com/todaysound/todaysound_server/domain/alarm/repository/NotificationOutboxRepository.java new file mode 100644 index 0000000..f18094d --- /dev/null +++ b/src/main/java/com/todaysound/todaysound_server/domain/alarm/repository/NotificationOutboxRepository.java @@ -0,0 +1,11 @@ +package com.todaysound.todaysound_server.domain.alarm.repository; + +import com.todaysound.todaysound_server.domain.alarm.entity.NotificationOutbox; +import com.todaysound.todaysound_server.domain.alarm.entity.OutboxStatus; +import java.util.List; +import org.springframework.data.jpa.repository.JpaRepository; + +public interface NotificationOutboxRepository extends JpaRepository { + + List findByStatusOrderByCreatedAtAsc(OutboxStatus status); +} diff --git a/src/main/java/com/todaysound/todaysound_server/domain/alarm/scheduler/NotificationOutboxScheduler.java b/src/main/java/com/todaysound/todaysound_server/domain/alarm/scheduler/NotificationOutboxScheduler.java new file mode 100644 index 0000000..3e8216e --- /dev/null +++ b/src/main/java/com/todaysound/todaysound_server/domain/alarm/scheduler/NotificationOutboxScheduler.java @@ -0,0 +1,68 @@ +package com.todaysound.todaysound_server.domain.alarm.scheduler; + +import static com.todaysound.todaysound_server.global.utils.LogMarkers.BUSINESS; +import static net.logstash.logback.argument.StructuredArguments.kv; + +import com.todaysound.todaysound_server.domain.alarm.entity.NotificationOutbox; +import com.todaysound.todaysound_server.domain.alarm.entity.OutboxStatus; +import com.todaysound.todaysound_server.domain.alarm.repository.NotificationOutboxRepository; +import com.todaysound.todaysound_server.domain.alarm.service.NotificationOutboxProcessor; +import com.todaysound.todaysound_server.domain.user.entity.User; +import com.todaysound.todaysound_server.domain.user.repository.UserRepository; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +/** + * Transactional Outbox 패턴의 소비자(Consumer) 역할. + * + * notification_outbox 테이블에서 PENDING 상태인 항목을 주기적으로 읽어 + * NotificationOutboxProcessor 에 위임하여 FCM 전송을 수행한다. + * + * [처리 흐름] + * PENDING → (전송 성공) → SENT + * PENDING → (전송 실패 × MAX_RETRY) → FAILED + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class NotificationOutboxScheduler { + + private final NotificationOutboxRepository outboxRepository; + private final UserRepository userRepository; + private final NotificationOutboxProcessor outboxProcessor; + + /** + * 5초마다 PENDING 상태인 outbox 항목을 처리한다. + */ + @Scheduled(fixedDelay = 5000) + public void processOutbox() { + List pending = + outboxRepository.findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING); + + if (pending.isEmpty()) { + return; + } + + log.info(BUSINESS, "Outbox 처리 시작 {}", kv("pendingCount", pending.size())); + + // N+1 방지: 필요한 User를 한 번의 쿼리로 일괄 조회 + Set userIds = pending.stream() + .map(NotificationOutbox::getUserId) + .collect(Collectors.toSet()); + + Map userMap = userRepository.findAllById(userIds).stream() + .collect(Collectors.toMap(User::getId, user -> user)); + + // 아이템별로 별도 트랜잭션을 열어 처리 (Processor 에 위임) + // 한 아이템의 실패가 다른 아이템에 영향을 주지 않는다 + for (NotificationOutbox outbox : pending) { + outboxProcessor.process(outbox.getId(), userMap.get(outbox.getUserId())); + } + } +} diff --git a/src/main/java/com/todaysound/todaysound_server/domain/alarm/service/NotificationOutboxProcessor.java b/src/main/java/com/todaysound/todaysound_server/domain/alarm/service/NotificationOutboxProcessor.java new file mode 100644 index 0000000..f859e64 --- /dev/null +++ b/src/main/java/com/todaysound/todaysound_server/domain/alarm/service/NotificationOutboxProcessor.java @@ -0,0 +1,73 @@ +package com.todaysound.todaysound_server.domain.alarm.service; + +import static com.todaysound.todaysound_server.global.utils.LogMarkers.BUSINESS; +import static net.logstash.logback.argument.StructuredArguments.kv; + +import com.todaysound.todaysound_server.domain.alarm.entity.NotificationOutbox; +import com.todaysound.todaysound_server.domain.alarm.entity.OutboxStatus; +import com.todaysound.todaysound_server.domain.alarm.repository.NotificationOutboxRepository; +import com.todaysound.todaysound_server.domain.user.entity.User; +import com.todaysound.todaysound_server.global.application.FCMService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +/** + * Outbox 항목 하나를 처리하는 단위 컴포넌트. + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class NotificationOutboxProcessor { + + private static final int MAX_RETRY = 3; + + private final NotificationOutboxRepository outboxRepository; + private final FCMService fcmService; + + /** + * outbox 항목 하나를 처리한다. 스케줄러가 아이템별로 호출한다. + * + * @param outboxId 처리할 outbox의 PK + * @param user 스케줄러에서 batch 조회한 User (탈퇴 사용자인 경우 null) + */ + @Transactional + public void process(Long outboxId, User user) { + // detached 상태의 엔티티를 다시 로드해 managed 상태로 전환 + NotificationOutbox outbox = outboxRepository.findById(outboxId) + .orElseThrow(() -> new IllegalStateException("Outbox not found: " + outboxId)); + + // 탈퇴한 사용자: 재시도 없이 즉시 FAILED + if (user == null) { + log.warn(BUSINESS, "Outbox 전송 대상 사용자 없음 (탈퇴) {} {}", + kv("outboxId", outboxId), + kv("userId", outbox.getUserId())); + outbox.markAsFailed(); + return; + } + + try { + fcmService.sendNotificationToUser(user, outbox.getTitle(), outbox.getBody()); + outbox.markAsSent(); + + log.info(BUSINESS, "Outbox FCM 전송 성공 {} {}", + kv("outboxId", outboxId), + kv("userId", outbox.getUserId())); + + } catch (Exception e) { + log.warn(BUSINESS, "Outbox FCM 전송 실패 {} {} {}", + kv("outboxId", outboxId), + kv("retryCount", outbox.getRetryCount()), + kv("error", e.getMessage())); + + outbox.incrementRetry(MAX_RETRY); + + if (outbox.getStatus() == OutboxStatus.FAILED) { + log.error(BUSINESS, "Outbox 최대 재시도 초과, FAILED 처리 {} {}", + kv("outboxId", outboxId), + kv("userId", outbox.getUserId())); + } + } + } +} diff --git a/src/main/resources/db/migration/V5__create_notification_outbox.sql b/src/main/resources/db/migration/V5__create_notification_outbox.sql new file mode 100644 index 0000000..19a53e8 --- /dev/null +++ b/src/main/resources/db/migration/V5__create_notification_outbox.sql @@ -0,0 +1,16 @@ +-- notification_outbox: Transactional Outbox 테이블 생성 +CREATE TABLE notification_outbox +( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + user_id BIGINT NOT NULL, + title VARCHAR(255) NOT NULL, + body TEXT NOT NULL, + status VARCHAR(20) NOT NULL, + retry_count INT NOT NULL, + created_at DATETIME(6) NOT NULL +) ENGINE=InnoDB + DEFAULT CHARSET = utf8mb4; + +-- 조회 최적화를 위한 인덱스 (PENDING + created_at 순) +CREATE INDEX idx_notification_outbox_status_created_at + ON notification_outbox (status, created_at); \ No newline at end of file diff --git a/src/test/java/com/todaysound/todaysound_server/support/isolation/TableNameExtractorImpl.java b/src/test/java/com/todaysound/todaysound_server/support/isolation/TableNameExtractorImpl.java new file mode 100644 index 0000000..7dcd0ed --- /dev/null +++ b/src/test/java/com/todaysound/todaysound_server/support/isolation/TableNameExtractorImpl.java @@ -0,0 +1,22 @@ +package com.todaysound.todaysound_server.support.isolation; + +import java.util.List; +import org.springframework.stereotype.Component; + +@Component +class TableNameExtractorImpl implements TableNameExtractor { + + @Override + public List getNames() { + return List.of( + "keywords", + "urls", + "users", + "fcm_tokens", + "subscriptions", + "subscriptions_keywords", + "summaries", + "notification_outbox" + ); + } +} \ No newline at end of file