Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/contracts/rest-api-v1.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,10 @@ This document defines the MVP API contract groups, standards, and core request/r

### `GET /v1/discovery/home`
- Balanced feed (followed + trending + fresh + similar)
- MVP params: optional `userId`; optional integer `size` with min `1`, default `20`, max `50`
- MVP params: optional `userId`; optional integer `size` with min `1`, default `20`, max `50`; optional `countryCode` (ISO 3166-1 alpha-2 uppercase, e.g. `US`); optional `ageVerified`
- Returns a generic blended feed when `userId` is absent
- Filters out policy-blocked items from the blended feed
- Returns `503 POLICY_SERVICE_UNAVAILABLE` when policy evaluation fails

Comment thread
coderabbitai[bot] marked this conversation as resolved.
### `GET /v1/discovery/trending`
- Region-level trending feed
Expand Down Expand Up @@ -186,3 +188,4 @@ This document defines the MVP API contract groups, standards, and core request/r
- `409`: conflict/idempotency collision
- `429`: rate limit exceeded
- `500`: internal error
- `503`: dependency unavailable (e.g. `POLICY_SERVICE_UNAVAILABLE` on `GET /v1/discovery/home`)
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.cloudmedia.discovery.api.response.ApiMeta;
import com.cloudmedia.discovery.api.response.ApiSuccessResponse;
import com.cloudmedia.discovery.api.validation.ValidCountryCode;
import com.cloudmedia.discovery.discovery.HomeFeedResponse;
import com.cloudmedia.discovery.discovery.HomeFeedService;
import jakarta.validation.constraints.Max;
Expand Down Expand Up @@ -31,9 +32,11 @@ public DiscoveryController(HomeFeedService homeFeedService) {
public ResponseEntity<ApiSuccessResponse<HomeFeedResponse>> home(
@RequestParam(value = "userId", required = false) String userId,
@RequestParam(value = "size", required = false) @Min(1) @Max(50) Integer size,
@RequestParam(value = "countryCode", required = false) @ValidCountryCode String countryCode,
@RequestParam(value = "ageVerified", required = false) Boolean ageVerified,
@RequestHeader(value = "X-Request-Id", required = false) String requestId) {
String effectiveRequestId = requestId(requestId);
HomeFeedResponse response = homeFeedService.homeFeed(userId, size);
HomeFeedResponse response = homeFeedService.homeFeed(userId, size, countryCode, ageVerified);
return ResponseEntity.ok(new ApiSuccessResponse<>(response, new ApiMeta(effectiveRequestId, Instant.now())));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.cloudmedia.discovery.api.validation;

import jakarta.validation.ConstraintValidator;
import jakarta.validation.ConstraintValidatorContext;
import java.util.Arrays;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.regex.Pattern;

public class CountryCodeValidator implements ConstraintValidator<ValidCountryCode, String> {

private static final Set<String> ISO_COUNTRY_CODES = Arrays.stream(Locale.getISOCountries())
.collect(Collectors.toUnmodifiableSet());

private static final Pattern COUNTRY_CODE_PATTERN = Pattern.compile("^[A-Z]{2}$");

@Override
public boolean isValid(String value, ConstraintValidatorContext context) {
if (value == null) {
return true;
}
return COUNTRY_CODE_PATTERN.matcher(value).matches() && ISO_COUNTRY_CODES.contains(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.cloudmedia.discovery.api.validation;

import jakarta.validation.Constraint;
import jakarta.validation.Payload;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Documented
@Constraint(validatedBy = CountryCodeValidator.class)
@Target({ElementType.PARAMETER, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface ValidCountryCode {

String message() default "must be an ISO 3166-1 alpha-2 uppercase code";

Class<?>[] groups() default {};

Class<? extends Payload>[] payload() default {};
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package com.cloudmedia.discovery.discovery;

import com.cloudmedia.discovery.error.ApiException;
import com.cloudmedia.discovery.policy.PolicyEvaluationClient;
import com.cloudmedia.discovery.policy.PolicyDecision;
import com.cloudmedia.discovery.policy.PolicyEvaluationException;
import com.cloudmedia.discovery.search.SearchIndexReader;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;

@Service
Expand All @@ -18,14 +24,22 @@ public class HomeFeedService {

private final SearchIndexReader searchIndexReader;

public HomeFeedService(SearchIndexReader searchIndexReader) {
private final PolicyEvaluationClient policyEvaluationClient;

public HomeFeedService(SearchIndexReader searchIndexReader, PolicyEvaluationClient policyEvaluationClient) {
this.searchIndexReader = searchIndexReader;
this.policyEvaluationClient = policyEvaluationClient;
}

public HomeFeedResponse homeFeed(String userId, Integer size) {
public HomeFeedResponse homeFeed(String userId, Integer size, String countryCode, Boolean ageVerified) {
int resolvedSize = size == null ? DEFAULT_SIZE : Math.min(Math.max(size, MIN_SIZE), MAX_SIZE);
HomeFeedCandidates candidates = searchIndexReader.homeFeed(userId, resolvedSize);
return new HomeFeedResponse(blend(candidates, resolvedSize), resolvedSize);
HomeFeedCandidates policyFilteredCandidates = new HomeFeedCandidates(
filterByPolicy(candidates.followed(), countryCode, ageVerified),
filterByPolicy(candidates.trending(), countryCode, ageVerified),
filterByPolicy(candidates.fresh(), countryCode, ageVerified),
filterByPolicy(candidates.similar(), countryCode, ageVerified));
return new HomeFeedResponse(blend(policyFilteredCandidates, resolvedSize), resolvedSize);
}

private List<HomeFeedItem> blend(HomeFeedCandidates candidates, int size) {
Expand Down Expand Up @@ -74,4 +88,20 @@ private void fillRemaining(Map<String, HomeFeedItem> deduped, HomeFeedCandidates
private int slotsFor(int size, double ratio) {
return Math.max(1, (int) Math.floor(size * ratio));
}

private List<HomeFeedItem> filterByPolicy(List<HomeFeedItem> items, String countryCode, Boolean ageVerified) {
try {
List<String> distinctContentIds = items.stream().map(HomeFeedItem::contentId).filter(Objects::nonNull)
.distinct().toList();
Map<String, PolicyDecision> decisions = policyEvaluationClient.evaluateBatch(distinctContentIds,
countryCode, ageVerified);
return items.stream().filter(item -> {
PolicyDecision decision = decisions.get(item.contentId());
return decision != null && decision.allowed();
}).toList();
} catch (PolicyEvaluationException exception) {
throw new ApiException(HttpStatus.SERVICE_UNAVAILABLE, "POLICY_SERVICE_UNAVAILABLE",
"Policy evaluation is temporarily unavailable", null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,20 @@

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import jakarta.annotation.Nullable;
import jakarta.annotation.PreDestroy;
import java.time.Duration;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestClient;
import org.springframework.web.client.RestClientException;

Expand All @@ -15,12 +24,16 @@ public class HttpPolicyEvaluationClient implements PolicyEvaluationClient {

private final RestClient restClient;

private final ExecutorService batchExecutor;

public HttpPolicyEvaluationClient(RestClient.Builder restClientBuilder,
@Value("${policy.service.base-url:http://localhost:8084}") String baseUrl) {
SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
requestFactory.setConnectTimeout((int) Duration.ofSeconds(2).toMillis());
requestFactory.setReadTimeout((int) Duration.ofSeconds(3).toMillis());
this.restClient = restClientBuilder.baseUrl(baseUrl).requestFactory(requestFactory).build();
this.batchExecutor = Executors
.newFixedThreadPool(Math.max(2, Math.min(8, Runtime.getRuntime().availableProcessors())));
}

@Override
Expand All @@ -38,6 +51,36 @@ public PolicyDecision evaluate(String contentId, String countryCode, Boolean age
}
}

@Override
public Map<String, PolicyDecision> evaluateBatch(Collection<String> contentIds, String countryCode,
Boolean ageVerified) {
List<String> uniqueIds = contentIds.stream().filter(Objects::nonNull).distinct().toList();
Map<String, CompletableFuture<PolicyDecision>> futures = new LinkedHashMap<>();
for (String contentId : uniqueIds) {
futures.put(contentId,
CompletableFuture.supplyAsync(() -> evaluate(contentId, countryCode, ageVerified), batchExecutor));
}

Map<String, PolicyDecision> decisions = new LinkedHashMap<>();
for (Map.Entry<String, CompletableFuture<PolicyDecision>> entry : futures.entrySet()) {
try {
decisions.put(entry.getKey(), entry.getValue().join());
} catch (CompletionException exception) {
Throwable cause = exception.getCause();
if (cause instanceof PolicyEvaluationException policyEvaluationException) {
throw policyEvaluationException;
}
throw new PolicyEvaluationException("Policy service batch evaluation failed", cause);
}
}
return decisions;
}

@PreDestroy
void shutdownExecutor() {
batchExecutor.shutdownNow();
}

private record PolicyEvaluateRequest(@Nullable String countryCode, @Nullable Boolean ageVerified) {
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
package com.cloudmedia.discovery.policy;

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

public interface PolicyEvaluationClient {

PolicyDecision evaluate(String contentId, String countryCode, Boolean ageVerified);

default Map<String, PolicyDecision> evaluateBatch(Collection<String> contentIds, String countryCode,
Boolean ageVerified) {
Map<String, PolicyDecision> decisions = new LinkedHashMap<>();
for (String contentId : contentIds) {
if (contentId != null && !decisions.containsKey(contentId)) {
decisions.put(contentId, evaluate(contentId, countryCode, ageVerified));
}
}
return decisions;
}
Comment on lines +11 to +20
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Sequential HTTP calls may cause high latency on home feed requests.

The default implementation makes N sequential HTTP calls to the policy service (one per unique content ID). For a home feed with 30-50 items, this could add 100-300ms+ of cumulative latency under normal conditions, and much worse under load or partial service degradation.

Consider parallelizing the calls or introducing a true batch endpoint on the policy service:

♻️ Option 1: Parallel execution with CompletableFuture
default Map<String, PolicyDecision> evaluateBatch(Collection<String> contentIds, String countryCode,
        Boolean ageVerified) {
    List<String> uniqueIds = contentIds.stream()
            .filter(Objects::nonNull)
            .distinct()
            .toList();
    
    Map<String, CompletableFuture<PolicyDecision>> futures = new LinkedHashMap<>();
    for (String contentId : uniqueIds) {
        futures.put(contentId, CompletableFuture.supplyAsync(
            () -> evaluate(contentId, countryCode, ageVerified)));
    }
    
    Map<String, PolicyDecision> decisions = new LinkedHashMap<>();
    for (var entry : futures.entrySet()) {
        decisions.put(entry.getKey(), entry.getValue().join());
    }
    return decisions;
}

This requires adding import java.util.concurrent.CompletableFuture;, import java.util.List;, and import java.util.Objects;. You may also want to use a bounded executor to limit concurrent requests.

♻️ Option 2: True batch API (preferred if policy service supports it)

Override evaluateBatch in HttpPolicyEvaluationClient to call a batch endpoint (e.g., POST /v1/policy/content/evaluate-batch) that accepts multiple content IDs in a single request.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@services/java/discovery-service/src/main/java/com/cloudmedia/discovery/policy/PolicyEvaluationClient.java`
around lines 11 - 20, The default evaluateBatch implementation performs
sequential calls to evaluate for each contentId causing high cumulative latency;
update evaluateBatch (or override it in HttpPolicyEvaluationClient) to perform
calls in parallel (e.g., create uniqueIds from contentIds, launch
evaluate(contentId, countryCode, ageVerified) via CompletableFuture/supplyAsync
with a bounded Executor, collect .join() results into the LinkedHashMap) or,
preferably, implement a true batch call in HttpPolicyEvaluationClient that
invokes a policy service batch endpoint (e.g., POST
/v1/policy/content/evaluate-batch) and maps the single response to the returned
Map<String,PolicyDecision>.

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.cloudmedia.discovery.api.discovery;

import com.cloudmedia.discovery.error.ApiException;
import com.cloudmedia.discovery.discovery.FeedSourceBucket;
import com.cloudmedia.discovery.discovery.HomeFeedCandidates;
import com.cloudmedia.discovery.discovery.HomeFeedItem;
Expand All @@ -17,6 +18,7 @@
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.http.HttpStatus;
import org.springframework.test.web.servlet.MockMvc;

import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
Expand Down Expand Up @@ -45,6 +47,19 @@ void homeValidatesSizeLimit() throws Exception {
.andExpect(jsonPath("$.error.code").value("VALIDATION_ERROR"));
}

@Test
void homeValidatesCountryCodeFormat() throws Exception {
mockMvc.perform(get("/v1/discovery/home").param("countryCode", "usa")).andExpect(status().isBadRequest())
.andExpect(jsonPath("$.error.code").value("VALIDATION_ERROR"));
}

@Test
void homeReturnsServiceUnavailableWhenPolicyEvaluationFails() throws Exception {
mockMvc.perform(get("/v1/discovery/home").param("userId", "fail-policy"))
.andExpect(status().isServiceUnavailable())
.andExpect(jsonPath("$.error.code").value("POLICY_SERVICE_UNAVAILABLE"));
}

@TestConfiguration
static class HomeFeedTestConfiguration {

Expand All @@ -66,9 +81,14 @@ public AutocompleteResponse autocomplete(String query, int size) {
public HomeFeedCandidates homeFeed(String userId, int size) {
return HomeFeedCandidates.empty();
}
}) {
}, (contentId, countryCode, ageVerified) -> new com.cloudmedia.discovery.policy.PolicyDecision(true,
List.of())) {
@Override
public HomeFeedResponse homeFeed(String userId, Integer size) {
public HomeFeedResponse homeFeed(String userId, Integer size, String countryCode, Boolean ageVerified) {
if ("fail-policy".equals(userId)) {
throw new ApiException(HttpStatus.SERVICE_UNAVAILABLE, "POLICY_SERVICE_UNAVAILABLE",
"Policy evaluation is temporarily unavailable", null);
}
return new HomeFeedResponse(List.of(new HomeFeedItem("cnt_1", "chn_1", "Title", "Description",
"VIDEO", "PUBLIC", Instant.parse("2026-03-14T12:00:00Z"), FeedSourceBucket.TRENDING)), 2);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.cloudmedia.discovery.discovery;

import com.cloudmedia.discovery.search.AutocompleteResponse;
import com.cloudmedia.discovery.policy.PolicyDecision;
import com.cloudmedia.discovery.policy.PolicyEvaluationClient;
import com.cloudmedia.discovery.search.SearchIndexReader;
import com.cloudmedia.discovery.search.SearchResponse;
import java.time.Instant;
Expand All @@ -14,11 +16,15 @@ class HomeFeedServiceTest {
@Test
void homeFeedUsesDefaultSizeAndBlendsDedupedItems() {
RecordingSearchIndexReader reader = new RecordingSearchIndexReader();
HomeFeedService service = new HomeFeedService(reader);
RecordingPolicyEvaluationClient policyClient = new RecordingPolicyEvaluationClient();
HomeFeedService service = new HomeFeedService(reader, policyClient);

HomeFeedResponse response = service.homeFeed(null, null);
HomeFeedResponse response = service.homeFeed(null, null, "US", true);

assertEquals(20, reader.size);
assertEquals(List.of("follow-1", "trend-1", "trend-1", "similar-1"), policyClient.recordedContentIds);
assertEquals(List.of("US", "US", "US", "US"), policyClient.recordedCountryCodes);
assertEquals(List.of(Boolean.TRUE, Boolean.TRUE, Boolean.TRUE, Boolean.TRUE), policyClient.recordedAgeVerified);
assertEquals(3, response.items().size());
assertEquals(FeedSourceBucket.FOLLOWED, response.items().get(0).sourceBucket());
assertEquals(FeedSourceBucket.TRENDING, response.items().get(1).sourceBucket());
Expand All @@ -27,16 +33,28 @@ void homeFeedUsesDefaultSizeAndBlendsDedupedItems() {
@Test
void homeFeedClampsSizeAndAvoidsDuplicateContentIds() {
RecordingSearchIndexReader reader = new RecordingSearchIndexReader();
HomeFeedService service = new HomeFeedService(reader);
HomeFeedService service = new HomeFeedService(reader, new RecordingPolicyEvaluationClient());

HomeFeedResponse response = service.homeFeed("user-1", 2);
HomeFeedResponse response = service.homeFeed("user-1", 2, null, null);

assertEquals("user-1", reader.userId);
assertEquals(2, response.size());
assertEquals(2, response.items().size());
assertEquals(List.of("follow-1", "trend-1"), response.items().stream().map(HomeFeedItem::contentId).toList());
}

@Test
void homeFeedFiltersPolicyBlockedItems() {
RecordingSearchIndexReader reader = new RecordingSearchIndexReader();
RecordingPolicyEvaluationClient policyClient = new RecordingPolicyEvaluationClient();
policyClient.blockedContentIds = List.of("trend-1");
HomeFeedService service = new HomeFeedService(reader, policyClient);

HomeFeedResponse response = service.homeFeed("user-1", 3, "DE", true);

assertEquals(List.of("follow-1", "similar-1"), response.items().stream().map(HomeFeedItem::contentId).toList());
}

static class RecordingSearchIndexReader implements SearchIndexReader {

private String userId;
Expand Down Expand Up @@ -70,4 +88,24 @@ private HomeFeedItem item(String contentId, FeedSourceBucket bucket) {
Instant.parse("2026-03-14T12:00:00Z"), bucket);
}
}

static class RecordingPolicyEvaluationClient implements PolicyEvaluationClient {

private final List<String> recordedContentIds = new java.util.ArrayList<>();

private final List<String> recordedCountryCodes = new java.util.ArrayList<>();

private final List<Boolean> recordedAgeVerified = new java.util.ArrayList<>();

private List<String> blockedContentIds = List.of();

@Override
public PolicyDecision evaluate(String contentId, String countryCode, Boolean ageVerified) {
recordedContentIds.add(contentId);
recordedCountryCodes.add(countryCode);
recordedAgeVerified.add(ageVerified);
boolean allowed = !blockedContentIds.contains(contentId);
return new PolicyDecision(allowed, allowed ? List.of() : List.of("CONTENT_BLOCKED"));
}
}
}
Loading