diff --git a/docs/contracts/rest-api-v1.md b/docs/contracts/rest-api-v1.md index 9586f3b..066539f 100644 --- a/docs/contracts/rest-api-v1.md +++ b/docs/contracts/rest-api-v1.md @@ -88,6 +88,7 @@ This document defines the MVP API contract groups, standards, and core request/r - Publishes content from `DRAFT` to `PUBLISHED` - Request fields (MVP): `userId` - Requires `playbackReady=true` +- Emits `content.published` event on success - Returns `409 CONTENT_NOT_READY` when playback is not ready - Returns `409 CONTENT_STATE_INVALID` when state is not `DRAFT` diff --git a/services/java/content-service/pom.xml b/services/java/content-service/pom.xml index ca8beb1..87206ce 100644 --- a/services/java/content-service/pom.xml +++ b/services/java/content-service/pom.xml @@ -40,6 +40,10 @@ postgresql runtime + + org.springframework.kafka + spring-kafka + org.springframework.boot spring-boot-starter-test diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/application/content/ContentService.java b/services/java/content-service/src/main/java/com/cloudmedia/content/application/content/ContentService.java index b7191dd..2dc07a4 100644 --- a/services/java/content-service/src/main/java/com/cloudmedia/content/application/content/ContentService.java +++ b/services/java/content-service/src/main/java/com/cloudmedia/content/application/content/ContentService.java @@ -4,6 +4,8 @@ import com.cloudmedia.content.api.content.dto.CreateContentRequest; import com.cloudmedia.content.api.content.dto.PlaybackResponse; import com.cloudmedia.content.api.content.dto.UpdateContentRequest; +import com.cloudmedia.content.events.ContentEventPublisher; +import com.cloudmedia.content.events.ContentPublishedPayload; import com.cloudmedia.content.error.ApiException; import com.cloudmedia.content.persistence.entity.ContentState; import com.cloudmedia.content.persistence.entity.ChannelEntity; @@ -29,13 +31,16 @@ public class ContentService { private final ChannelRepository channelRepository; private final ChannelMemberRepository channelMemberRepository; private final PolicyEvaluationClient policyEvaluationClient; + private final ContentEventPublisher contentEventPublisher; public ContentService(ContentRepository contentRepository, ChannelRepository channelRepository, - ChannelMemberRepository channelMemberRepository, PolicyEvaluationClient policyEvaluationClient) { + ChannelMemberRepository channelMemberRepository, PolicyEvaluationClient policyEvaluationClient, + ContentEventPublisher contentEventPublisher) { this.contentRepository = contentRepository; this.channelRepository = channelRepository; this.channelMemberRepository = channelMemberRepository; this.policyEvaluationClient = policyEvaluationClient; + this.contentEventPublisher = contentEventPublisher; } @Transactional @@ -99,8 +104,14 @@ public ContentResponse publish(String contentId, String userId) { content.setState(ContentState.PUBLISHED); content.setPublishedAt(now); content.setUpdatedAt(now); - - return toResponse(contentRepository.save(content)); + ContentEntity savedContent = contentRepository.save(content); + contentEventPublisher.publishContentPublished( + new ContentPublishedPayload(savedContent.getId(), savedContent.getChannel().getId(), + savedContent.getTitle(), savedContent.getDescription(), savedContent.getContentType().name(), + savedContent.getVisibility().name(), savedContent.getPublishedAt()), + null); + + return toResponse(savedContent); } @Transactional diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventEnvelope.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventEnvelope.java new file mode 100644 index 0000000..b36d3bb --- /dev/null +++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventEnvelope.java @@ -0,0 +1,7 @@ +package com.cloudmedia.content.events; + +import java.time.Instant; + +public record ContentEventEnvelope(String eventId, String eventType, int eventVersion, Instant occurredAt, + String producer, String entityType, String entityId, String traceId, Object payload) { +} diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventPublisher.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventPublisher.java new file mode 100644 index 0000000..a0adfd3 --- /dev/null +++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventPublisher.java @@ -0,0 +1,6 @@ +package com.cloudmedia.content.events; + +public interface ContentEventPublisher { + + void publishContentPublished(ContentPublishedPayload payload, String traceId); +} diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventsConfiguration.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventsConfiguration.java new file mode 100644 index 0000000..3a25933 --- /dev/null +++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventsConfiguration.java @@ -0,0 +1,26 @@ +package com.cloudmedia.content.events; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaTemplate; + +@Configuration +@EnableConfigurationProperties(ContentEventsProperties.class) +public class ContentEventsConfiguration { + + @Bean + @ConditionalOnProperty(prefix = "cloudmedia.content.events", name = "enabled", havingValue = "true") + KafkaContentEventPublisher kafkaContentEventPublisher(KafkaTemplate kafkaTemplate, + ContentEventsProperties properties) { + return new KafkaContentEventPublisher(kafkaTemplate, properties); + } + + @Bean + @ConditionalOnMissingBean(ContentEventPublisher.class) + NoopContentEventPublisher noopContentEventPublisher() { + return new NoopContentEventPublisher(); + } +} diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventsProperties.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventsProperties.java new file mode 100644 index 0000000..663d57d --- /dev/null +++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventsProperties.java @@ -0,0 +1,36 @@ +package com.cloudmedia.content.events; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties(prefix = "cloudmedia.content.events") +public class ContentEventsProperties { + + private boolean enabled; + + private final Topics topics = new Topics(); + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public Topics getTopics() { + return topics; + } + + public static class Topics { + + private String contentPublished = "cloudmedia.content.published"; + + public String getContentPublished() { + return contentPublished; + } + + public void setContentPublished(String contentPublished) { + this.contentPublished = contentPublished; + } + } +} diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentPublishedPayload.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentPublishedPayload.java new file mode 100644 index 0000000..080aea4 --- /dev/null +++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentPublishedPayload.java @@ -0,0 +1,7 @@ +package com.cloudmedia.content.events; + +import java.time.LocalDateTime; + +public record ContentPublishedPayload(String contentId, String channelId, String title, String description, + String contentType, String visibility, LocalDateTime publishedAt) { +} diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/KafkaContentEventPublisher.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/KafkaContentEventPublisher.java new file mode 100644 index 0000000..56b44c4 --- /dev/null +++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/KafkaContentEventPublisher.java @@ -0,0 +1,34 @@ +package com.cloudmedia.content.events; + +import java.time.Instant; +import java.util.UUID; +import org.springframework.kafka.core.KafkaTemplate; + +public class KafkaContentEventPublisher implements ContentEventPublisher { + + private static final int EVENT_VERSION = 1; + + private final KafkaTemplate kafkaTemplate; + + private final ContentEventsProperties properties; + + public KafkaContentEventPublisher(KafkaTemplate kafkaTemplate, ContentEventsProperties properties) { + this.kafkaTemplate = kafkaTemplate; + this.properties = properties; + } + + @Override + public void publishContentPublished(ContentPublishedPayload payload, String traceId) { + ContentEventEnvelope envelope = new ContentEventEnvelope(UUID.randomUUID().toString(), "content.published", + EVENT_VERSION, Instant.now(), "content-service", "content", payload.contentId(), + resolveTraceId(traceId), payload); + kafkaTemplate.send(properties.getTopics().getContentPublished(), payload.contentId(), envelope); + } + + private String resolveTraceId(String traceId) { + if (traceId != null && !traceId.isBlank()) { + return traceId; + } + return "req_" + UUID.randomUUID(); + } +} diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/NoopContentEventPublisher.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/NoopContentEventPublisher.java new file mode 100644 index 0000000..cdac8cd --- /dev/null +++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/NoopContentEventPublisher.java @@ -0,0 +1,9 @@ +package com.cloudmedia.content.events; + +public class NoopContentEventPublisher implements ContentEventPublisher { + + @Override + public void publishContentPublished(ContentPublishedPayload payload, String traceId) { + // intentionally no-op when content event publishing is disabled + } +} diff --git a/services/java/content-service/src/main/resources/application.yml b/services/java/content-service/src/main/resources/application.yml index fe60db8..05d4fc9 100644 --- a/services/java/content-service/src/main/resources/application.yml +++ b/services/java/content-service/src/main/resources/application.yml @@ -1,6 +1,11 @@ spring: application: name: content-service + kafka: + bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092} + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer jpa: open-in-view: false @@ -10,3 +15,10 @@ server: policy: service: base-url: http://localhost:8084 + +cloudmedia: + content: + events: + enabled: false + topics: + content-published: cloudmedia.content.published diff --git a/services/java/content-service/src/test/java/com/cloudmedia/content/application/content/ContentServiceIntegrationTest.java b/services/java/content-service/src/test/java/com/cloudmedia/content/application/content/ContentServiceIntegrationTest.java index 972120a..78d8a55 100644 --- a/services/java/content-service/src/test/java/com/cloudmedia/content/application/content/ContentServiceIntegrationTest.java +++ b/services/java/content-service/src/test/java/com/cloudmedia/content/application/content/ContentServiceIntegrationTest.java @@ -2,6 +2,8 @@ import com.cloudmedia.content.api.content.dto.CreateContentRequest; import com.cloudmedia.content.api.content.dto.UpdateContentRequest; +import com.cloudmedia.content.events.ContentEventPublisher; +import com.cloudmedia.content.events.ContentPublishedPayload; import com.cloudmedia.content.error.ApiException; import com.cloudmedia.content.persistence.entity.ChannelEntity; import com.cloudmedia.content.persistence.entity.ChannelMemberEntity; @@ -31,6 +33,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @SpringBootTest @@ -52,6 +58,9 @@ class ContentServiceIntegrationTest { @MockBean private PolicyEvaluationClient policyEvaluationClient; + @MockBean + private ContentEventPublisher contentEventPublisher; + @Test void createDraftAppliesDefaultStateAndFlags() { ChannelEntity channel = saveChannel("channel-content-1", "content-channel-one"); @@ -143,6 +152,10 @@ void publishTransitionsDraftToPublishedWhenReady() { assertEquals(ContentState.PUBLISHED, published.state()); assertNotNull(published.publishedAt()); + verify(contentEventPublisher).publishContentPublished( + eq(new ContentPublishedPayload(content.getId(), channel.getId(), "Ready draft", "desc", + ContentType.VIDEO.name(), ContentVisibility.PRIVATE.name(), published.publishedAt())), + isNull()); } @Test @@ -156,6 +169,7 @@ void publishRejectsWhenPlaybackNotReady() { () -> contentService.publish(content.getId(), "publisher-2")); assertEquals("CONTENT_NOT_READY", exception.getCode()); + verify(contentEventPublisher, never()).publishContentPublished(any(), any()); } @Test @@ -169,6 +183,7 @@ void publishRejectsWhenStateIsNotDraft() { () -> contentService.publish(content.getId(), "publisher-3")); assertEquals("CONTENT_STATE_INVALID", exception.getCode()); + verify(contentEventPublisher, never()).publishContentPublished(any(), any()); } @Test diff --git a/services/java/content-service/src/test/java/com/cloudmedia/content/events/KafkaContentEventPublisherTest.java b/services/java/content-service/src/test/java/com/cloudmedia/content/events/KafkaContentEventPublisherTest.java new file mode 100644 index 0000000..2c6c265 --- /dev/null +++ b/services/java/content-service/src/test/java/com/cloudmedia/content/events/KafkaContentEventPublisherTest.java @@ -0,0 +1,60 @@ +package com.cloudmedia.content.events; + +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.Test; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class KafkaContentEventPublisherTest { + + @Test + void publishContentPublishedSendsEnvelopeToConfiguredTopic() { + MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(), + new JsonSerializer<>()); + ProducerFactory producerFactory = new ProducerFactory<>() { + @Override + public org.apache.kafka.clients.producer.Producer createProducer() { + return mockProducer; + } + + @Override + public boolean transactionCapable() { + return false; + } + + @Override + public Map getConfigurationProperties() { + return new HashMap<>(); + } + }; + KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory); + ContentEventsProperties properties = new ContentEventsProperties(); + properties.getTopics().setContentPublished("cloudmedia.content.published"); + KafkaContentEventPublisher publisher = new KafkaContentEventPublisher(kafkaTemplate, properties); + + publisher.publishContentPublished(new ContentPublishedPayload("cnt_1", "chn_1", "Title", "Description", "VIDEO", + "PUBLIC", LocalDateTime.parse("2026-03-14T12:00:00")), "req_123"); + + ProducerRecord record = mockProducer.history().getFirst(); + assertEquals("cloudmedia.content.published", record.topic()); + assertEquals("cnt_1", record.key()); + ContentEventEnvelope envelope = (ContentEventEnvelope) record.value(); + assertEquals("content.published", envelope.eventType()); + assertEquals(1, envelope.eventVersion()); + assertEquals("content-service", envelope.producer()); + assertEquals("content", envelope.entityType()); + assertEquals("cnt_1", envelope.entityId()); + assertEquals("req_123", envelope.traceId()); + assertNotNull(envelope.eventId()); + assertNotNull(envelope.occurredAt()); + } +}