Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/contracts/rest-api-v1.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ This document defines the MVP API contract groups, standards, and core request/r
### `PATCH /v1/content/{content_id}`
- Partially updates content metadata for channel members
- Mutable fields (MVP): `title`, `description`, `visibility`
- Emits `content.updated` event when content is in `PUBLISHED` state

### `POST /v1/content/{content_id}/publish`
- Publishes content from `DRAFT` to `PUBLISHED`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.cloudmedia.content.api.content.dto.UpdateContentRequest;
import com.cloudmedia.content.events.ContentEventPublisher;
import com.cloudmedia.content.events.ContentPublishedPayload;
import com.cloudmedia.content.events.ContentUpdatedPayload;
import com.cloudmedia.content.error.ApiException;
import com.cloudmedia.content.persistence.entity.ContentState;
import com.cloudmedia.content.persistence.entity.ChannelEntity;
Expand Down Expand Up @@ -83,7 +84,14 @@ public ContentResponse updateMetadata(String contentId, UpdateContentRequest req
}
content.setUpdatedAt(LocalDateTime.now());

return toResponse(contentRepository.save(content));
ContentEntity savedContent = contentRepository.save(content);
if (savedContent.getState() == ContentState.PUBLISHED) {
contentEventPublisher.publishContentUpdated(new ContentUpdatedPayload(savedContent.getId(),
savedContent.getChannel().getId(), savedContent.getTitle(), savedContent.getContentType().name(),
savedContent.getVisibility().name()), null);
}
Comment on lines +89 to +92
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Publish content.updated only after transaction commit.

On Lines 89-92, the Kafka-facing publish path is executed inside the transaction. If commit fails afterward, consumers can observe an update event for data that never committed.

Suggested direction (after-commit dispatch)
-		if (savedContent.getState() == ContentState.PUBLISHED) {
-			contentEventPublisher.publishContentUpdated(new ContentUpdatedPayload(savedContent.getId(),
-					savedContent.getChannel().getId(), savedContent.getTitle(), savedContent.getContentType().name(),
-					savedContent.getVisibility().name()), null);
-		}
+		if (savedContent.getState() == ContentState.PUBLISHED) {
+			ContentUpdatedPayload payload = new ContentUpdatedPayload(
+					savedContent.getId(),
+					savedContent.getChannel().getId(),
+					savedContent.getTitle(),
+					savedContent.getContentType().name(),
+					savedContent.getVisibility().name());
+			// Prefer after-commit publish (or outbox pattern) to avoid phantom events on rollback.
+			org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization(
+					new org.springframework.transaction.support.TransactionSynchronization() {
+						`@Override`
+						public void afterCommit() {
+							contentEventPublisher.publishContentUpdated(payload, null);
+						}
+					});
+		}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
contentEventPublisher.publishContentUpdated(new ContentUpdatedPayload(savedContent.getId(),
savedContent.getChannel().getId(), savedContent.getTitle(), savedContent.getContentType().name(),
savedContent.getVisibility().name()), null);
}
if (savedContent.getState() == ContentState.PUBLISHED) {
ContentUpdatedPayload payload = new ContentUpdatedPayload(
savedContent.getId(),
savedContent.getChannel().getId(),
savedContent.getTitle(),
savedContent.getContentType().name(),
savedContent.getVisibility().name());
// Prefer after-commit publish (or outbox pattern) to avoid phantom events on rollback.
org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization(
new org.springframework.transaction.support.TransactionSynchronization() {
`@Override`
public void afterCommit() {
contentEventPublisher.publishContentUpdated(payload, null);
}
});
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@services/java/content-service/src/main/java/com/cloudmedia/content/application/content/ContentService.java`
around lines 89 - 92, The publish call to
contentEventPublisher.publishContentUpdated using new
ContentUpdatedPayload(savedContent.getId(), savedContent.getChannel().getId(),
savedContent.getTitle(), savedContent.getContentType().name(),
savedContent.getVisibility().name()) is currently invoked inside the transaction
and must be deferred until after commit; change the code that saves content in
ContentService to register an after-commit callback (e.g.,
TransactionSynchronizationManager.registerSynchronization or a
TransactionSynchronization.afterCommit lambda, or emit an application event
handled by `@TransactionalEventListener`(phase = AFTER_COMMIT)) and move the
contentEventPublisher.publishContentUpdated invocation into that afterCommit
callback so the Kafka event is only published if the transaction successfully
commits.


return toResponse(savedContent);
}

@Transactional
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@
public interface ContentEventPublisher {

void publishContentPublished(ContentPublishedPayload payload, String traceId);

void publishContentUpdated(ContentUpdatedPayload payload, String traceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,22 @@ public static class Topics {

private String contentPublished = "cloudmedia.content.published";

private String contentUpdated = "cloudmedia.content.updated";

public String getContentPublished() {
return contentPublished;
}

public void setContentPublished(String contentPublished) {
this.contentPublished = contentPublished;
}

public String getContentUpdated() {
return contentUpdated;
}

public void setContentUpdated(String contentUpdated) {
this.contentUpdated = contentUpdated;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.cloudmedia.content.events;

public record ContentUpdatedPayload(String contentId, String channelId, String title, String contentType,
String visibility) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ public void publishContentPublished(ContentPublishedPayload payload, String trac
kafkaTemplate.send(properties.getTopics().getContentPublished(), payload.contentId(), envelope);
}

@Override
public void publishContentUpdated(ContentUpdatedPayload payload, String traceId) {
ContentEventEnvelope envelope = new ContentEventEnvelope(UUID.randomUUID().toString(), "content.updated",
EVENT_VERSION, Instant.now(), "content-service", "content", payload.contentId(),
resolveTraceId(traceId), payload);
kafkaTemplate.send(properties.getTopics().getContentUpdated(), payload.contentId(), envelope);
}

private String resolveTraceId(String traceId) {
if (traceId != null && !traceId.isBlank()) {
return traceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,9 @@ public class NoopContentEventPublisher implements ContentEventPublisher {
public void publishContentPublished(ContentPublishedPayload payload, String traceId) {
// intentionally no-op when content event publishing is disabled
}

@Override
public void publishContentUpdated(ContentUpdatedPayload payload, String traceId) {
// intentionally no-op when content event publishing is disabled
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.cloudmedia.content.api.content.dto.UpdateContentRequest;
import com.cloudmedia.content.events.ContentEventPublisher;
import com.cloudmedia.content.events.ContentPublishedPayload;
import com.cloudmedia.content.events.ContentUpdatedPayload;
import com.cloudmedia.content.error.ApiException;
import com.cloudmedia.content.persistence.entity.ChannelEntity;
import com.cloudmedia.content.persistence.entity.ChannelMemberEntity;
Expand Down Expand Up @@ -213,6 +214,21 @@ void unpublishRejectsWhenStateIsNotPublished() {
assertEquals("CONTENT_STATE_INVALID", exception.getCode());
}

@Test
void updateMetadataEmitsContentUpdatedWhenPublished() {
ChannelEntity channel = saveChannel("channel-content-11", "content-channel-eleven");
saveMembership(channel, "updater-1", ChannelMemberRole.ADMIN);
ContentEntity content = saveContent(channel, "Original", "desc", ContentVisibility.PUBLIC,
ContentState.PUBLISHED, true);

contentService.updateMetadata(content.getId(),
new UpdateContentRequest("updater-1", "Updated Title", null, ContentVisibility.UNLISTED));

verify(contentEventPublisher).publishContentUpdated(eq(new ContentUpdatedPayload(content.getId(),
channel.getId(), "Updated Title", ContentType.VIDEO.name(), ContentVisibility.UNLISTED.name())),
isNull());
}

private ChannelEntity saveChannel(String id, String slug) {
ChannelEntity channel = new ChannelEntity();
channel.setId(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,53 @@ public Map<String, Object> getConfigurationProperties() {
assertNotNull(envelope.eventId());
assertNotNull(envelope.occurredAt());
}

@Test
void publishContentUpdatedSendsEnvelopeToConfiguredTopic() {
MockProducer<String, Object> mockProducer = new MockProducer<>(true, new StringSerializer(),
new JsonSerializer<>());
ProducerFactory<String, Object> producerFactory = new ProducerFactory<>() {
@Override
public org.apache.kafka.clients.producer.Producer<String, Object> createProducer() {
return mockProducer;
}

@Override
public boolean transactionCapable() {
return false;
}

@Override
public Map<String, Object> getConfigurationProperties() {
return new HashMap<>();
}
};
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory);
ContentEventsProperties properties = new ContentEventsProperties();
properties.getTopics().setContentUpdated("cloudmedia.content.updated");
KafkaContentEventPublisher publisher = new KafkaContentEventPublisher(kafkaTemplate, properties);

publisher.publishContentUpdated(
new ContentUpdatedPayload("cnt_2", "chn_2", "Updated Title", "VIDEO", "UNLISTED"), "req_456");

assertEquals(1, mockProducer.history().size());
ProducerRecord<String, Object> record = mockProducer.history().getFirst();
assertEquals("cloudmedia.content.updated", record.topic());
assertEquals("cnt_2", record.key());
ContentEventEnvelope envelope = (ContentEventEnvelope) record.value();
assertEquals("content.updated", envelope.eventType());
assertEquals(1, envelope.eventVersion());
assertEquals("content-service", envelope.producer());
assertEquals("content", envelope.entityType());
assertEquals("cnt_2", envelope.entityId());
assertEquals("req_456", envelope.traceId());
assertNotNull(envelope.eventId());
assertNotNull(envelope.occurredAt());
ContentUpdatedPayload payload = (ContentUpdatedPayload) envelope.payload();
assertEquals("cnt_2", payload.contentId());
assertEquals("chn_2", payload.channelId());
assertEquals("Updated Title", payload.title());
assertEquals("VIDEO", payload.contentType());
assertEquals("UNLISTED", payload.visibility());
}
}
Loading