diff --git a/docs/contracts/rest-api-v1.md b/docs/contracts/rest-api-v1.md index dafbd6c..797421c 100644 --- a/docs/contracts/rest-api-v1.md +++ b/docs/contracts/rest-api-v1.md @@ -131,8 +131,10 @@ This document defines the MVP API contract groups, standards, and core request/r ### `GET /v1/discovery/home` - Balanced feed (followed + trending + fresh + similar) -- MVP params: optional `userId`; optional integer `size` with min `1`, default `20`, max `50` +- MVP params: optional `userId`; optional integer `size` with min `1`, default `20`, max `50`; optional `countryCode` (ISO 3166-1 alpha-2 uppercase, e.g. `US`); optional `ageVerified` - Returns a generic blended feed when `userId` is absent +- Filters out policy-blocked items from the blended feed +- Returns `503 POLICY_SERVICE_UNAVAILABLE` when policy evaluation fails ### `GET /v1/discovery/trending` - Region-level trending feed @@ -186,3 +188,4 @@ This document defines the MVP API contract groups, standards, and core request/r - `409`: conflict/idempotency collision - `429`: rate limit exceeded - `500`: internal error +- `503`: dependency unavailable (e.g. `POLICY_SERVICE_UNAVAILABLE` on `GET /v1/discovery/home`) diff --git a/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/api/discovery/DiscoveryController.java b/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/api/discovery/DiscoveryController.java index 1deb61d..778e95e 100644 --- a/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/api/discovery/DiscoveryController.java +++ b/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/api/discovery/DiscoveryController.java @@ -2,6 +2,7 @@ import com.cloudmedia.discovery.api.response.ApiMeta; import com.cloudmedia.discovery.api.response.ApiSuccessResponse; +import com.cloudmedia.discovery.api.validation.ValidCountryCode; import com.cloudmedia.discovery.discovery.HomeFeedResponse; import com.cloudmedia.discovery.discovery.HomeFeedService; import jakarta.validation.constraints.Max; @@ -31,9 +32,11 @@ public DiscoveryController(HomeFeedService homeFeedService) { public ResponseEntity> home( @RequestParam(value = "userId", required = false) String userId, @RequestParam(value = "size", required = false) @Min(1) @Max(50) Integer size, + @RequestParam(value = "countryCode", required = false) @ValidCountryCode String countryCode, + @RequestParam(value = "ageVerified", required = false) Boolean ageVerified, @RequestHeader(value = "X-Request-Id", required = false) String requestId) { String effectiveRequestId = requestId(requestId); - HomeFeedResponse response = homeFeedService.homeFeed(userId, size); + HomeFeedResponse response = homeFeedService.homeFeed(userId, size, countryCode, ageVerified); return ResponseEntity.ok(new ApiSuccessResponse<>(response, new ApiMeta(effectiveRequestId, Instant.now()))); } diff --git a/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/api/validation/CountryCodeValidator.java b/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/api/validation/CountryCodeValidator.java new file mode 100644 index 0000000..4782efc --- /dev/null +++ b/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/api/validation/CountryCodeValidator.java @@ -0,0 +1,25 @@ +package com.cloudmedia.discovery.api.validation; + +import jakarta.validation.ConstraintValidator; +import jakarta.validation.ConstraintValidatorContext; +import java.util.Arrays; +import java.util.Locale; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.regex.Pattern; + +public class CountryCodeValidator implements ConstraintValidator { + + private static final Set ISO_COUNTRY_CODES = Arrays.stream(Locale.getISOCountries()) + .collect(Collectors.toUnmodifiableSet()); + + private static final Pattern COUNTRY_CODE_PATTERN = Pattern.compile("^[A-Z]{2}$"); + + @Override + public boolean isValid(String value, ConstraintValidatorContext context) { + if (value == null) { + return true; + } + return COUNTRY_CODE_PATTERN.matcher(value).matches() && ISO_COUNTRY_CODES.contains(value); + } +} diff --git a/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/api/validation/ValidCountryCode.java b/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/api/validation/ValidCountryCode.java new file mode 100644 index 0000000..b3711df --- /dev/null +++ b/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/api/validation/ValidCountryCode.java @@ -0,0 +1,22 @@ +package com.cloudmedia.discovery.api.validation; + +import jakarta.validation.Constraint; +import jakarta.validation.Payload; +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Documented +@Constraint(validatedBy = CountryCodeValidator.class) +@Target({ElementType.PARAMETER, ElementType.FIELD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface ValidCountryCode { + + String message() default "must be an ISO 3166-1 alpha-2 uppercase code"; + + Class[] groups() default {}; + + Class[] payload() default {}; +} diff --git a/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/discovery/HomeFeedService.java b/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/discovery/HomeFeedService.java index c2cf574..4eb5516 100644 --- a/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/discovery/HomeFeedService.java +++ b/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/discovery/HomeFeedService.java @@ -1,10 +1,16 @@ package com.cloudmedia.discovery.discovery; +import com.cloudmedia.discovery.error.ApiException; +import com.cloudmedia.discovery.policy.PolicyEvaluationClient; +import com.cloudmedia.discovery.policy.PolicyDecision; +import com.cloudmedia.discovery.policy.PolicyEvaluationException; import com.cloudmedia.discovery.search.SearchIndexReader; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import org.springframework.http.HttpStatus; import org.springframework.stereotype.Service; @Service @@ -18,14 +24,22 @@ public class HomeFeedService { private final SearchIndexReader searchIndexReader; - public HomeFeedService(SearchIndexReader searchIndexReader) { + private final PolicyEvaluationClient policyEvaluationClient; + + public HomeFeedService(SearchIndexReader searchIndexReader, PolicyEvaluationClient policyEvaluationClient) { this.searchIndexReader = searchIndexReader; + this.policyEvaluationClient = policyEvaluationClient; } - public HomeFeedResponse homeFeed(String userId, Integer size) { + public HomeFeedResponse homeFeed(String userId, Integer size, String countryCode, Boolean ageVerified) { int resolvedSize = size == null ? DEFAULT_SIZE : Math.min(Math.max(size, MIN_SIZE), MAX_SIZE); HomeFeedCandidates candidates = searchIndexReader.homeFeed(userId, resolvedSize); - return new HomeFeedResponse(blend(candidates, resolvedSize), resolvedSize); + HomeFeedCandidates policyFilteredCandidates = new HomeFeedCandidates( + filterByPolicy(candidates.followed(), countryCode, ageVerified), + filterByPolicy(candidates.trending(), countryCode, ageVerified), + filterByPolicy(candidates.fresh(), countryCode, ageVerified), + filterByPolicy(candidates.similar(), countryCode, ageVerified)); + return new HomeFeedResponse(blend(policyFilteredCandidates, resolvedSize), resolvedSize); } private List blend(HomeFeedCandidates candidates, int size) { @@ -74,4 +88,20 @@ private void fillRemaining(Map deduped, HomeFeedCandidates private int slotsFor(int size, double ratio) { return Math.max(1, (int) Math.floor(size * ratio)); } + + private List filterByPolicy(List items, String countryCode, Boolean ageVerified) { + try { + List distinctContentIds = items.stream().map(HomeFeedItem::contentId).filter(Objects::nonNull) + .distinct().toList(); + Map decisions = policyEvaluationClient.evaluateBatch(distinctContentIds, + countryCode, ageVerified); + return items.stream().filter(item -> { + PolicyDecision decision = decisions.get(item.contentId()); + return decision != null && decision.allowed(); + }).toList(); + } catch (PolicyEvaluationException exception) { + throw new ApiException(HttpStatus.SERVICE_UNAVAILABLE, "POLICY_SERVICE_UNAVAILABLE", + "Policy evaluation is temporarily unavailable", null); + } + } } diff --git a/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/policy/HttpPolicyEvaluationClient.java b/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/policy/HttpPolicyEvaluationClient.java index 0dd5b04..19e98e6 100644 --- a/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/policy/HttpPolicyEvaluationClient.java +++ b/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/policy/HttpPolicyEvaluationClient.java @@ -2,11 +2,20 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import jakarta.annotation.Nullable; +import jakarta.annotation.PreDestroy; import java.time.Duration; +import java.util.Collection; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; import org.springframework.http.client.SimpleClientHttpRequestFactory; +import org.springframework.stereotype.Component; import org.springframework.web.client.RestClient; import org.springframework.web.client.RestClientException; @@ -15,12 +24,16 @@ public class HttpPolicyEvaluationClient implements PolicyEvaluationClient { private final RestClient restClient; + private final ExecutorService batchExecutor; + public HttpPolicyEvaluationClient(RestClient.Builder restClientBuilder, @Value("${policy.service.base-url:http://localhost:8084}") String baseUrl) { SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory(); requestFactory.setConnectTimeout((int) Duration.ofSeconds(2).toMillis()); requestFactory.setReadTimeout((int) Duration.ofSeconds(3).toMillis()); this.restClient = restClientBuilder.baseUrl(baseUrl).requestFactory(requestFactory).build(); + this.batchExecutor = Executors + .newFixedThreadPool(Math.max(2, Math.min(8, Runtime.getRuntime().availableProcessors()))); } @Override @@ -38,6 +51,36 @@ public PolicyDecision evaluate(String contentId, String countryCode, Boolean age } } + @Override + public Map evaluateBatch(Collection contentIds, String countryCode, + Boolean ageVerified) { + List uniqueIds = contentIds.stream().filter(Objects::nonNull).distinct().toList(); + Map> futures = new LinkedHashMap<>(); + for (String contentId : uniqueIds) { + futures.put(contentId, + CompletableFuture.supplyAsync(() -> evaluate(contentId, countryCode, ageVerified), batchExecutor)); + } + + Map decisions = new LinkedHashMap<>(); + for (Map.Entry> entry : futures.entrySet()) { + try { + decisions.put(entry.getKey(), entry.getValue().join()); + } catch (CompletionException exception) { + Throwable cause = exception.getCause(); + if (cause instanceof PolicyEvaluationException policyEvaluationException) { + throw policyEvaluationException; + } + throw new PolicyEvaluationException("Policy service batch evaluation failed", cause); + } + } + return decisions; + } + + @PreDestroy + void shutdownExecutor() { + batchExecutor.shutdownNow(); + } + private record PolicyEvaluateRequest(@Nullable String countryCode, @Nullable Boolean ageVerified) { } diff --git a/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/policy/PolicyEvaluationClient.java b/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/policy/PolicyEvaluationClient.java index 4d1a5c2..720fee0 100644 --- a/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/policy/PolicyEvaluationClient.java +++ b/services/java/discovery-service/src/main/java/com/cloudmedia/discovery/policy/PolicyEvaluationClient.java @@ -1,6 +1,21 @@ package com.cloudmedia.discovery.policy; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.Map; + public interface PolicyEvaluationClient { PolicyDecision evaluate(String contentId, String countryCode, Boolean ageVerified); + + default Map evaluateBatch(Collection contentIds, String countryCode, + Boolean ageVerified) { + Map decisions = new LinkedHashMap<>(); + for (String contentId : contentIds) { + if (contentId != null && !decisions.containsKey(contentId)) { + decisions.put(contentId, evaluate(contentId, countryCode, ageVerified)); + } + } + return decisions; + } } diff --git a/services/java/discovery-service/src/test/java/com/cloudmedia/discovery/api/discovery/DiscoveryControllerTest.java b/services/java/discovery-service/src/test/java/com/cloudmedia/discovery/api/discovery/DiscoveryControllerTest.java index 9d62870..69103a8 100644 --- a/services/java/discovery-service/src/test/java/com/cloudmedia/discovery/api/discovery/DiscoveryControllerTest.java +++ b/services/java/discovery-service/src/test/java/com/cloudmedia/discovery/api/discovery/DiscoveryControllerTest.java @@ -1,5 +1,6 @@ package com.cloudmedia.discovery.api.discovery; +import com.cloudmedia.discovery.error.ApiException; import com.cloudmedia.discovery.discovery.FeedSourceBucket; import com.cloudmedia.discovery.discovery.HomeFeedCandidates; import com.cloudmedia.discovery.discovery.HomeFeedItem; @@ -17,6 +18,7 @@ import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Primary; +import org.springframework.http.HttpStatus; import org.springframework.test.web.servlet.MockMvc; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; @@ -45,6 +47,19 @@ void homeValidatesSizeLimit() throws Exception { .andExpect(jsonPath("$.error.code").value("VALIDATION_ERROR")); } + @Test + void homeValidatesCountryCodeFormat() throws Exception { + mockMvc.perform(get("/v1/discovery/home").param("countryCode", "usa")).andExpect(status().isBadRequest()) + .andExpect(jsonPath("$.error.code").value("VALIDATION_ERROR")); + } + + @Test + void homeReturnsServiceUnavailableWhenPolicyEvaluationFails() throws Exception { + mockMvc.perform(get("/v1/discovery/home").param("userId", "fail-policy")) + .andExpect(status().isServiceUnavailable()) + .andExpect(jsonPath("$.error.code").value("POLICY_SERVICE_UNAVAILABLE")); + } + @TestConfiguration static class HomeFeedTestConfiguration { @@ -66,9 +81,14 @@ public AutocompleteResponse autocomplete(String query, int size) { public HomeFeedCandidates homeFeed(String userId, int size) { return HomeFeedCandidates.empty(); } - }) { + }, (contentId, countryCode, ageVerified) -> new com.cloudmedia.discovery.policy.PolicyDecision(true, + List.of())) { @Override - public HomeFeedResponse homeFeed(String userId, Integer size) { + public HomeFeedResponse homeFeed(String userId, Integer size, String countryCode, Boolean ageVerified) { + if ("fail-policy".equals(userId)) { + throw new ApiException(HttpStatus.SERVICE_UNAVAILABLE, "POLICY_SERVICE_UNAVAILABLE", + "Policy evaluation is temporarily unavailable", null); + } return new HomeFeedResponse(List.of(new HomeFeedItem("cnt_1", "chn_1", "Title", "Description", "VIDEO", "PUBLIC", Instant.parse("2026-03-14T12:00:00Z"), FeedSourceBucket.TRENDING)), 2); } diff --git a/services/java/discovery-service/src/test/java/com/cloudmedia/discovery/discovery/HomeFeedServiceTest.java b/services/java/discovery-service/src/test/java/com/cloudmedia/discovery/discovery/HomeFeedServiceTest.java index 19b111a..99f35be 100644 --- a/services/java/discovery-service/src/test/java/com/cloudmedia/discovery/discovery/HomeFeedServiceTest.java +++ b/services/java/discovery-service/src/test/java/com/cloudmedia/discovery/discovery/HomeFeedServiceTest.java @@ -1,6 +1,8 @@ package com.cloudmedia.discovery.discovery; import com.cloudmedia.discovery.search.AutocompleteResponse; +import com.cloudmedia.discovery.policy.PolicyDecision; +import com.cloudmedia.discovery.policy.PolicyEvaluationClient; import com.cloudmedia.discovery.search.SearchIndexReader; import com.cloudmedia.discovery.search.SearchResponse; import java.time.Instant; @@ -14,11 +16,15 @@ class HomeFeedServiceTest { @Test void homeFeedUsesDefaultSizeAndBlendsDedupedItems() { RecordingSearchIndexReader reader = new RecordingSearchIndexReader(); - HomeFeedService service = new HomeFeedService(reader); + RecordingPolicyEvaluationClient policyClient = new RecordingPolicyEvaluationClient(); + HomeFeedService service = new HomeFeedService(reader, policyClient); - HomeFeedResponse response = service.homeFeed(null, null); + HomeFeedResponse response = service.homeFeed(null, null, "US", true); assertEquals(20, reader.size); + assertEquals(List.of("follow-1", "trend-1", "trend-1", "similar-1"), policyClient.recordedContentIds); + assertEquals(List.of("US", "US", "US", "US"), policyClient.recordedCountryCodes); + assertEquals(List.of(Boolean.TRUE, Boolean.TRUE, Boolean.TRUE, Boolean.TRUE), policyClient.recordedAgeVerified); assertEquals(3, response.items().size()); assertEquals(FeedSourceBucket.FOLLOWED, response.items().get(0).sourceBucket()); assertEquals(FeedSourceBucket.TRENDING, response.items().get(1).sourceBucket()); @@ -27,9 +33,9 @@ void homeFeedUsesDefaultSizeAndBlendsDedupedItems() { @Test void homeFeedClampsSizeAndAvoidsDuplicateContentIds() { RecordingSearchIndexReader reader = new RecordingSearchIndexReader(); - HomeFeedService service = new HomeFeedService(reader); + HomeFeedService service = new HomeFeedService(reader, new RecordingPolicyEvaluationClient()); - HomeFeedResponse response = service.homeFeed("user-1", 2); + HomeFeedResponse response = service.homeFeed("user-1", 2, null, null); assertEquals("user-1", reader.userId); assertEquals(2, response.size()); @@ -37,6 +43,18 @@ void homeFeedClampsSizeAndAvoidsDuplicateContentIds() { assertEquals(List.of("follow-1", "trend-1"), response.items().stream().map(HomeFeedItem::contentId).toList()); } + @Test + void homeFeedFiltersPolicyBlockedItems() { + RecordingSearchIndexReader reader = new RecordingSearchIndexReader(); + RecordingPolicyEvaluationClient policyClient = new RecordingPolicyEvaluationClient(); + policyClient.blockedContentIds = List.of("trend-1"); + HomeFeedService service = new HomeFeedService(reader, policyClient); + + HomeFeedResponse response = service.homeFeed("user-1", 3, "DE", true); + + assertEquals(List.of("follow-1", "similar-1"), response.items().stream().map(HomeFeedItem::contentId).toList()); + } + static class RecordingSearchIndexReader implements SearchIndexReader { private String userId; @@ -70,4 +88,24 @@ private HomeFeedItem item(String contentId, FeedSourceBucket bucket) { Instant.parse("2026-03-14T12:00:00Z"), bucket); } } + + static class RecordingPolicyEvaluationClient implements PolicyEvaluationClient { + + private final List recordedContentIds = new java.util.ArrayList<>(); + + private final List recordedCountryCodes = new java.util.ArrayList<>(); + + private final List recordedAgeVerified = new java.util.ArrayList<>(); + + private List blockedContentIds = List.of(); + + @Override + public PolicyDecision evaluate(String contentId, String countryCode, Boolean ageVerified) { + recordedContentIds.add(contentId); + recordedCountryCodes.add(countryCode); + recordedAgeVerified.add(ageVerified); + boolean allowed = !blockedContentIds.contains(contentId); + return new PolicyDecision(allowed, allowed ? List.of() : List.of("CONTENT_BLOCKED")); + } + } }