diff --git a/docs/adr/0014-policy-event-publishing.md b/docs/adr/0014-policy-event-publishing.md new file mode 100644 index 0000000..31beb1f --- /dev/null +++ b/docs/adr/0014-policy-event-publishing.md @@ -0,0 +1,105 @@ +# ADR 014: Policy Service Kafka Event Publishing + +## Status + +Accepted + +## Date + +2026-04-26 + +## Context + +The policy-service MVP was missing Kafka event publishing for policy changes. The implementation plan specified adding: + +1. Kafka dependency and configuration +2. Event model classes (PolicyChangedPayload, PolicyEventEnvelope) +3. PolicyEventPublisher interface with Kafka and Noop implementations +4. Integration into ContentPolicyService + +These events are needed for: + +- Propagating policy changes to downstream consumers (content-service, discovery-service) +- Keeping derived systems (search index, recommendations) in sync with policy updates +- Audit trail and observability of policy changes + +## Decision + +### Kafka Dependency + +Added `spring-kafka` dependency to `policy-service/pom.xml`. + +### Event Model + +Created `PolicyChangedPayload` record with fields: + +- `contentId` - the content this policy applies to +- `ageRestricted` - age restriction flag +- `geoAllowList` / `geoBlockList` - geographic policy lists +- `moderationState` - current moderation state +- `occurredAt` - timestamp of the change + +Created `PolicyEventEnvelope` matching the standard envelope contract with: + +- `eventId`, `eventType`, `eventVersion`, `occurredAt` +- `producer` ("policy-service"), `entityType` ("content"), `entityId` +- `traceId` - request trace or auto-generated fallback + +### Publisher Interface + +Created `PolicyEventPublisher` interface with `publishPolicyChanged(payload, traceId)` method. + +Two implementations: + +- `KafkaPolicyEventPublisher` - sends to `cloudmedia.policy.changed` topic using `KafkaTemplate` +- `NoopPolicyEventPublisher` - no-op fallback when Kafka is disabled + +### Configuration + +Created `PolicyEventsProperties` with configurable topic (default: `cloudmedia.policy.changed`) and `PolicyEventsConfiguration` with conditional bean wiring: + +- `KafkaPolicyEventPublisher` when `cloudmedia.policy.events.enabled=true` +- `NoopPolicyEventPublisher` as fallback + +### Service Integration + +Updated `ContentPolicyService` to inject `PolicyEventPublisher` and call `publishPolicyChanged` after both `updateContentPolicy` and `updateModerationState`. + +## Consequences + +### Positive + +- Policy changes are now broadcast to downstream services via Kafka +- Standard envelope format maintains consistency across all services +- No-op fallback allows policy-service to run without Kafka during development +- Configuration-based enablement allows per-environment control + +### Negative + +- Adds Kafka as a runtime dependency for full functionality +- Event publishing is fire-and-forget (no retries in MVP) + +### Neutral + +- Event payload aligned with existing `policy.changed` schema in kafka-event-catalog.md +- Backward compatible additions to existing policy-service endpoints + +## Alternatives Considered + +### Alternative 1: Polling Instead of Events + +Have consumers poll policy-service for changes. + +**Why rejected:** Polling is inefficient and adds latency. Event-driven architecture per ADR-004 is the established pattern. + +### Alternative 2: Synchronous HTTP Callbacks + +Call downstream services directly via HTTP when policies change. + +**Why rejected:** Tight coupling, no durability if downstream is down, no fan-out capability. Kafka provides decoupled, durable, fan-out delivery. + +## Implementation Notes + +- New files: 7 (4 implementation + 3 test) +- New tests: `PolicyEventPublisherTest` (integration), `KafkaPolicyEventPublisherTest` (unit), `NoopPolicyEventPublisherTest` (unit) +- All tests passing diff --git a/docs/contracts/kafka-event-catalog.md b/docs/contracts/kafka-event-catalog.md index ec7286b..7c582d2 100644 --- a/docs/contracts/kafka-event-catalog.md +++ b/docs/contracts/kafka-event-catalog.md @@ -107,10 +107,10 @@ All events must use this envelope: { "content_id": "cnt_123", "age_restricted": true, - "geo_allow": ["TR"], - "geo_block": [], - "moderation_state": "visible", - "updated_by": "mod_123" + "geo_allow_list": ["TR", "DE"], + "geo_block_list": ["RU"], + "moderation_state": "HIDDEN", + "occurred_at": "2026-03-09T12:00:00Z" } ``` diff --git a/docs/modular-implementation-roadmap.md b/docs/modular-implementation-roadmap.md index 5d9d096..0aefde6 100644 --- a/docs/modular-implementation-roadmap.md +++ b/docs/modular-implementation-roadmap.md @@ -40,10 +40,10 @@ This roadmap breaks implementation into small, reviewable slices with one primar - Phase D (done): publish/unpublish workflow with playback-ready guard + content listing + thumbnail URL support. ### PR-005: policy-service MVP -- Phase A (next): service foundation, persistence model, and error/API baseline. -- Phase B: age restriction and geo policy CRUD. -- Phase C: moderation state endpoint. -- Moderation visibility state endpoint. +- Phase A (done): service foundation, persistence model, and error/API baseline. +- Phase B (done): age restriction and geo policy CRUD. +- Phase C (done): moderation state endpoint. +- Phase D (done): Kafka event publishing for policy changes. ### PR-006: social-service MVP - Comments create/edit/report. @@ -88,4 +88,4 @@ This roadmap breaks implementation into small, reviewable slices with one primar - PR-002: completed - PR-003: completed - PR-004: completed (Phases A, B, C, and D all complete - content listing + thumbnail support added) -- PR-005: next (policy-service MVP) +- PR-005: completed (Phases A, B, C, and D all complete - Kafka event publishing added) diff --git a/services/java/policy-service/pom.xml b/services/java/policy-service/pom.xml index f03a472..c7f903d 100644 --- a/services/java/policy-service/pom.xml +++ b/services/java/policy-service/pom.xml @@ -44,6 +44,10 @@ postgresql runtime + + org.springframework.kafka + spring-kafka + org.springframework.boot spring-boot-starter-test diff --git a/services/java/policy-service/src/main/java/com/cloudmedia/policy/application/ContentPolicyService.java b/services/java/policy-service/src/main/java/com/cloudmedia/policy/application/ContentPolicyService.java index 6ec26fa..20a4307 100644 --- a/services/java/policy-service/src/main/java/com/cloudmedia/policy/application/ContentPolicyService.java +++ b/services/java/policy-service/src/main/java/com/cloudmedia/policy/application/ContentPolicyService.java @@ -6,9 +6,12 @@ import com.cloudmedia.policy.api.policy.dto.UpdateContentPolicyRequest; import com.cloudmedia.policy.api.policy.dto.UpdateModerationStateRequest; import com.cloudmedia.policy.error.ApiException; +import com.cloudmedia.policy.events.PolicyChangedPayload; +import com.cloudmedia.policy.events.PolicyEventPublisher; import com.cloudmedia.policy.persistence.entity.ContentPolicyEntity; import com.cloudmedia.policy.persistence.entity.ModerationState; import com.cloudmedia.policy.persistence.repository.ContentPolicyRepository; +import java.time.Instant; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; @@ -25,8 +28,12 @@ public class ContentPolicyService { private final ContentPolicyRepository contentPolicyRepository; - public ContentPolicyService(ContentPolicyRepository contentPolicyRepository) { + private final PolicyEventPublisher policyEventPublisher; + + public ContentPolicyService(ContentPolicyRepository contentPolicyRepository, + PolicyEventPublisher policyEventPublisher) { this.contentPolicyRepository = contentPolicyRepository; + this.policyEventPublisher = policyEventPublisher; } @Transactional @@ -56,14 +63,25 @@ public ContentPolicyResponse updateContentPolicy(String contentId, UpdateContent entity.setGeoBlockList(serializeCodesWithGuard(normalizedBlockList)); } - return toResponse(contentPolicyRepository.save(entity)); + ContentPolicyEntity saved = contentPolicyRepository.save(entity); + publishPolicyChanged(saved); + return toResponse(saved); } @Transactional public ContentPolicyResponse updateModerationState(String contentId, UpdateModerationStateRequest request) { ContentPolicyEntity entity = contentPolicyRepository.findById(contentId).orElseGet(() -> newEntity(contentId)); entity.setModerationState(request.moderationState()); - return toResponse(contentPolicyRepository.save(entity)); + ContentPolicyEntity saved = contentPolicyRepository.save(entity); + publishPolicyChanged(saved); + return toResponse(saved); + } + + private void publishPolicyChanged(ContentPolicyEntity entity) { + PolicyChangedPayload payload = new PolicyChangedPayload(entity.getContentId(), entity.isAgeRestricted(), + parseCodes(entity.getGeoAllowList()), parseCodes(entity.getGeoBlockList()), + entity.getModerationState().name(), Instant.now()); + policyEventPublisher.publishPolicyChanged(payload, null); } @Transactional(readOnly = true) diff --git a/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/KafkaPolicyEventPublisher.java b/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/KafkaPolicyEventPublisher.java new file mode 100644 index 0000000..1490c62 --- /dev/null +++ b/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/KafkaPolicyEventPublisher.java @@ -0,0 +1,34 @@ +package com.cloudmedia.policy.events; + +import java.time.Instant; +import java.util.UUID; +import org.springframework.kafka.core.KafkaTemplate; + +public class KafkaPolicyEventPublisher implements PolicyEventPublisher { + + private static final int EVENT_VERSION = 1; + + private final KafkaTemplate kafkaTemplate; + + private final PolicyEventsProperties properties; + + public KafkaPolicyEventPublisher(KafkaTemplate kafkaTemplate, PolicyEventsProperties properties) { + this.kafkaTemplate = kafkaTemplate; + this.properties = properties; + } + + @Override + public void publishPolicyChanged(PolicyChangedPayload payload, String traceId) { + PolicyEventEnvelope envelope = new PolicyEventEnvelope(UUID.randomUUID().toString(), "policy.changed", + EVENT_VERSION, Instant.now(), "policy-service", "content", payload.contentId(), resolveTraceId(traceId), + payload); + kafkaTemplate.send(properties.getTopics().getPolicyChanged(), payload.contentId(), envelope); + } + + private String resolveTraceId(String traceId) { + if (traceId != null && !traceId.isBlank()) { + return traceId; + } + return "req_" + UUID.randomUUID(); + } +} diff --git a/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/NoopPolicyEventPublisher.java b/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/NoopPolicyEventPublisher.java new file mode 100644 index 0000000..cd17d3c --- /dev/null +++ b/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/NoopPolicyEventPublisher.java @@ -0,0 +1,9 @@ +package com.cloudmedia.policy.events; + +public class NoopPolicyEventPublisher implements PolicyEventPublisher { + + @Override + public void publishPolicyChanged(PolicyChangedPayload payload, String traceId) { + // No-op implementation for when Kafka is disabled + } +} diff --git a/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/PolicyChangedPayload.java b/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/PolicyChangedPayload.java new file mode 100644 index 0000000..32bf9dc --- /dev/null +++ b/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/PolicyChangedPayload.java @@ -0,0 +1,8 @@ +package com.cloudmedia.policy.events; + +import java.time.Instant; +import java.util.List; + +public record PolicyChangedPayload(String contentId, boolean ageRestricted, List geoAllowList, + List geoBlockList, String moderationState, Instant occurredAt) { +} diff --git a/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/PolicyEventEnvelope.java b/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/PolicyEventEnvelope.java new file mode 100644 index 0000000..4fc3898 --- /dev/null +++ b/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/PolicyEventEnvelope.java @@ -0,0 +1,7 @@ +package com.cloudmedia.policy.events; + +import java.time.Instant; + +public record PolicyEventEnvelope(String eventId, String eventType, int eventVersion, Instant occurredAt, + String producer, String entityType, String entityId, String traceId, Object payload) { +} diff --git a/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/PolicyEventPublisher.java b/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/PolicyEventPublisher.java new file mode 100644 index 0000000..5b896ab --- /dev/null +++ b/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/PolicyEventPublisher.java @@ -0,0 +1,6 @@ +package com.cloudmedia.policy.events; + +public interface PolicyEventPublisher { + + void publishPolicyChanged(PolicyChangedPayload payload, String traceId); +} diff --git a/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/PolicyEventsConfiguration.java b/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/PolicyEventsConfiguration.java new file mode 100644 index 0000000..1a949ba --- /dev/null +++ b/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/PolicyEventsConfiguration.java @@ -0,0 +1,26 @@ +package com.cloudmedia.policy.events; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaTemplate; + +@Configuration +@EnableConfigurationProperties(PolicyEventsProperties.class) +public class PolicyEventsConfiguration { + + @Bean + @ConditionalOnProperty(prefix = "cloudmedia.policy.events", name = "enabled", havingValue = "true") + KafkaPolicyEventPublisher kafkaPolicyEventPublisher(KafkaTemplate kafkaTemplate, + PolicyEventsProperties properties) { + return new KafkaPolicyEventPublisher(kafkaTemplate, properties); + } + + @Bean + @ConditionalOnMissingBean(PolicyEventPublisher.class) + NoopPolicyEventPublisher noopPolicyEventPublisher() { + return new NoopPolicyEventPublisher(); + } +} diff --git a/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/PolicyEventsProperties.java b/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/PolicyEventsProperties.java new file mode 100644 index 0000000..37a259e --- /dev/null +++ b/services/java/policy-service/src/main/java/com/cloudmedia/policy/events/PolicyEventsProperties.java @@ -0,0 +1,36 @@ +package com.cloudmedia.policy.events; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties(prefix = "cloudmedia.policy.events") +public class PolicyEventsProperties { + + private boolean enabled; + + private final Topics topics = new Topics(); + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public Topics getTopics() { + return topics; + } + + public static class Topics { + + private String policyChanged = "cloudmedia.policy.changed"; + + public String getPolicyChanged() { + return policyChanged; + } + + public void setPolicyChanged(String policyChanged) { + this.policyChanged = policyChanged; + } + } +} diff --git a/services/java/policy-service/src/test/java/com/cloudmedia/policy/application/PolicyEventPublisherTest.java b/services/java/policy-service/src/test/java/com/cloudmedia/policy/application/PolicyEventPublisherTest.java new file mode 100644 index 0000000..825441b --- /dev/null +++ b/services/java/policy-service/src/test/java/com/cloudmedia/policy/application/PolicyEventPublisherTest.java @@ -0,0 +1,43 @@ +package com.cloudmedia.policy.application; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; + +import com.cloudmedia.policy.api.policy.dto.UpdateContentPolicyRequest; +import com.cloudmedia.policy.api.policy.dto.UpdateModerationStateRequest; +import com.cloudmedia.policy.events.PolicyChangedPayload; +import com.cloudmedia.policy.events.PolicyEventPublisher; +import com.cloudmedia.policy.persistence.entity.ModerationState; +import java.util.List; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.transaction.annotation.Transactional; + +@SpringBootTest +@Transactional +class PolicyEventPublisherTest { + + @Autowired + private ContentPolicyService contentPolicyService; + + @MockBean + private PolicyEventPublisher policyEventPublisher; + + @Test + void updateContentPolicyPublishesPolicyChangedEvent() { + contentPolicyService.updateContentPolicy("content-event-1", + new UpdateContentPolicyRequest(true, List.of("US", "TR"), List.of("RU"))); + + verify(policyEventPublisher).publishPolicyChanged(any(PolicyChangedPayload.class), any()); + } + + @Test + void updateModerationStatePublishesPolicyChangedEvent() { + contentPolicyService.updateModerationState("content-event-2", + new UpdateModerationStateRequest(ModerationState.HIDDEN)); + + verify(policyEventPublisher).publishPolicyChanged(any(PolicyChangedPayload.class), any()); + } +} diff --git a/services/java/policy-service/src/test/java/com/cloudmedia/policy/events/KafkaPolicyEventPublisherTest.java b/services/java/policy-service/src/test/java/com/cloudmedia/policy/events/KafkaPolicyEventPublisherTest.java new file mode 100644 index 0000000..bfff9a6 --- /dev/null +++ b/services/java/policy-service/src/test/java/com/cloudmedia/policy/events/KafkaPolicyEventPublisherTest.java @@ -0,0 +1,100 @@ +package com.cloudmedia.policy.events; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.Test; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class KafkaPolicyEventPublisherTest { + + @Test + void publishPolicyChangedSendsEnvelopeToConfiguredTopic() { + MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(), + new JsonSerializer<>()); + ProducerFactory producerFactory = new ProducerFactory<>() { + @Override + public org.apache.kafka.clients.producer.Producer createProducer() { + return mockProducer; + } + + @Override + public boolean transactionCapable() { + return false; + } + + @Override + public Map getConfigurationProperties() { + return new HashMap<>(); + } + }; + KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory); + PolicyEventsProperties properties = new PolicyEventsProperties(); + properties.getTopics().setPolicyChanged("cloudmedia.policy.changed"); + KafkaPolicyEventPublisher publisher = new KafkaPolicyEventPublisher(kafkaTemplate, properties); + + publisher.publishPolicyChanged(new PolicyChangedPayload("cnt_1", true, java.util.List.of("TR", "DE"), + java.util.List.of("RU"), "HIDDEN", Instant.parse("2026-03-14T12:00:00Z")), "req_789"); + + ProducerRecord record = mockProducer.history().getFirst(); + assertEquals("cloudmedia.policy.changed", record.topic()); + assertEquals("cnt_1", record.key()); + PolicyEventEnvelope envelope = (PolicyEventEnvelope) record.value(); + assertEquals("policy.changed", envelope.eventType()); + assertEquals(1, envelope.eventVersion()); + assertEquals("policy-service", envelope.producer()); + assertEquals("content", envelope.entityType()); + assertEquals("cnt_1", envelope.entityId()); + assertEquals("req_789", envelope.traceId()); + assertNotNull(envelope.eventId()); + assertNotNull(envelope.occurredAt()); + PolicyChangedPayload payload = (PolicyChangedPayload) envelope.payload(); + assertEquals("cnt_1", payload.contentId()); + assertEquals(true, payload.ageRestricted()); + assertEquals(java.util.List.of("TR", "DE"), payload.geoAllowList()); + assertEquals(java.util.List.of("RU"), payload.geoBlockList()); + assertEquals("HIDDEN", payload.moderationState()); + } + + @Test + void publishPolicyChangedGeneratesTraceIdWhenNull() { + MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(), + new JsonSerializer<>()); + ProducerFactory producerFactory = new ProducerFactory<>() { + @Override + public org.apache.kafka.clients.producer.Producer createProducer() { + return mockProducer; + } + + @Override + public boolean transactionCapable() { + return false; + } + + @Override + public Map getConfigurationProperties() { + return new HashMap<>(); + } + }; + KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory); + PolicyEventsProperties properties = new PolicyEventsProperties(); + KafkaPolicyEventPublisher publisher = new KafkaPolicyEventPublisher(kafkaTemplate, properties); + + publisher.publishPolicyChanged(new PolicyChangedPayload("cnt_2", false, java.util.List.of(), + java.util.List.of(), "VISIBLE", Instant.now()), null); + + ProducerRecord record = mockProducer.history().getFirst(); + PolicyEventEnvelope envelope = (PolicyEventEnvelope) record.value(); + assertNotNull(envelope.traceId()); + assertTrue(envelope.traceId().startsWith("req_")); + } +} diff --git a/services/java/policy-service/src/test/java/com/cloudmedia/policy/events/NoopPolicyEventPublisherTest.java b/services/java/policy-service/src/test/java/com/cloudmedia/policy/events/NoopPolicyEventPublisherTest.java new file mode 100644 index 0000000..8124099 --- /dev/null +++ b/services/java/policy-service/src/test/java/com/cloudmedia/policy/events/NoopPolicyEventPublisherTest.java @@ -0,0 +1,19 @@ +package com.cloudmedia.policy.events; + +import java.time.Instant; +import java.util.List; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +class NoopPolicyEventPublisherTest { + + @Test + void publishPolicyChangedDoesNothing() { + NoopPolicyEventPublisher publisher = new NoopPolicyEventPublisher(); + PolicyChangedPayload payload = new PolicyChangedPayload("cnt_1", true, List.of("TR"), List.of("RU"), "HIDDEN", + Instant.now()); + + assertDoesNotThrow(() -> publisher.publishPolicyChanged(payload, "req_123")); + } +}