From e4d8bf2a14220690dbf94f196f8b83a1bfce547e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?=
<83272398+PoyrazK@users.noreply.github.com>
Date: Mon, 6 Apr 2026 23:32:22 +0300
Subject: [PATCH 1/4] feat(content): add Kafka-backed content event publisher
foundation
---
services/java/content-service/pom.xml | 4 +++
.../content/events/ContentEventEnvelope.java | 7 ++++
.../content/events/ContentEventPublisher.java | 6 ++++
.../events/ContentEventsConfiguration.java | 26 ++++++++++++++
.../events/ContentEventsProperties.java | 36 +++++++++++++++++++
.../events/ContentPublishedPayload.java | 7 ++++
.../events/KafkaContentEventPublisher.java | 34 ++++++++++++++++++
.../events/NoopContentEventPublisher.java | 9 +++++
.../src/main/resources/application.yml | 12 +++++++
9 files changed, 141 insertions(+)
create mode 100644 services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventEnvelope.java
create mode 100644 services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventPublisher.java
create mode 100644 services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventsConfiguration.java
create mode 100644 services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventsProperties.java
create mode 100644 services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentPublishedPayload.java
create mode 100644 services/java/content-service/src/main/java/com/cloudmedia/content/events/KafkaContentEventPublisher.java
create mode 100644 services/java/content-service/src/main/java/com/cloudmedia/content/events/NoopContentEventPublisher.java
diff --git a/services/java/content-service/pom.xml b/services/java/content-service/pom.xml
index ca8beb1..87206ce 100644
--- a/services/java/content-service/pom.xml
+++ b/services/java/content-service/pom.xml
@@ -40,6 +40,10 @@
postgresql
runtime
+
+ org.springframework.kafka
+ spring-kafka
+
org.springframework.boot
spring-boot-starter-test
diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventEnvelope.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventEnvelope.java
new file mode 100644
index 0000000..b36d3bb
--- /dev/null
+++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventEnvelope.java
@@ -0,0 +1,7 @@
+package com.cloudmedia.content.events;
+
+import java.time.Instant;
+
+public record ContentEventEnvelope(String eventId, String eventType, int eventVersion, Instant occurredAt,
+ String producer, String entityType, String entityId, String traceId, Object payload) {
+}
diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventPublisher.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventPublisher.java
new file mode 100644
index 0000000..a0adfd3
--- /dev/null
+++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventPublisher.java
@@ -0,0 +1,6 @@
+package com.cloudmedia.content.events;
+
+public interface ContentEventPublisher {
+
+ void publishContentPublished(ContentPublishedPayload payload, String traceId);
+}
diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventsConfiguration.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventsConfiguration.java
new file mode 100644
index 0000000..3a25933
--- /dev/null
+++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventsConfiguration.java
@@ -0,0 +1,26 @@
+package com.cloudmedia.content.events;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.KafkaTemplate;
+
+@Configuration
+@EnableConfigurationProperties(ContentEventsProperties.class)
+public class ContentEventsConfiguration {
+
+ @Bean
+ @ConditionalOnProperty(prefix = "cloudmedia.content.events", name = "enabled", havingValue = "true")
+ KafkaContentEventPublisher kafkaContentEventPublisher(KafkaTemplate kafkaTemplate,
+ ContentEventsProperties properties) {
+ return new KafkaContentEventPublisher(kafkaTemplate, properties);
+ }
+
+ @Bean
+ @ConditionalOnMissingBean(ContentEventPublisher.class)
+ NoopContentEventPublisher noopContentEventPublisher() {
+ return new NoopContentEventPublisher();
+ }
+}
diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventsProperties.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventsProperties.java
new file mode 100644
index 0000000..663d57d
--- /dev/null
+++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentEventsProperties.java
@@ -0,0 +1,36 @@
+package com.cloudmedia.content.events;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties(prefix = "cloudmedia.content.events")
+public class ContentEventsProperties {
+
+ private boolean enabled;
+
+ private final Topics topics = new Topics();
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public Topics getTopics() {
+ return topics;
+ }
+
+ public static class Topics {
+
+ private String contentPublished = "cloudmedia.content.published";
+
+ public String getContentPublished() {
+ return contentPublished;
+ }
+
+ public void setContentPublished(String contentPublished) {
+ this.contentPublished = contentPublished;
+ }
+ }
+}
diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentPublishedPayload.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentPublishedPayload.java
new file mode 100644
index 0000000..080aea4
--- /dev/null
+++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/ContentPublishedPayload.java
@@ -0,0 +1,7 @@
+package com.cloudmedia.content.events;
+
+import java.time.LocalDateTime;
+
+public record ContentPublishedPayload(String contentId, String channelId, String title, String description,
+ String contentType, String visibility, LocalDateTime publishedAt) {
+}
diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/KafkaContentEventPublisher.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/KafkaContentEventPublisher.java
new file mode 100644
index 0000000..56b44c4
--- /dev/null
+++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/KafkaContentEventPublisher.java
@@ -0,0 +1,34 @@
+package com.cloudmedia.content.events;
+
+import java.time.Instant;
+import java.util.UUID;
+import org.springframework.kafka.core.KafkaTemplate;
+
+public class KafkaContentEventPublisher implements ContentEventPublisher {
+
+ private static final int EVENT_VERSION = 1;
+
+ private final KafkaTemplate kafkaTemplate;
+
+ private final ContentEventsProperties properties;
+
+ public KafkaContentEventPublisher(KafkaTemplate kafkaTemplate, ContentEventsProperties properties) {
+ this.kafkaTemplate = kafkaTemplate;
+ this.properties = properties;
+ }
+
+ @Override
+ public void publishContentPublished(ContentPublishedPayload payload, String traceId) {
+ ContentEventEnvelope envelope = new ContentEventEnvelope(UUID.randomUUID().toString(), "content.published",
+ EVENT_VERSION, Instant.now(), "content-service", "content", payload.contentId(),
+ resolveTraceId(traceId), payload);
+ kafkaTemplate.send(properties.getTopics().getContentPublished(), payload.contentId(), envelope);
+ }
+
+ private String resolveTraceId(String traceId) {
+ if (traceId != null && !traceId.isBlank()) {
+ return traceId;
+ }
+ return "req_" + UUID.randomUUID();
+ }
+}
diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/events/NoopContentEventPublisher.java b/services/java/content-service/src/main/java/com/cloudmedia/content/events/NoopContentEventPublisher.java
new file mode 100644
index 0000000..cdac8cd
--- /dev/null
+++ b/services/java/content-service/src/main/java/com/cloudmedia/content/events/NoopContentEventPublisher.java
@@ -0,0 +1,9 @@
+package com.cloudmedia.content.events;
+
+public class NoopContentEventPublisher implements ContentEventPublisher {
+
+ @Override
+ public void publishContentPublished(ContentPublishedPayload payload, String traceId) {
+ // intentionally no-op when content event publishing is disabled
+ }
+}
diff --git a/services/java/content-service/src/main/resources/application.yml b/services/java/content-service/src/main/resources/application.yml
index fe60db8..05d4fc9 100644
--- a/services/java/content-service/src/main/resources/application.yml
+++ b/services/java/content-service/src/main/resources/application.yml
@@ -1,6 +1,11 @@
spring:
application:
name: content-service
+ kafka:
+ bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
jpa:
open-in-view: false
@@ -10,3 +15,10 @@ server:
policy:
service:
base-url: http://localhost:8084
+
+cloudmedia:
+ content:
+ events:
+ enabled: false
+ topics:
+ content-published: cloudmedia.content.published
From 91d45c3c2ec6c5263dba2f1640cb67133174cf09 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?=
<83272398+PoyrazK@users.noreply.github.com>
Date: Mon, 6 Apr 2026 23:32:28 +0300
Subject: [PATCH 2/4] feat(content): emit content.published after successful
publish
---
.../application/content/ContentService.java | 17 ++++++++++++++---
1 file changed, 14 insertions(+), 3 deletions(-)
diff --git a/services/java/content-service/src/main/java/com/cloudmedia/content/application/content/ContentService.java b/services/java/content-service/src/main/java/com/cloudmedia/content/application/content/ContentService.java
index b7191dd..2dc07a4 100644
--- a/services/java/content-service/src/main/java/com/cloudmedia/content/application/content/ContentService.java
+++ b/services/java/content-service/src/main/java/com/cloudmedia/content/application/content/ContentService.java
@@ -4,6 +4,8 @@
import com.cloudmedia.content.api.content.dto.CreateContentRequest;
import com.cloudmedia.content.api.content.dto.PlaybackResponse;
import com.cloudmedia.content.api.content.dto.UpdateContentRequest;
+import com.cloudmedia.content.events.ContentEventPublisher;
+import com.cloudmedia.content.events.ContentPublishedPayload;
import com.cloudmedia.content.error.ApiException;
import com.cloudmedia.content.persistence.entity.ContentState;
import com.cloudmedia.content.persistence.entity.ChannelEntity;
@@ -29,13 +31,16 @@ public class ContentService {
private final ChannelRepository channelRepository;
private final ChannelMemberRepository channelMemberRepository;
private final PolicyEvaluationClient policyEvaluationClient;
+ private final ContentEventPublisher contentEventPublisher;
public ContentService(ContentRepository contentRepository, ChannelRepository channelRepository,
- ChannelMemberRepository channelMemberRepository, PolicyEvaluationClient policyEvaluationClient) {
+ ChannelMemberRepository channelMemberRepository, PolicyEvaluationClient policyEvaluationClient,
+ ContentEventPublisher contentEventPublisher) {
this.contentRepository = contentRepository;
this.channelRepository = channelRepository;
this.channelMemberRepository = channelMemberRepository;
this.policyEvaluationClient = policyEvaluationClient;
+ this.contentEventPublisher = contentEventPublisher;
}
@Transactional
@@ -99,8 +104,14 @@ public ContentResponse publish(String contentId, String userId) {
content.setState(ContentState.PUBLISHED);
content.setPublishedAt(now);
content.setUpdatedAt(now);
-
- return toResponse(contentRepository.save(content));
+ ContentEntity savedContent = contentRepository.save(content);
+ contentEventPublisher.publishContentPublished(
+ new ContentPublishedPayload(savedContent.getId(), savedContent.getChannel().getId(),
+ savedContent.getTitle(), savedContent.getDescription(), savedContent.getContentType().name(),
+ savedContent.getVisibility().name(), savedContent.getPublishedAt()),
+ null);
+
+ return toResponse(savedContent);
}
@Transactional
From 0c82c96e03152a1f2e1a2d1416b97311a4659cb1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?=
<83272398+PoyrazK@users.noreply.github.com>
Date: Mon, 6 Apr 2026 23:32:36 +0300
Subject: [PATCH 3/4] test(content): verify published event emission and
envelope
---
.../ContentServiceIntegrationTest.java | 15 +++++
.../KafkaContentEventPublisherTest.java | 60 +++++++++++++++++++
2 files changed, 75 insertions(+)
create mode 100644 services/java/content-service/src/test/java/com/cloudmedia/content/events/KafkaContentEventPublisherTest.java
diff --git a/services/java/content-service/src/test/java/com/cloudmedia/content/application/content/ContentServiceIntegrationTest.java b/services/java/content-service/src/test/java/com/cloudmedia/content/application/content/ContentServiceIntegrationTest.java
index 972120a..78d8a55 100644
--- a/services/java/content-service/src/test/java/com/cloudmedia/content/application/content/ContentServiceIntegrationTest.java
+++ b/services/java/content-service/src/test/java/com/cloudmedia/content/application/content/ContentServiceIntegrationTest.java
@@ -2,6 +2,8 @@
import com.cloudmedia.content.api.content.dto.CreateContentRequest;
import com.cloudmedia.content.api.content.dto.UpdateContentRequest;
+import com.cloudmedia.content.events.ContentEventPublisher;
+import com.cloudmedia.content.events.ContentPublishedPayload;
import com.cloudmedia.content.error.ApiException;
import com.cloudmedia.content.persistence.entity.ChannelEntity;
import com.cloudmedia.content.persistence.entity.ChannelMemberEntity;
@@ -31,6 +33,10 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@SpringBootTest
@@ -52,6 +58,9 @@ class ContentServiceIntegrationTest {
@MockBean
private PolicyEvaluationClient policyEvaluationClient;
+ @MockBean
+ private ContentEventPublisher contentEventPublisher;
+
@Test
void createDraftAppliesDefaultStateAndFlags() {
ChannelEntity channel = saveChannel("channel-content-1", "content-channel-one");
@@ -143,6 +152,10 @@ void publishTransitionsDraftToPublishedWhenReady() {
assertEquals(ContentState.PUBLISHED, published.state());
assertNotNull(published.publishedAt());
+ verify(contentEventPublisher).publishContentPublished(
+ eq(new ContentPublishedPayload(content.getId(), channel.getId(), "Ready draft", "desc",
+ ContentType.VIDEO.name(), ContentVisibility.PRIVATE.name(), published.publishedAt())),
+ isNull());
}
@Test
@@ -156,6 +169,7 @@ void publishRejectsWhenPlaybackNotReady() {
() -> contentService.publish(content.getId(), "publisher-2"));
assertEquals("CONTENT_NOT_READY", exception.getCode());
+ verify(contentEventPublisher, never()).publishContentPublished(any(), any());
}
@Test
@@ -169,6 +183,7 @@ void publishRejectsWhenStateIsNotDraft() {
() -> contentService.publish(content.getId(), "publisher-3"));
assertEquals("CONTENT_STATE_INVALID", exception.getCode());
+ verify(contentEventPublisher, never()).publishContentPublished(any(), any());
}
@Test
diff --git a/services/java/content-service/src/test/java/com/cloudmedia/content/events/KafkaContentEventPublisherTest.java b/services/java/content-service/src/test/java/com/cloudmedia/content/events/KafkaContentEventPublisherTest.java
new file mode 100644
index 0000000..2c6c265
--- /dev/null
+++ b/services/java/content-service/src/test/java/com/cloudmedia/content/events/KafkaContentEventPublisherTest.java
@@ -0,0 +1,60 @@
+package com.cloudmedia.content.events;
+
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class KafkaContentEventPublisherTest {
+
+ @Test
+ void publishContentPublishedSendsEnvelopeToConfiguredTopic() {
+ MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(),
+ new JsonSerializer<>());
+ ProducerFactory producerFactory = new ProducerFactory<>() {
+ @Override
+ public org.apache.kafka.clients.producer.Producer createProducer() {
+ return mockProducer;
+ }
+
+ @Override
+ public boolean transactionCapable() {
+ return false;
+ }
+
+ @Override
+ public Map getConfigurationProperties() {
+ return new HashMap<>();
+ }
+ };
+ KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory);
+ ContentEventsProperties properties = new ContentEventsProperties();
+ properties.getTopics().setContentPublished("cloudmedia.content.published");
+ KafkaContentEventPublisher publisher = new KafkaContentEventPublisher(kafkaTemplate, properties);
+
+ publisher.publishContentPublished(new ContentPublishedPayload("cnt_1", "chn_1", "Title", "Description", "VIDEO",
+ "PUBLIC", LocalDateTime.parse("2026-03-14T12:00:00")), "req_123");
+
+ ProducerRecord record = mockProducer.history().getFirst();
+ assertEquals("cloudmedia.content.published", record.topic());
+ assertEquals("cnt_1", record.key());
+ ContentEventEnvelope envelope = (ContentEventEnvelope) record.value();
+ assertEquals("content.published", envelope.eventType());
+ assertEquals(1, envelope.eventVersion());
+ assertEquals("content-service", envelope.producer());
+ assertEquals("content", envelope.entityType());
+ assertEquals("cnt_1", envelope.entityId());
+ assertEquals("req_123", envelope.traceId());
+ assertNotNull(envelope.eventId());
+ assertNotNull(envelope.occurredAt());
+ }
+}
From 8e3ba0ce03fac42a6783dbc4a5cc2a6c2c0dadd1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?=
<83272398+PoyrazK@users.noreply.github.com>
Date: Mon, 6 Apr 2026 23:32:42 +0300
Subject: [PATCH 4/4] docs(content): note publish event emission in API
contract
---
docs/contracts/rest-api-v1.md | 1 +
1 file changed, 1 insertion(+)
diff --git a/docs/contracts/rest-api-v1.md b/docs/contracts/rest-api-v1.md
index 9586f3b..066539f 100644
--- a/docs/contracts/rest-api-v1.md
+++ b/docs/contracts/rest-api-v1.md
@@ -88,6 +88,7 @@ This document defines the MVP API contract groups, standards, and core request/r
- Publishes content from `DRAFT` to `PUBLISHED`
- Request fields (MVP): `userId`
- Requires `playbackReady=true`
+- Emits `content.published` event on success
- Returns `409 CONTENT_NOT_READY` when playback is not ready
- Returns `409 CONTENT_STATE_INVALID` when state is not `DRAFT`