Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/contracts/rest-api-v1.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ This document defines the MVP API contract groups, standards, and core request/r
- Publishes content from `DRAFT` to `PUBLISHED`
- Request fields (MVP): `userId`
- Requires `playbackReady=true`
- Emits `content.published` event on success
- Returns `409 CONTENT_NOT_READY` when playback is not ready
- Returns `409 CONTENT_STATE_INVALID` when state is not `DRAFT`

Expand Down
4 changes: 4 additions & 0 deletions services/java/content-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.cloudmedia.content.api.content.dto.CreateContentRequest;
import com.cloudmedia.content.api.content.dto.PlaybackResponse;
import com.cloudmedia.content.api.content.dto.UpdateContentRequest;
import com.cloudmedia.content.events.ContentEventPublisher;
import com.cloudmedia.content.events.ContentPublishedPayload;
import com.cloudmedia.content.error.ApiException;
import com.cloudmedia.content.persistence.entity.ContentState;
import com.cloudmedia.content.persistence.entity.ChannelEntity;
Expand All @@ -29,13 +31,16 @@ public class ContentService {
private final ChannelRepository channelRepository;
private final ChannelMemberRepository channelMemberRepository;
private final PolicyEvaluationClient policyEvaluationClient;
private final ContentEventPublisher contentEventPublisher;

public ContentService(ContentRepository contentRepository, ChannelRepository channelRepository,
ChannelMemberRepository channelMemberRepository, PolicyEvaluationClient policyEvaluationClient) {
ChannelMemberRepository channelMemberRepository, PolicyEvaluationClient policyEvaluationClient,
ContentEventPublisher contentEventPublisher) {
this.contentRepository = contentRepository;
this.channelRepository = channelRepository;
this.channelMemberRepository = channelMemberRepository;
this.policyEvaluationClient = policyEvaluationClient;
this.contentEventPublisher = contentEventPublisher;
}

@Transactional
Expand Down Expand Up @@ -99,8 +104,14 @@ public ContentResponse publish(String contentId, String userId) {
content.setState(ContentState.PUBLISHED);
content.setPublishedAt(now);
content.setUpdatedAt(now);

return toResponse(contentRepository.save(content));
ContentEntity savedContent = contentRepository.save(content);
contentEventPublisher.publishContentPublished(
new ContentPublishedPayload(savedContent.getId(), savedContent.getChannel().getId(),
savedContent.getTitle(), savedContent.getDescription(), savedContent.getContentType().name(),
savedContent.getVisibility().name(), savedContent.getPublishedAt()),
null);

return toResponse(savedContent);
}

@Transactional
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.cloudmedia.content.events;

import java.time.Instant;

public record ContentEventEnvelope(String eventId, String eventType, int eventVersion, Instant occurredAt,
String producer, String entityType, String entityId, String traceId, Object payload) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.cloudmedia.content.events;

public interface ContentEventPublisher {

void publishContentPublished(ContentPublishedPayload payload, String traceId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.cloudmedia.content.events;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
@EnableConfigurationProperties(ContentEventsProperties.class)
public class ContentEventsConfiguration {

@Bean
@ConditionalOnProperty(prefix = "cloudmedia.content.events", name = "enabled", havingValue = "true")
KafkaContentEventPublisher kafkaContentEventPublisher(KafkaTemplate<String, Object> kafkaTemplate,
ContentEventsProperties properties) {
return new KafkaContentEventPublisher(kafkaTemplate, properties);
}

@Bean
@ConditionalOnMissingBean(ContentEventPublisher.class)
NoopContentEventPublisher noopContentEventPublisher() {
return new NoopContentEventPublisher();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.cloudmedia.content.events;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "cloudmedia.content.events")
public class ContentEventsProperties {

private boolean enabled;

private final Topics topics = new Topics();

public boolean isEnabled() {
return enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public Topics getTopics() {
return topics;
}

public static class Topics {

private String contentPublished = "cloudmedia.content.published";

public String getContentPublished() {
return contentPublished;
}

public void setContentPublished(String contentPublished) {
this.contentPublished = contentPublished;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.cloudmedia.content.events;

import java.time.LocalDateTime;

public record ContentPublishedPayload(String contentId, String channelId, String title, String description,
String contentType, String visibility, LocalDateTime publishedAt) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.cloudmedia.content.events;

import java.time.Instant;
import java.util.UUID;
import org.springframework.kafka.core.KafkaTemplate;

public class KafkaContentEventPublisher implements ContentEventPublisher {

private static final int EVENT_VERSION = 1;

private final KafkaTemplate<String, Object> kafkaTemplate;

private final ContentEventsProperties properties;

public KafkaContentEventPublisher(KafkaTemplate<String, Object> kafkaTemplate, ContentEventsProperties properties) {
this.kafkaTemplate = kafkaTemplate;
this.properties = properties;
}

@Override
public void publishContentPublished(ContentPublishedPayload payload, String traceId) {
ContentEventEnvelope envelope = new ContentEventEnvelope(UUID.randomUUID().toString(), "content.published",
EVENT_VERSION, Instant.now(), "content-service", "content", payload.contentId(),
resolveTraceId(traceId), payload);
kafkaTemplate.send(properties.getTopics().getContentPublished(), payload.contentId(), envelope);
}

private String resolveTraceId(String traceId) {
if (traceId != null && !traceId.isBlank()) {
return traceId;
}
return "req_" + UUID.randomUUID();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.cloudmedia.content.events;

public class NoopContentEventPublisher implements ContentEventPublisher {

@Override
public void publishContentPublished(ContentPublishedPayload payload, String traceId) {
// intentionally no-op when content event publishing is disabled
}
}
12 changes: 12 additions & 0 deletions services/java/content-service/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
spring:
application:
name: content-service
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
jpa:
open-in-view: false

Expand All @@ -10,3 +15,10 @@ server:
policy:
service:
base-url: http://localhost:8084

cloudmedia:
content:
events:
enabled: false
topics:
content-published: cloudmedia.content.published
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.cloudmedia.content.api.content.dto.CreateContentRequest;
import com.cloudmedia.content.api.content.dto.UpdateContentRequest;
import com.cloudmedia.content.events.ContentEventPublisher;
import com.cloudmedia.content.events.ContentPublishedPayload;
import com.cloudmedia.content.error.ApiException;
import com.cloudmedia.content.persistence.entity.ChannelEntity;
import com.cloudmedia.content.persistence.entity.ChannelMemberEntity;
Expand Down Expand Up @@ -31,6 +33,10 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@SpringBootTest
Expand All @@ -52,6 +58,9 @@ class ContentServiceIntegrationTest {
@MockBean
private PolicyEvaluationClient policyEvaluationClient;

@MockBean
private ContentEventPublisher contentEventPublisher;

@Test
void createDraftAppliesDefaultStateAndFlags() {
ChannelEntity channel = saveChannel("channel-content-1", "content-channel-one");
Expand Down Expand Up @@ -143,6 +152,10 @@ void publishTransitionsDraftToPublishedWhenReady() {

assertEquals(ContentState.PUBLISHED, published.state());
assertNotNull(published.publishedAt());
verify(contentEventPublisher).publishContentPublished(
eq(new ContentPublishedPayload(content.getId(), channel.getId(), "Ready draft", "desc",
ContentType.VIDEO.name(), ContentVisibility.PRIVATE.name(), published.publishedAt())),
isNull());
}

@Test
Expand All @@ -156,6 +169,7 @@ void publishRejectsWhenPlaybackNotReady() {
() -> contentService.publish(content.getId(), "publisher-2"));

assertEquals("CONTENT_NOT_READY", exception.getCode());
verify(contentEventPublisher, never()).publishContentPublished(any(), any());
}

@Test
Expand All @@ -169,6 +183,7 @@ void publishRejectsWhenStateIsNotDraft() {
() -> contentService.publish(content.getId(), "publisher-3"));

assertEquals("CONTENT_STATE_INVALID", exception.getCode());
verify(contentEventPublisher, never()).publishContentPublished(any(), any());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.cloudmedia.content.events;

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

class KafkaContentEventPublisherTest {

@Test
void publishContentPublishedSendsEnvelopeToConfiguredTopic() {
MockProducer<String, Object> mockProducer = new MockProducer<>(true, new StringSerializer(),
new JsonSerializer<>());
ProducerFactory<String, Object> producerFactory = new ProducerFactory<>() {
@Override
public org.apache.kafka.clients.producer.Producer<String, Object> createProducer() {
return mockProducer;
}

@Override
public boolean transactionCapable() {
return false;
}

@Override
public Map<String, Object> getConfigurationProperties() {
return new HashMap<>();
}
};
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory);
ContentEventsProperties properties = new ContentEventsProperties();
properties.getTopics().setContentPublished("cloudmedia.content.published");
KafkaContentEventPublisher publisher = new KafkaContentEventPublisher(kafkaTemplate, properties);

publisher.publishContentPublished(new ContentPublishedPayload("cnt_1", "chn_1", "Title", "Description", "VIDEO",
"PUBLIC", LocalDateTime.parse("2026-03-14T12:00:00")), "req_123");

ProducerRecord<String, Object> record = mockProducer.history().getFirst();
assertEquals("cloudmedia.content.published", record.topic());
assertEquals("cnt_1", record.key());
ContentEventEnvelope envelope = (ContentEventEnvelope) record.value();
assertEquals("content.published", envelope.eventType());
assertEquals(1, envelope.eventVersion());
assertEquals("content-service", envelope.producer());
assertEquals("content", envelope.entityType());
assertEquals("cnt_1", envelope.entityId());
assertEquals("req_123", envelope.traceId());
assertNotNull(envelope.eventId());
assertNotNull(envelope.occurredAt());
}
}
Loading