diff --git a/src/main/java/com/techfork/domain/source/batch/RssFeedReader.java b/src/main/java/com/techfork/domain/source/batch/RssFeedReader.java index 9cb95cfa..06352705 100644 --- a/src/main/java/com/techfork/domain/source/batch/RssFeedReader.java +++ b/src/main/java/com/techfork/domain/source/batch/RssFeedReader.java @@ -12,37 +12,81 @@ import com.techfork.domain.source.entity.TechBlog; import com.techfork.domain.source.repository.TechBlogRepository; import com.techfork.global.util.ContentCleaner; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.item.ItemReader; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.client.WebClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.Date; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Stream; -@Slf4j @Component @StepScope -@RequiredArgsConstructor +@Slf4j public class RssFeedReader implements ItemReader { + private static final int RSS_FETCH_TASK_TIMEOUT_SECONDS = 45; + private final TechBlogRepository techBlogRepository; private final PostRepository postRepository; private final WebClient webClient; + @Qualifier("rssFetchTaskExecutor") + private final AsyncTaskExecutor rssFetchTaskExecutor; + private final int rssFetchTaskTimeoutSeconds; private List items; private int currentIndex = 0; + @Autowired + public RssFeedReader( + TechBlogRepository techBlogRepository, + PostRepository postRepository, + WebClient webClient, + @Qualifier("rssFetchTaskExecutor") AsyncTaskExecutor rssFetchTaskExecutor + ) { + this( + techBlogRepository, + postRepository, + webClient, + rssFetchTaskExecutor, + RSS_FETCH_TASK_TIMEOUT_SECONDS + ); + } + + RssFeedReader( + TechBlogRepository techBlogRepository, + PostRepository postRepository, + WebClient webClient, + @Qualifier("rssFetchTaskExecutor") AsyncTaskExecutor rssFetchTaskExecutor, + int rssFetchTaskTimeoutSeconds + ) { + this.techBlogRepository = techBlogRepository; + this.postRepository = postRepository; + this.webClient = webClient; + this.rssFetchTaskExecutor = rssFetchTaskExecutor; + this.rssFetchTaskTimeoutSeconds = rssFetchTaskTimeoutSeconds; + } + @Override public RssFeedItem read() { if (items == null) { @@ -61,30 +105,69 @@ private void initializeItems() { List techBlogs = techBlogRepository.findAll(); log.info("총 {}개 테크 블로그 RSS 수집 시작", techBlogs.size()); - List allItems = techBlogs.parallelStream() - .flatMap(techBlog -> { - try { - List feedItems = fetchRssFeed(techBlog); - log.info("[{}] RSS 수집 성공: {}개", techBlog.getCompanyName(), feedItems.size()); - return feedItems.stream(); - } catch (Exception e) { - log.error("[{}] RSS 수집 실패: {}", techBlog.getCompanyName(), e.getMessage()); - return Stream.empty(); - } - }) + List fetchTasks = techBlogs.stream() + .map(this::submitFetchTask) + .toList(); + + List allItems = fetchTasks.stream() + .flatMap(this::collectFeedItems) .toList(); Set existingUrls = postRepository.findExistingUrls( allItems.stream().map(RssFeedItem::url).toList() ); - items = allItems.stream() - .filter(item -> !existingUrls.contains(item.url())) - .toList(); + Map uniqueItemsByUrl = new LinkedHashMap<>(); + for (RssFeedItem item : allItems) { + if (!existingUrls.contains(item.url())) { + uniqueItemsByUrl.putIfAbsent(item.url(), item); + } + } + + items = List.copyOf(uniqueItemsByUrl.values()); log.info("RSS 수집 초기화 완료: 총 {}개 아이템", items.size()); } + private FeedFetchTask submitFetchTask(TechBlog techBlog) { + Future> future = rssFetchTaskExecutor.submit(() -> fetchFeedSafely(techBlog)); + return new FeedFetchTask(techBlog, future); + } + + private List fetchFeedSafely(TechBlog techBlog) { + try { + List feedItems = fetchRssFeed(techBlog); + log.info("[{}] RSS 수집 성공: {}개", techBlog.getCompanyName(), feedItems.size()); + return feedItems; + } catch (Exception e) { + log.error("[{}] RSS 수집 실패: {}", techBlog.getCompanyName(), e.getMessage()); + return List.of(); + } + } + + private Stream collectFeedItems(FeedFetchTask fetchTask) { + try { + return fetchTask.future() + .get(rssFetchTaskTimeoutSeconds, TimeUnit.SECONDS) + .stream(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("[{}] RSS 수집 대기 중 인터럽트 발생", fetchTask.techBlog().getCompanyName(), e); + return Stream.empty(); + } catch (TimeoutException e) { + boolean cancelled = fetchTask.future().cancel(true); + log.error("[{}] RSS 수집 타임아웃: {}초 (cancelled={})", + fetchTask.techBlog().getCompanyName(), + rssFetchTaskTimeoutSeconds, + cancelled); + return Stream.empty(); + } catch (ExecutionException e) { + Throwable cause = e.getCause() != null ? e.getCause() : e; + log.error("[{}] RSS 수집 Future 처리 실패: {}", fetchTask.techBlog().getCompanyName(), cause.getMessage()); + return Stream.empty(); + } + } + private List fetchRssFeed(TechBlog techBlog) throws Exception { // WebClient로 RSS 피드 다운로드 byte[] responseBytes = webClient.get() @@ -259,4 +342,7 @@ private String extractImageFromHtml(String htmlContent) { return null; } + + private record FeedFetchTask(TechBlog techBlog, Future> future) { + } } \ No newline at end of file diff --git a/src/main/java/com/techfork/domain/source/config/RssCrawlingJobConfig.java b/src/main/java/com/techfork/domain/source/config/RssCrawlingJobConfig.java index 20e905c6..ac002a72 100644 --- a/src/main/java/com/techfork/domain/source/config/RssCrawlingJobConfig.java +++ b/src/main/java/com/techfork/domain/source/config/RssCrawlingJobConfig.java @@ -22,11 +22,15 @@ import org.springframework.batch.item.ItemWriter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.transaction.PlatformTransactionManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; /** * RSS 크롤링 Job 설정 @@ -162,6 +166,21 @@ public ItemWriter> asyncEmbeddingWriter() { return asyncItemWriter; } + @Bean(name = "rssFetchTaskExecutor") + public AsyncTaskExecutor rssFetchTaskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(4); + executor.setMaxPoolSize(8); + executor.setQueueCapacity(32); + executor.setThreadNamePrefix("rss-fetch-"); + executor.setTaskDecorator(new MdcTaskDecorator()); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(60); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } + @Bean public TaskExecutor summaryTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); diff --git a/src/test/java/com/techfork/domain/source/batch/RssFeedReaderTest.java b/src/test/java/com/techfork/domain/source/batch/RssFeedReaderTest.java new file mode 100644 index 00000000..b88d001c --- /dev/null +++ b/src/test/java/com/techfork/domain/source/batch/RssFeedReaderTest.java @@ -0,0 +1,207 @@ +package com.techfork.domain.source.batch; + +import com.techfork.domain.post.repository.PostRepository; +import com.techfork.domain.source.dto.RssFeedItem; +import com.techfork.domain.source.entity.TechBlog; +import com.techfork.domain.source.repository.TechBlogRepository; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.ExchangeFunction; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.BDDMockito.given; + +@ExtendWith(MockitoExtension.class) +class RssFeedReaderTest { + + @Mock + private TechBlogRepository techBlogRepository; + + @Mock + private PostRepository postRepository; + + private ThreadPoolTaskExecutor rssFetchTaskExecutor; + + @BeforeEach + void setUp() { + rssFetchTaskExecutor = new ThreadPoolTaskExecutor(); + rssFetchTaskExecutor.setCorePoolSize(2); + rssFetchTaskExecutor.setMaxPoolSize(2); + rssFetchTaskExecutor.setQueueCapacity(10); + rssFetchTaskExecutor.setThreadNamePrefix("rss-fetch-test-"); + rssFetchTaskExecutor.initialize(); + } + + @AfterEach + void tearDown() { + rssFetchTaskExecutor.shutdown(); + } + + @Test + @DisplayName("기존 URL을 제외하고 동일 crawl 내 중복 URL은 한 번만 반환한다") + void read_FiltersExistingUrlsAndDeduplicatesCollectedItems() { + TechBlog kakao = TechBlog.create("카카오", "https://kakao.example.com", "https://kakao.example.com/rss", null); + TechBlog naver = TechBlog.create("네이버", "https://naver.example.com", "https://naver.example.com/rss", null); + + given(techBlogRepository.findAll()).willReturn(List.of(kakao, naver)); + given(postRepository.findExistingUrls(anyList())).willReturn(Set.of("https://posts.example.com/existing")); + + WebClient webClient = createWebClient(Map.of( + kakao.getRssUrl(), successResponse(rssXml( + rssItem("카카오 1", "https://posts.example.com/1", "본문1"), + rssItem("카카오 existing", "https://posts.example.com/existing", "본문2") + )), + naver.getRssUrl(), successResponse(rssXml( + rssItem("네이버 duplicate", "https://posts.example.com/1", "본문3"), + rssItem("네이버 2", "https://posts.example.com/2", "본문4") + )) + )); + + RssFeedReader reader = new RssFeedReader( + techBlogRepository, + postRepository, + webClient, + rssFetchTaskExecutor, + 1 + ); + + List items = readAll(reader); + + assertThat(items) + .extracting(RssFeedItem::url) + .containsExactly("https://posts.example.com/1", "https://posts.example.com/2"); + } + + @Test + @DisplayName("일부 feed 실패가 있어도 성공한 feed 결과는 유지한다") + void read_KeepsSuccessfulItemsWhenOneFeedFails() { + TechBlog kakao = TechBlog.create("카카오", "https://kakao.example.com", "https://kakao.example.com/rss", null); + TechBlog naver = TechBlog.create("네이버", "https://naver.example.com", "https://naver.example.com/rss", null); + + given(techBlogRepository.findAll()).willReturn(List.of(kakao, naver)); + given(postRepository.findExistingUrls(anyList())).willReturn(Set.of()); + + WebClient webClient = createWebClient(Map.of( + kakao.getRssUrl(), successResponse(rssXml(rssItem("카카오 1", "https://posts.example.com/1", "본문1"))), + naver.getRssUrl(), Mono.error(new RuntimeException("rss fetch failed")) + )); + + RssFeedReader reader = new RssFeedReader( + techBlogRepository, + postRepository, + webClient, + rssFetchTaskExecutor, + 1 + ); + + List items = readAll(reader); + + assertThat(items).hasSize(1); + assertThat(items.get(0).url()).isEqualTo("https://posts.example.com/1"); + } + + @Test + @DisplayName("느린 feed는 timeout 처리되고 다른 feed 결과는 계속 반환한다") + void read_TimesOutSlowFeedAndKeepsOtherResults() { + TechBlog kakao = TechBlog.create("카카오", "https://kakao.example.com", "https://kakao.example.com/rss", null); + TechBlog naver = TechBlog.create("네이버", "https://naver.example.com", "https://naver.example.com/rss", null); + + given(techBlogRepository.findAll()).willReturn(List.of(kakao, naver)); + given(postRepository.findExistingUrls(anyList())).willReturn(Set.of()); + + WebClient webClient = createWebClient(Map.of( + kakao.getRssUrl(), successResponse(rssXml(rssItem("카카오 1", "https://posts.example.com/1", "본문1"))), + naver.getRssUrl(), Mono.delay(java.time.Duration.ofMillis(1500)) + .map(ignore -> ClientResponse.create(HttpStatus.OK) + .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE) + .body(rssXml(rssItem("네이버 1", "https://posts.example.com/2", "본문2"))) + .build()) + )); + + RssFeedReader reader = new RssFeedReader( + techBlogRepository, + postRepository, + webClient, + rssFetchTaskExecutor, + 1 + ); + + List items = readAll(reader); + + assertThat(items).hasSize(1); + assertThat(items.get(0).url()).isEqualTo("https://posts.example.com/1"); + } + + private List readAll(RssFeedReader reader) { + List items = new ArrayList<>(); + RssFeedItem item; + while ((item = reader.read()) != null) { + items.add(item); + } + return items; + } + + private WebClient createWebClient(Map> responses) { + ExchangeFunction exchangeFunction = request -> { + Mono response = responses.get(request.url().toString()); + if (response == null) { + return Mono.error(new IllegalArgumentException("Unexpected request URL: " + request.url())); + } + return response; + }; + + return WebClient.builder() + .exchangeFunction(exchangeFunction) + .build(); + } + + private Mono successResponse(String body) { + return Mono.just(ClientResponse.create(HttpStatus.OK) + .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE) + .body(body) + .build()); + } + + private String rssXml(String... items) { + return """ + + + + Test Feed + https://feed.example.com + Test Feed + %s + + + """.formatted(String.join(System.lineSeparator(), items)); + } + + private String rssItem(String title, String link, String description) { + return """ + + %s + %s + + Mon, 01 Jan 2024 00:00:00 GMT + + """.formatted(title, link, description); + } +} diff --git a/src/test/java/com/techfork/domain/source/config/RssCrawlingJobIntegrationTest.java b/src/test/java/com/techfork/domain/source/config/RssCrawlingJobIntegrationTest.java new file mode 100644 index 00000000..f175b528 --- /dev/null +++ b/src/test/java/com/techfork/domain/source/config/RssCrawlingJobIntegrationTest.java @@ -0,0 +1,97 @@ +package com.techfork.domain.source.config; + +import com.techfork.domain.post.entity.Post; +import com.techfork.domain.source.batch.PostBatchWriter; +import com.techfork.domain.source.batch.RssFeedReader; +import com.techfork.domain.source.batch.RssToPostProcessor; +import com.techfork.domain.source.dto.RssFeedItem; +import com.techfork.global.common.IntegrationTestBase; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.springframework.batch.core.BatchStatus; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.test.JobLauncherTestUtils; +import org.springframework.batch.test.JobRepositoryTestUtils; +import org.springframework.batch.test.context.SpringBatchTest; +import org.springframework.batch.item.Chunk; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.bean.override.mockito.MockitoBean; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +@SpringBatchTest +class RssCrawlingJobIntegrationTest extends IntegrationTestBase { + + @Autowired + private JobLauncherTestUtils jobLauncherTestUtils; + + @Autowired + private JobRepositoryTestUtils jobRepositoryTestUtils; + + @Autowired + private Job rssCrawlingJob; + + @MockitoBean + private RssFeedReader rssFeedReader; + + @MockitoBean + private RssToPostProcessor rssToPostProcessor; + + @MockitoBean + private PostBatchWriter postBatchWriter; + + @BeforeEach + void setUp() { + jobLauncherTestUtils.setJob(rssCrawlingJob); + } + + @AfterEach + void tearDown() { + jobRepositoryTestUtils.removeJobExecutions(); + } + + @Test + @DisplayName("fetchAndSaveRssStep은 RSS item을 읽어 processor/writer로 전달한다") + void fetchAndSaveRssStep_WiresReaderProcessorWriter() throws Exception { + RssFeedItem item = new RssFeedItem( + "테스트 제목", + "https://posts.example.com/1", + "https://logo.example.com/logo.png", + null, + "본문", + "본문", + java.time.LocalDateTime.now(), + "카카오", + 1L + ); + Post post = mock(Post.class); + + given(rssFeedReader.read()).willReturn(item, (RssFeedItem) null); + given(rssToPostProcessor.process(item)).willReturn(post); + + JobExecution execution = jobLauncherTestUtils.launchStep("fetchAndSaveRssStep"); + + long deadline = System.currentTimeMillis() + 5_000; + while (execution.isRunning() && System.currentTimeMillis() < deadline) { + Thread.sleep(100); + } + + assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED); + + verify(rssFeedReader, times(2)).read(); + verify(rssToPostProcessor).process(item); + + ArgumentCaptor> chunkCaptor = ArgumentCaptor.forClass(Chunk.class); + verify(postBatchWriter).write(chunkCaptor.capture()); + assertThat(chunkCaptor.getValue().getItems()).hasSize(1); + assertThat(chunkCaptor.getValue().getItems().get(0)).isSameAs(post); + } +}