Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions docs/adr/0014-policy-event-publishing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# ADR 014: Policy Service Kafka Event Publishing

## Status

Accepted

## Date

2026-04-26

## Context

The policy-service MVP was missing Kafka event publishing for policy changes. The implementation plan specified adding:

1. Kafka dependency and configuration
2. Event model classes (PolicyChangedPayload, PolicyEventEnvelope)
3. PolicyEventPublisher interface with Kafka and Noop implementations
4. Integration into ContentPolicyService

These events are needed for:

- Propagating policy changes to downstream consumers (content-service, discovery-service)
- Keeping derived systems (search index, recommendations) in sync with policy updates
- Audit trail and observability of policy changes

## Decision

### Kafka Dependency

Added `spring-kafka` dependency to `policy-service/pom.xml`.

### Event Model

Created `PolicyChangedPayload` record with fields:

- `contentId` - the content this policy applies to
- `ageRestricted` - age restriction flag
- `geoAllowList` / `geoBlockList` - geographic policy lists
- `moderationState` - current moderation state
- `occurredAt` - timestamp of the change

Created `PolicyEventEnvelope` matching the standard envelope contract with:

- `eventId`, `eventType`, `eventVersion`, `occurredAt`
- `producer` ("policy-service"), `entityType` ("content"), `entityId`
- `traceId` - request trace or auto-generated fallback

### Publisher Interface

Created `PolicyEventPublisher` interface with `publishPolicyChanged(payload, traceId)` method.

Two implementations:

- `KafkaPolicyEventPublisher` - sends to `cloudmedia.policy.changed` topic using `KafkaTemplate`
- `NoopPolicyEventPublisher` - no-op fallback when Kafka is disabled

### Configuration

Created `PolicyEventsProperties` with configurable topic (default: `cloudmedia.policy.changed`) and `PolicyEventsConfiguration` with conditional bean wiring:

- `KafkaPolicyEventPublisher` when `cloudmedia.policy.events.enabled=true`
- `NoopPolicyEventPublisher` as fallback

### Service Integration

Updated `ContentPolicyService` to inject `PolicyEventPublisher` and call `publishPolicyChanged` after both `updateContentPolicy` and `updateModerationState`.

## Consequences

### Positive

- Policy changes are now broadcast to downstream services via Kafka
- Standard envelope format maintains consistency across all services
- No-op fallback allows policy-service to run without Kafka during development
- Configuration-based enablement allows per-environment control

### Negative

- Adds Kafka as a runtime dependency for full functionality
- Event publishing is fire-and-forget (no retries in MVP)

### Neutral

- Event payload aligned with existing `policy.changed` schema in kafka-event-catalog.md
- Backward compatible additions to existing policy-service endpoints

## Alternatives Considered

### Alternative 1: Polling Instead of Events

Have consumers poll policy-service for changes.

**Why rejected:** Polling is inefficient and adds latency. Event-driven architecture per ADR-004 is the established pattern.

### Alternative 2: Synchronous HTTP Callbacks

Call downstream services directly via HTTP when policies change.

**Why rejected:** Tight coupling, no durability if downstream is down, no fan-out capability. Kafka provides decoupled, durable, fan-out delivery.

## Implementation Notes

- New files: 7 (4 implementation + 3 test)
- New tests: `PolicyEventPublisherTest` (integration), `KafkaPolicyEventPublisherTest` (unit), `NoopPolicyEventPublisherTest` (unit)
- All tests passing
8 changes: 4 additions & 4 deletions docs/contracts/kafka-event-catalog.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ All events must use this envelope:
{
"content_id": "cnt_123",
"age_restricted": true,
"geo_allow": ["TR"],
"geo_block": [],
"moderation_state": "visible",
"updated_by": "mod_123"
"geo_allow_list": ["TR", "DE"],
"geo_block_list": ["RU"],
"moderation_state": "HIDDEN",
"occurred_at": "2026-03-09T12:00:00Z"
}
```

Expand Down
10 changes: 5 additions & 5 deletions docs/modular-implementation-roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ This roadmap breaks implementation into small, reviewable slices with one primar
- Phase D (done): publish/unpublish workflow with playback-ready guard + content listing + thumbnail URL support.

### PR-005: policy-service MVP
- Phase A (next): service foundation, persistence model, and error/API baseline.
- Phase B: age restriction and geo policy CRUD.
- Phase C: moderation state endpoint.
- Moderation visibility state endpoint.
- Phase A (done): service foundation, persistence model, and error/API baseline.
- Phase B (done): age restriction and geo policy CRUD.
- Phase C (done): moderation state endpoint.
- Phase D (done): Kafka event publishing for policy changes.

### PR-006: social-service MVP
- Comments create/edit/report.
Expand Down Expand Up @@ -88,4 +88,4 @@ This roadmap breaks implementation into small, reviewable slices with one primar
- PR-002: completed
- PR-003: completed
- PR-004: completed (Phases A, B, C, and D all complete - content listing + thumbnail support added)
- PR-005: next (policy-service MVP)
- PR-005: completed (Phases A, B, C, and D all complete - Kafka event publishing added)
4 changes: 4 additions & 0 deletions services/java/policy-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
import com.cloudmedia.policy.api.policy.dto.UpdateContentPolicyRequest;
import com.cloudmedia.policy.api.policy.dto.UpdateModerationStateRequest;
import com.cloudmedia.policy.error.ApiException;
import com.cloudmedia.policy.events.PolicyChangedPayload;
import com.cloudmedia.policy.events.PolicyEventPublisher;
import com.cloudmedia.policy.persistence.entity.ContentPolicyEntity;
import com.cloudmedia.policy.persistence.entity.ModerationState;
import com.cloudmedia.policy.persistence.repository.ContentPolicyRepository;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
Expand All @@ -25,8 +28,12 @@ public class ContentPolicyService {

private final ContentPolicyRepository contentPolicyRepository;

public ContentPolicyService(ContentPolicyRepository contentPolicyRepository) {
private final PolicyEventPublisher policyEventPublisher;

public ContentPolicyService(ContentPolicyRepository contentPolicyRepository,
PolicyEventPublisher policyEventPublisher) {
this.contentPolicyRepository = contentPolicyRepository;
this.policyEventPublisher = policyEventPublisher;
}

@Transactional
Expand Down Expand Up @@ -56,14 +63,25 @@ public ContentPolicyResponse updateContentPolicy(String contentId, UpdateContent
entity.setGeoBlockList(serializeCodesWithGuard(normalizedBlockList));
}

return toResponse(contentPolicyRepository.save(entity));
ContentPolicyEntity saved = contentPolicyRepository.save(entity);
publishPolicyChanged(saved);
return toResponse(saved);
}

@Transactional
public ContentPolicyResponse updateModerationState(String contentId, UpdateModerationStateRequest request) {
ContentPolicyEntity entity = contentPolicyRepository.findById(contentId).orElseGet(() -> newEntity(contentId));
entity.setModerationState(request.moderationState());
return toResponse(contentPolicyRepository.save(entity));
ContentPolicyEntity saved = contentPolicyRepository.save(entity);
publishPolicyChanged(saved);
return toResponse(saved);
}

private void publishPolicyChanged(ContentPolicyEntity entity) {
PolicyChangedPayload payload = new PolicyChangedPayload(entity.getContentId(), entity.isAgeRestricted(),
parseCodes(entity.getGeoAllowList()), parseCodes(entity.getGeoBlockList()),
entity.getModerationState().name(), Instant.now());
policyEventPublisher.publishPolicyChanged(payload, null);
}

@Transactional(readOnly = true)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.cloudmedia.policy.events;

import java.time.Instant;
import java.util.UUID;
import org.springframework.kafka.core.KafkaTemplate;

public class KafkaPolicyEventPublisher implements PolicyEventPublisher {

private static final int EVENT_VERSION = 1;

private final KafkaTemplate<String, Object> kafkaTemplate;

private final PolicyEventsProperties properties;

public KafkaPolicyEventPublisher(KafkaTemplate<String, Object> kafkaTemplate, PolicyEventsProperties properties) {
this.kafkaTemplate = kafkaTemplate;
this.properties = properties;
}

@Override
public void publishPolicyChanged(PolicyChangedPayload payload, String traceId) {
PolicyEventEnvelope envelope = new PolicyEventEnvelope(UUID.randomUUID().toString(), "policy.changed",
EVENT_VERSION, Instant.now(), "policy-service", "content", payload.contentId(), resolveTraceId(traceId),
payload);
kafkaTemplate.send(properties.getTopics().getPolicyChanged(), payload.contentId(), envelope);
}

private String resolveTraceId(String traceId) {
if (traceId != null && !traceId.isBlank()) {
return traceId;
}
return "req_" + UUID.randomUUID();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.cloudmedia.policy.events;

public class NoopPolicyEventPublisher implements PolicyEventPublisher {

@Override
public void publishPolicyChanged(PolicyChangedPayload payload, String traceId) {
// No-op implementation for when Kafka is disabled
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.cloudmedia.policy.events;

import java.time.Instant;
import java.util.List;

public record PolicyChangedPayload(String contentId, boolean ageRestricted, List<String> geoAllowList,
List<String> geoBlockList, String moderationState, Instant occurredAt) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.cloudmedia.policy.events;

import java.time.Instant;

public record PolicyEventEnvelope(String eventId, String eventType, int eventVersion, Instant occurredAt,
String producer, String entityType, String entityId, String traceId, Object payload) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.cloudmedia.policy.events;

public interface PolicyEventPublisher {

void publishPolicyChanged(PolicyChangedPayload payload, String traceId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.cloudmedia.policy.events;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
@EnableConfigurationProperties(PolicyEventsProperties.class)
public class PolicyEventsConfiguration {

@Bean
@ConditionalOnProperty(prefix = "cloudmedia.policy.events", name = "enabled", havingValue = "true")
KafkaPolicyEventPublisher kafkaPolicyEventPublisher(KafkaTemplate<String, Object> kafkaTemplate,
PolicyEventsProperties properties) {
return new KafkaPolicyEventPublisher(kafkaTemplate, properties);
}

@Bean
@ConditionalOnMissingBean(PolicyEventPublisher.class)
NoopPolicyEventPublisher noopPolicyEventPublisher() {
return new NoopPolicyEventPublisher();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.cloudmedia.policy.events;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "cloudmedia.policy.events")
public class PolicyEventsProperties {

private boolean enabled;

private final Topics topics = new Topics();

public boolean isEnabled() {
return enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public Topics getTopics() {
return topics;
}

public static class Topics {

private String policyChanged = "cloudmedia.policy.changed";

public String getPolicyChanged() {
return policyChanged;
}

public void setPolicyChanged(String policyChanged) {
this.policyChanged = policyChanged;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.cloudmedia.policy.application;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;

import com.cloudmedia.policy.api.policy.dto.UpdateContentPolicyRequest;
import com.cloudmedia.policy.api.policy.dto.UpdateModerationStateRequest;
import com.cloudmedia.policy.events.PolicyChangedPayload;
import com.cloudmedia.policy.events.PolicyEventPublisher;
import com.cloudmedia.policy.persistence.entity.ModerationState;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.transaction.annotation.Transactional;

@SpringBootTest
@Transactional
class PolicyEventPublisherTest {

@Autowired
private ContentPolicyService contentPolicyService;

@MockBean
private PolicyEventPublisher policyEventPublisher;

@Test
void updateContentPolicyPublishesPolicyChangedEvent() {
contentPolicyService.updateContentPolicy("content-event-1",
new UpdateContentPolicyRequest(true, List.of("US", "TR"), List.of("RU")));

verify(policyEventPublisher).publishPolicyChanged(any(PolicyChangedPayload.class), any());
}

@Test
void updateModerationStatePublishesPolicyChangedEvent() {
contentPolicyService.updateModerationState("content-event-2",
new UpdateModerationStateRequest(ModerationState.HIDDEN));

verify(policyEventPublisher).publishPolicyChanged(any(PolicyChangedPayload.class), any());
}
}
Loading
Loading