diff --git a/docs/contracts/rest-api-v1.md b/docs/contracts/rest-api-v1.md
index 9586f3b..066539f 100644
--- a/docs/contracts/rest-api-v1.md
+++ b/docs/contracts/rest-api-v1.md
@@ -88,6 +88,7 @@ This document defines the MVP API contract groups, standards, and core request/r
- Publishes content from `DRAFT` to `PUBLISHED`
- Request fields (MVP): `userId`
- Requires `playbackReady=true`
+- Emits `content.published` event on success
- Returns `409 CONTENT_NOT_READY` when playback is not ready
- Returns `409 CONTENT_STATE_INVALID` when state is not `DRAFT`
diff --git a/services/java/content-service/pom.xml b/services/java/content-service/pom.xml
index ca8beb1..87206ce 100644
--- a/services/java/content-service/pom.xml
+++ b/services/java/content-service/pom.xml
@@ -40,6 +40,10 @@
postgresql
runtime
+
+ org.springframework.kafka
+ spring-kafka
+
org.springframework.boot
spring-boot-starter-test
diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/application/content/ContentService.java b/services/java/content-service/src/main/java/com/cloudmedia/content/application/content/ContentService.java
index b7191dd..2dc07a4 100644
--- a/services/java/content-service/src/main/java/com/cloudmedia/content/application/content/ContentService.java
+++ b/services/java/content-service/src/main/java/com/cloudmedia/content/application/content/ContentService.java
@@ -4,6 +4,8 @@
import com.cloudmedia.content.api.content.dto.CreateContentRequest;
import com.cloudmedia.content.api.content.dto.PlaybackResponse;
import com.cloudmedia.content.api.content.dto.UpdateContentRequest;
+import com.cloudmedia.content.events.ContentEventPublisher;
+import com.cloudmedia.content.events.ContentPublishedPayload;
import com.cloudmedia.content.error.ApiException;
import com.cloudmedia.content.persistence.entity.ContentState;
import com.cloudmedia.content.persistence.entity.ChannelEntity;
@@ -29,13 +31,16 @@ public class ContentService {
private final ChannelRepository channelRepository;
private final ChannelMemberRepository channelMemberRepository;
private final PolicyEvaluationClient policyEvaluationClient;
+ private final ContentEventPublisher contentEventPublisher;
public ContentService(ContentRepository contentRepository, ChannelRepository channelRepository,
- ChannelMemberRepository channelMemberRepository, PolicyEvaluationClient policyEvaluationClient) {
+ ChannelMemberRepository channelMemberRepository, PolicyEvaluationClient policyEvaluationClient,
+ ContentEventPublisher contentEventPublisher) {
this.contentRepository = contentRepository;
this.channelRepository = channelRepository;
this.channelMemberRepository = channelMemberRepository;
this.policyEvaluationClient = policyEvaluationClient;
+ this.contentEventPublisher = contentEventPublisher;
}
@Transactional
@@ -99,8 +104,14 @@ public ContentResponse publish(String contentId, String userId) {
content.setState(ContentState.PUBLISHED);
content.setPublishedAt(now);
content.setUpdatedAt(now);
-
- return toResponse(contentRepository.save(content));
+ ContentEntity savedContent = contentRepository.save(content);
+ contentEventPublisher.publishContentPublished(
+ new ContentPublishedPayload(savedContent.getId(), savedContent.getChannel().getId(),
+ savedContent.getTitle(), savedContent.getDescription(), savedContent.getContentType().name(),
+ savedContent.getVisibility().name(), savedContent.getPublishedAt()),
+ null);
+
+ return toResponse(savedContent);
}
@Transactional
diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventEnvelope.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventEnvelope.java
new file mode 100644
index 0000000..b36d3bb
--- /dev/null
+++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventEnvelope.java
@@ -0,0 +1,7 @@
+package com.cloudmedia.content.events;
+
+import java.time.Instant;
+
+public record ContentEventEnvelope(String eventId, String eventType, int eventVersion, Instant occurredAt,
+ String producer, String entityType, String entityId, String traceId, Object payload) {
+}
diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventPublisher.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventPublisher.java
new file mode 100644
index 0000000..a0adfd3
--- /dev/null
+++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventPublisher.java
@@ -0,0 +1,6 @@
+package com.cloudmedia.content.events;
+
+public interface ContentEventPublisher {
+
+ void publishContentPublished(ContentPublishedPayload payload, String traceId);
+}
diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventsConfiguration.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventsConfiguration.java
new file mode 100644
index 0000000..3a25933
--- /dev/null
+++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventsConfiguration.java
@@ -0,0 +1,26 @@
+package com.cloudmedia.content.events;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.KafkaTemplate;
+
+@Configuration
+@EnableConfigurationProperties(ContentEventsProperties.class)
+public class ContentEventsConfiguration {
+
+ @Bean
+ @ConditionalOnProperty(prefix = "cloudmedia.content.events", name = "enabled", havingValue = "true")
+ KafkaContentEventPublisher kafkaContentEventPublisher(KafkaTemplate kafkaTemplate,
+ ContentEventsProperties properties) {
+ return new KafkaContentEventPublisher(kafkaTemplate, properties);
+ }
+
+ @Bean
+ @ConditionalOnMissingBean(ContentEventPublisher.class)
+ NoopContentEventPublisher noopContentEventPublisher() {
+ return new NoopContentEventPublisher();
+ }
+}
diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventsProperties.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventsProperties.java
new file mode 100644
index 0000000..663d57d
--- /dev/null
+++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventsProperties.java
@@ -0,0 +1,36 @@
+package com.cloudmedia.content.events;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties(prefix = "cloudmedia.content.events")
+public class ContentEventsProperties {
+
+ private boolean enabled;
+
+ private final Topics topics = new Topics();
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public Topics getTopics() {
+ return topics;
+ }
+
+ public static class Topics {
+
+ private String contentPublished = "cloudmedia.content.published";
+
+ public String getContentPublished() {
+ return contentPublished;
+ }
+
+ public void setContentPublished(String contentPublished) {
+ this.contentPublished = contentPublished;
+ }
+ }
+}
diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentPublishedPayload.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentPublishedPayload.java
new file mode 100644
index 0000000..080aea4
--- /dev/null
+++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentPublishedPayload.java
@@ -0,0 +1,7 @@
+package com.cloudmedia.content.events;
+
+import java.time.LocalDateTime;
+
+public record ContentPublishedPayload(String contentId, String channelId, String title, String description,
+ String contentType, String visibility, LocalDateTime publishedAt) {
+}
diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/KafkaContentEventPublisher.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/KafkaContentEventPublisher.java
new file mode 100644
index 0000000..56b44c4
--- /dev/null
+++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/KafkaContentEventPublisher.java
@@ -0,0 +1,34 @@
+package com.cloudmedia.content.events;
+
+import java.time.Instant;
+import java.util.UUID;
+import org.springframework.kafka.core.KafkaTemplate;
+
+public class KafkaContentEventPublisher implements ContentEventPublisher {
+
+ private static final int EVENT_VERSION = 1;
+
+ private final KafkaTemplate kafkaTemplate;
+
+ private final ContentEventsProperties properties;
+
+ public KafkaContentEventPublisher(KafkaTemplate kafkaTemplate, ContentEventsProperties properties) {
+ this.kafkaTemplate = kafkaTemplate;
+ this.properties = properties;
+ }
+
+ @Override
+ public void publishContentPublished(ContentPublishedPayload payload, String traceId) {
+ ContentEventEnvelope envelope = new ContentEventEnvelope(UUID.randomUUID().toString(), "content.published",
+ EVENT_VERSION, Instant.now(), "content-service", "content", payload.contentId(),
+ resolveTraceId(traceId), payload);
+ kafkaTemplate.send(properties.getTopics().getContentPublished(), payload.contentId(), envelope);
+ }
+
+ private String resolveTraceId(String traceId) {
+ if (traceId != null && !traceId.isBlank()) {
+ return traceId;
+ }
+ return "req_" + UUID.randomUUID();
+ }
+}
diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/NoopContentEventPublisher.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/NoopContentEventPublisher.java
new file mode 100644
index 0000000..cdac8cd
--- /dev/null
+++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/NoopContentEventPublisher.java
@@ -0,0 +1,9 @@
+package com.cloudmedia.content.events;
+
+public class NoopContentEventPublisher implements ContentEventPublisher {
+
+ @Override
+ public void publishContentPublished(ContentPublishedPayload payload, String traceId) {
+ // intentionally no-op when content event publishing is disabled
+ }
+}
diff --git a/services/java/content-service/src/main/resources/application.yml b/services/java/content-service/src/main/resources/application.yml
index fe60db8..05d4fc9 100644
--- a/services/java/content-service/src/main/resources/application.yml
+++ b/services/java/content-service/src/main/resources/application.yml
@@ -1,6 +1,11 @@
spring:
application:
name: content-service
+ kafka:
+ bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
jpa:
open-in-view: false
@@ -10,3 +15,10 @@ server:
policy:
service:
base-url: http://localhost:8084
+
+cloudmedia:
+ content:
+ events:
+ enabled: false
+ topics:
+ content-published: cloudmedia.content.published
diff --git a/services/java/content-service/src/test/java/com/cloudmedia/content/application/content/ContentServiceIntegrationTest.java b/services/java/content-service/src/test/java/com/cloudmedia/content/application/content/ContentServiceIntegrationTest.java
index 972120a..78d8a55 100644
--- a/services/java/content-service/src/test/java/com/cloudmedia/content/application/content/ContentServiceIntegrationTest.java
+++ b/services/java/content-service/src/test/java/com/cloudmedia/content/application/content/ContentServiceIntegrationTest.java
@@ -2,6 +2,8 @@
import com.cloudmedia.content.api.content.dto.CreateContentRequest;
import com.cloudmedia.content.api.content.dto.UpdateContentRequest;
+import com.cloudmedia.content.events.ContentEventPublisher;
+import com.cloudmedia.content.events.ContentPublishedPayload;
import com.cloudmedia.content.error.ApiException;
import com.cloudmedia.content.persistence.entity.ChannelEntity;
import com.cloudmedia.content.persistence.entity.ChannelMemberEntity;
@@ -31,6 +33,10 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@SpringBootTest
@@ -52,6 +58,9 @@ class ContentServiceIntegrationTest {
@MockBean
private PolicyEvaluationClient policyEvaluationClient;
+ @MockBean
+ private ContentEventPublisher contentEventPublisher;
+
@Test
void createDraftAppliesDefaultStateAndFlags() {
ChannelEntity channel = saveChannel("channel-content-1", "content-channel-one");
@@ -143,6 +152,10 @@ void publishTransitionsDraftToPublishedWhenReady() {
assertEquals(ContentState.PUBLISHED, published.state());
assertNotNull(published.publishedAt());
+ verify(contentEventPublisher).publishContentPublished(
+ eq(new ContentPublishedPayload(content.getId(), channel.getId(), "Ready draft", "desc",
+ ContentType.VIDEO.name(), ContentVisibility.PRIVATE.name(), published.publishedAt())),
+ isNull());
}
@Test
@@ -156,6 +169,7 @@ void publishRejectsWhenPlaybackNotReady() {
() -> contentService.publish(content.getId(), "publisher-2"));
assertEquals("CONTENT_NOT_READY", exception.getCode());
+ verify(contentEventPublisher, never()).publishContentPublished(any(), any());
}
@Test
@@ -169,6 +183,7 @@ void publishRejectsWhenStateIsNotDraft() {
() -> contentService.publish(content.getId(), "publisher-3"));
assertEquals("CONTENT_STATE_INVALID", exception.getCode());
+ verify(contentEventPublisher, never()).publishContentPublished(any(), any());
}
@Test
diff --git a/services/java/content-service/src/test/java/com/cloudmedia/content/events/KafkaContentEventPublisherTest.java b/services/java/content-service/src/test/java/com/cloudmedia/content/events/KafkaContentEventPublisherTest.java
new file mode 100644
index 0000000..2c6c265
--- /dev/null
+++ b/services/java/content-service/src/test/java/com/cloudmedia/content/events/KafkaContentEventPublisherTest.java
@@ -0,0 +1,60 @@
+package com.cloudmedia.content.events;
+
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class KafkaContentEventPublisherTest {
+
+ @Test
+ void publishContentPublishedSendsEnvelopeToConfiguredTopic() {
+ MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(),
+ new JsonSerializer<>());
+ ProducerFactory producerFactory = new ProducerFactory<>() {
+ @Override
+ public org.apache.kafka.clients.producer.Producer createProducer() {
+ return mockProducer;
+ }
+
+ @Override
+ public boolean transactionCapable() {
+ return false;
+ }
+
+ @Override
+ public Map getConfigurationProperties() {
+ return new HashMap<>();
+ }
+ };
+ KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory);
+ ContentEventsProperties properties = new ContentEventsProperties();
+ properties.getTopics().setContentPublished("cloudmedia.content.published");
+ KafkaContentEventPublisher publisher = new KafkaContentEventPublisher(kafkaTemplate, properties);
+
+ publisher.publishContentPublished(new ContentPublishedPayload("cnt_1", "chn_1", "Title", "Description", "VIDEO",
+ "PUBLIC", LocalDateTime.parse("2026-03-14T12:00:00")), "req_123");
+
+ ProducerRecord record = mockProducer.history().getFirst();
+ assertEquals("cloudmedia.content.published", record.topic());
+ assertEquals("cnt_1", record.key());
+ ContentEventEnvelope envelope = (ContentEventEnvelope) record.value();
+ assertEquals("content.published", envelope.eventType());
+ assertEquals(1, envelope.eventVersion());
+ assertEquals("content-service", envelope.producer());
+ assertEquals("content", envelope.entityType());
+ assertEquals("cnt_1", envelope.entityId());
+ assertEquals("req_123", envelope.traceId());
+ assertNotNull(envelope.eventId());
+ assertNotNull(envelope.occurredAt());
+ }
+}