From 04aa54f6fae7c65ea268ec2547c1767875549535 Mon Sep 17 00:00:00 2001 From: ansh-meesho Date: Mon, 16 Feb 2026 15:12:32 +0530 Subject: [PATCH 1/3] Fixed the consumer flow for skye --- quick-start/docker-compose.yml | 35 +++++++++++++++++++----- quick-start/stop.sh | 20 +++++++++++--- quick-start/test-skye.sh | 49 ++++++++++++++++++++++++++-------- 3 files changed, 83 insertions(+), 21 deletions(-) diff --git a/quick-start/docker-compose.yml b/quick-start/docker-compose.yml index 6bff1db4..799fce7e 100644 --- a/quick-start/docker-compose.yml +++ b/quick-start/docker-compose.yml @@ -461,7 +461,7 @@ services: # Variants list - VARIANTS_LIST=ads_gold,ads_mall,ads_new_catalog,ads,ct_gst,ct_high_asp,ct_non_gst,organic_gold,organic,organic_gst,organic_high_asp,organic_mall,organic_melp,organic_non_gst,widget_ads,organic_gst_pd # MQ configuration - - MQ_ID_TOPICS_MAPPING=2450:skye.embeddings-1 + - MQ_ID_TOPICS_MAPPING=2450:skye.embedding # Horizon to Skye ScyllaDB configuration - HORIZON_TO_SKYE_SCYLLA_CONF_ID_MAP=2:1 - SCYLLA_2_CONTACT_POINTS=scylla @@ -651,9 +651,14 @@ services: onfs-network: ipv4_address: 172.18.0.20 depends_on: - - etcd - - kafka - - qdrant + etcd: + condition: service_healthy + qdrant: + condition: service_healthy + kafka: + condition: service_healthy + kafka-init: + condition: service_completed_successfully restart: unless-stopped skye-admin-healthcheck: @@ -714,18 +719,25 @@ services: # Etcd - ETCD_SERVER=etcd:2379 - ETCD_WATCHER_ENABLED=true + # Admin HTTP client (used by embedding consumer for e.g. trigger indexing) + - ADMIN_HOST=skye-admin + - ADMIN_PORT=8092 + - ADMIN_TIMEOUT_IN_MS=30000 + - ADMIN_MAX_IDLE_CONNS=100 + - ADMIN_MAX_IDLE_CONNS_PER_HOST=100 + - ADMIN_IDLE_CONN_TIMEOUT_IN_MS=30000 # Embedding consumer (ID=2) - EMBEDDING_CONSUMER_KAFKA_IDS=2 - KAFKA_2_TOPICS=skye.embedding - KAFKA_2_BOOTSTRAP_SERVERS=broker:29092 - KAFKA_2_BASIC_AUTH_CREDENTIAL_SOURCE=NONE - - KAFKA_2_GROUP_ID=skye-embedding-consumer + - KAFKA_2_GROUP_ID=skye-embedding-consumer-v2 - KAFKA_2_AUTO_OFFSET_RESET=earliest - KAFKA_2_AUTO_COMMIT_INTERVAL_MS=5000 - KAFKA_2_ENABLE_AUTO_COMMIT=false - KAFKA_2_LISTENER_CONCURRENCY=1 - - KAFKA_2_CLIENT_ID=skye-embedding-consumer - - KAFKA_2_BATCH_SIZE=10 + - KAFKA_2_CLIENT_ID=skye-embedding-consumer-v2 + - KAFKA_2_BATCH_SIZE=1 - KAFKA_2_POLL_TIMEOUT=1000 # Embedding sequence consumer (ID=3) - EMBEDDING_CONSUMER_SEQUENCE_KAFKA_IDS=3 @@ -776,6 +788,11 @@ services: - KAFKA_7_CLIENT_ID=skye-realtime-delta-consumer - KAFKA_7_BATCH_SIZE=10 - KAFKA_7_POLL_TIMEOUT=1000 + # Aggregator DB (Scylla) - required for embedding consumer when entity store uses conf_id=1 + - STORAGE_AGGREGATOR_DB_COUNT=1 + - STORAGE_AGGREGATOR_DB_1_CONTACT_POINTS=scylla + - STORAGE_AGGREGATOR_DB_1_PORT=9042 + - STORAGE_AGGREGATOR_DB_1_KEYSPACE=skye networks: onfs-network: ipv4_address: 172.18.0.23 @@ -788,6 +805,10 @@ services: condition: service_healthy kafka-init: condition: service_completed_successfully + skye-admin: + condition: service_started + scylla: + condition: service_healthy restart: unless-stopped skye-consumers-healthcheck: diff --git a/quick-start/stop.sh b/quick-start/stop.sh index 2e8dac69..300dbf86 100755 --- a/quick-start/stop.sh +++ b/quick-start/stop.sh @@ -48,11 +48,24 @@ remove_containers() { echo "โœ… Containers removed" } +clean_skye_etcd() { + echo "๐Ÿงน Clearing Skye config from etcd..." + if docker ps -q -f name=^etcd$ | grep -q . 2>/dev/null && docker network ls -q | grep -q "onfs-network" 2>/dev/null; then + if docker run --rm --network onfs-network quay.io/coreos/etcd:v3.5.12 etcdctl --endpoints=http://etcd:2379 del --prefix "/config/skye" 2>/dev/null; then + echo "โœ… Skye etcd keys removed" + else + echo "โš ๏ธ Skye etcd cleanup skipped (etcd not reachable or no keys)" + fi + else + echo "โš ๏ธ etcd not running or network missing, skipping Skye etcd cleanup" + fi +} + remove_volumes() { echo "๐Ÿ’พ Removing persistent volumes..." - # Remove named volumes - VOLUMES=("scylla-data" "mysql-data" "kafka-data") + # Remove named volumes (includes Skye: mysql Horizon/Skye tables, scylla skye keyspace, qdrant) + VOLUMES=("scylla-data" "mysql-data" "kafka-data" "qdrant-data") for volume in "${VOLUMES[@]}"; do if docker volume ls -q | grep -q "^${volume}$"; then echo "๐Ÿ—‘๏ธ Removing volume: $volume" @@ -120,7 +133,7 @@ show_status() { fi # Check for volumes - EXISTING_VOLUMES=$(docker volume ls -q | grep -E "scylla-data|mysql-data|kafka-data" 2>/dev/null || true) + EXISTING_VOLUMES=$(docker volume ls -q | grep -E "scylla-data|mysql-data|kafka-data|qdrant-data" 2>/dev/null || true) if [ -n "$EXISTING_VOLUMES" ]; then echo "๐Ÿ’พ Existing volumes:" echo "$EXISTING_VOLUMES" | sed 's/^/ โ€ข /' @@ -144,6 +157,7 @@ if [ "$PURGE_FLAG" = "--purge" ]; then echo "โณ Starting in 3 seconds... (Ctrl+C to cancel)" sleep 3 + clean_skye_etcd stop_services remove_containers remove_images diff --git a/quick-start/test-skye.sh b/quick-start/test-skye.sh index 6190370c..9c5fe0ac 100755 --- a/quick-start/test-skye.sh +++ b/quick-start/test-skye.sh @@ -3,16 +3,21 @@ # Skye End-to-End Test Script # ============================================================================= # Tests the full Skye flow: -# 1. Health checks on all Skye services +# 1. Health checks on all Skye services (admin, serving, qdrant, consumers) # 2. Register store, frequency, entity, model, variant via skye-admin # 3. Create Qdrant collection directly # 4. Insert test vectors into Qdrant -# 5. Query similar candidates via skye-serving (gRPC) +# 5. Send 3 embedding events to Kafka (skye-consumers consume them) +# 6. Verify Qdrant search +# 7. Query similar candidates via skye-serving (gRPC) # ============================================================================= ADMIN_URL="http://localhost:8092" SERVING_URL="localhost:8094" QDRANT_URL="http://localhost:6333" +CONSUMERS_URL="http://localhost:8093" +BROKER_CONTAINER="${BROKER_CONTAINER:-broker}" +SKYE_EMBEDDING_TOPIC="${SKYE_EMBEDDING_TOPIC:-skye.embedding}" GREEN='\033[0;32m' RED='\033[0;31m' @@ -52,6 +57,7 @@ info "Checking service health..." curl -sf "${ADMIN_URL}/health" > /dev/null && pass "skye-admin is healthy" || fail "skye-admin is not reachable at ${ADMIN_URL}" curl -sf "http://${SERVING_URL}/health/self" > /dev/null && pass "skye-serving is healthy" || fail "skye-serving is not reachable at ${SERVING_URL}" curl -sf "${QDRANT_URL}/healthz" > /dev/null && pass "Qdrant is healthy" || fail "Qdrant is not reachable at ${QDRANT_URL}" +curl -sf "${CONSUMERS_URL}/health" > /dev/null && pass "skye-consumers is healthy" || fail "skye-consumers is not reachable at ${CONSUMERS_URL}" echo "" @@ -94,7 +100,7 @@ admin_post "Register model" "/api/v1/model/register-model" '{ }, "model_type": "RESET", "kafka_id": 0, - "training_data_path": "", + "training_data_path": "gs://test", "metadata": { "entity": "test-products", "key-type": "product_id" @@ -114,8 +120,8 @@ admin_post "Register variant" "/api/v1/model/register-variant" '{ "variant": "v1", "vector_db_type": "QDRANT", "vector_db_config": { - "read_host": "qdrant:6334", - "write_host": "qdrant:6334", + "read_host": "172.18.0.3", + "write_host": "172.18.0.3", "port": "6334", "http2config": { "deadline": 5000, @@ -182,7 +188,31 @@ echo " Response: ${RESP}" pass "Test vectors inserted (upserted)" # --------------------------------------------------------------------------- -# Step 8: Verify vectors via Qdrant search (sanity check) +# Step 8: Send 3 embedding events to Kafka (skye-consumers will consume them) +# --------------------------------------------------------------------------- +info "Sending 3 embedding events to topic ${SKYE_EMBEDDING_TOPIC}..." +# Minimal valid event JSON (one line each) for entity=test-products, model=product-embeddings, vector_dim=4 +EVT1='{"candidate_id":"1","entity":"test-products","model_name":"product-embeddings","environment":"local","embedding_store_version":1,"partition":"","index_space":{"embedding":[0.1,0.2,0.3,0.4],"variants_version_map":{"v1":1},"variants_index_map":{"v1":true},"operation":"ADD","payload":{"portfolio_id":"0"}},"search_space":{"embedding":[0.1,0.2,0.3,0.4]}}' +EVT2='{"candidate_id":"2","entity":"test-products","model_name":"product-embeddings","environment":"local","embedding_store_version":1,"partition":"","index_space":{"embedding":[0.2,0.3,0.4,0.5],"variants_version_map":{"v1":1},"variants_index_map":{"v1":true},"operation":"ADD","payload":{"portfolio_id":"0"}},"search_space":{"embedding":[0.2,0.3,0.4,0.5]}}' +EVT3='{"candidate_id":"3","entity":"test-products","model_name":"product-embeddings","environment":"local","embedding_store_version":1,"partition":"","index_space":{"embedding":[0.9,0.8,0.7,0.6],"variants_version_map":{"v1":1},"variants_index_map":{"v1":true},"operation":"ADD","payload":{"portfolio_id":"0"}},"search_space":{"embedding":[0.9,0.8,0.7,0.6]}}' + +if docker exec -i "${BROKER_CONTAINER}" /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server broker:29092 --topic "${SKYE_EMBEDDING_TOPIC}" 2>/dev/null < /dev/null; then RESP=$(grpcurl -plaintext \ -H "skye-caller-id: test-script" \ @@ -217,9 +246,7 @@ if command -v grpcurl &> /dev/null; then echo " Response: ${RESP}" pass "gRPC query completed" else - echo -e "${YELLOW} โš ๏ธ grpcurl not installed. Install it to test gRPC:${NC}" - echo " brew install grpcurl" - echo "" + warn "grpcurl not installed. Install it to test gRPC: brew install grpcurl" echo " Then run manually:" echo ' grpcurl -plaintext -H "skye-caller-id: test" -H "skye-auth-token: test" -d '"'"'{"entity":"test-products","modelName":"product-embeddings","variant":"v1","limit":3,"embeddings":[{"embedding":[0.1,0.2,0.3,0.4]}]}'"'"' localhost:8094 SkyeSimilarCandidateService/getSimilarCandidates' fi From 6f7e7285897ab5897442c7176812906c64300560 Mon Sep 17 00:00:00 2001 From: ansh-meesho Date: Mon, 16 Feb 2026 15:12:49 +0530 Subject: [PATCH 2/3] Fixed the consumer flow for skye --- horizon/internal/skye/handler/skye.go | 2 +- skye/internal/consumers/listener/embedding.go | 1 + .../consumers/listener/embedding/embedding.go | 19 +++++++++++-- skye/pkg/kafka/kafka.go | 27 ++++++++++++++++--- 4 files changed, 42 insertions(+), 7 deletions(-) diff --git a/horizon/internal/skye/handler/skye.go b/horizon/internal/skye/handler/skye.go index 6f1a244a..572daac6 100644 --- a/horizon/internal/skye/handler/skye.go +++ b/horizon/internal/skye/handler/skye.go @@ -469,7 +469,7 @@ func (s *skyeConfig) RegisterVariant(request VariantRegisterRequest) (RequestSta return fmt.Errorf("model with name '%s' does not exist for entity '%s'", variantPayload.Model, variantPayload.Entity) } - if models.Models[variantPayload.Model].ModelType != enums.ModelType(enums.DELTA) && variantPayload.OTDTrainingDataPath == "" { + if models.Models[variantPayload.Model].ModelType == enums.ModelType(enums.DELTA) && variantPayload.OTDTrainingDataPath == "" { return fmt.Errorf("otd_training_data_path is required for DELTA model type") } diff --git a/skye/internal/consumers/listener/embedding.go b/skye/internal/consumers/listener/embedding.go index 6f5bbee6..8f8edf0d 100644 --- a/skye/internal/consumers/listener/embedding.go +++ b/skye/internal/consumers/listener/embedding.go @@ -28,6 +28,7 @@ func ProcessEmbeddingEvents(record []skafka.ConsumerRecord[string, []byte], c *k "environment", event.Environment}) events = append(events, event) } + log.Info().Msgf("Processing %d embedding events", len(events)) err := embeddingConsumer.Process(events) if err != nil { diff --git a/skye/internal/consumers/listener/embedding/embedding.go b/skye/internal/consumers/listener/embedding/embedding.go index 4dacab7c..068723e9 100644 --- a/skye/internal/consumers/listener/embedding/embedding.go +++ b/skye/internal/consumers/listener/embedding/embedding.go @@ -65,6 +65,10 @@ func (e *EmbeddingConsumer) produceFailureEvents(failedEvents []Event) { continue } failureProducerKafkaId := modelConf.FailureProducerKafkaId + if failureProducerKafkaId == 0 { + log.Debug().Msg("Skipping failure topic produce (failure_producer_kafka_id=0)") + continue + } skafka.InitProducer(failureProducerKafkaId) // idempotent โ€” ensures producer exists for this dynamic ID jsonBytes, err := json.Marshal(failedEvent) if err != nil { @@ -301,15 +305,23 @@ func (e *EmbeddingConsumer) shouldIndex(event Event, variant string, aggregatorD log.Error().Msgf("Error getting variant config for entity %s, model %s, variant %s: %v", event.Entity, event.Model, variant, err) return false, err } + if variantConfig == nil { + return false, fmt.Errorf("variant config is nil for entity %s model %s variant %s", event.Entity, event.Model, variant) + } aggregatorFilters := variantConfig.Filter if aggregatorFilters == nil { return true, nil } + if aggregatorData == nil { + aggregatorData = make(map[string]interface{}) + } for _, criteria := range aggregatorFilters { for _, filter := range criteria { filterData := filter.DefaultValue - if dataValue, exists := aggregatorData[filter.ColumnName]; exists { - filterData = dataValue.(string) + if dataValue, exists := aggregatorData[filter.ColumnName]; exists && dataValue != nil { + if s, ok := dataValue.(string); ok { + filterData = s + } } if filterData != filter.FilterValue { return false, nil @@ -325,6 +337,9 @@ func (e *EmbeddingConsumer) preparePayloadIndexMap(event Event, rtColumns map[st log.Error().Msgf("Error getting variant config for entity %s, model %s, variant %s: %v", event.Entity, event.Model, variant, err) return nil, err } + if variantConfig == nil || variantConfig.VectorDbConfig.Payload == nil { + return make(map[string]interface{}), nil + } variantPayload := variantConfig.VectorDbConfig.Payload payloadIndexMap := make(map[string]interface{}) for key, variantPayloadValue := range variantPayload { diff --git a/skye/pkg/kafka/kafka.go b/skye/pkg/kafka/kafka.go index 4da868d9..465979a0 100644 --- a/skye/pkg/kafka/kafka.go +++ b/skye/pkg/kafka/kafka.go @@ -28,6 +28,18 @@ const ( clientId = "client.id" ) +// splitAndTrimTopics splits a comma-separated topic list and trims spaces (e.g. "a, b" -> ["a", "b"]). +func splitAndTrimTopics(topicsStr string) []string { + parts := strings.Split(topicsStr, ",") + out := make([]string, 0, len(parts)) + for _, p := range parts { + if t := strings.TrimSpace(p); t != "" { + out = append(out, t) + } + } + return out +} + // BatchHandler processes a batch of raw Kafka messages. // Return nil on success (processBatch will commit); return error to trigger seek-back. type BatchHandler func(msgs []*kafka.Message, c *kafka.Consumer) error @@ -53,6 +65,8 @@ func StartConsumers(kafkaIds string, consumerName string, handler BatchHandler) log.Error().Err(err).Msgf("Failed to build kafka config for %s (kafkaId=%s)", consumerName, kafkaId) continue } + log.Info().Str("topic", cfg.Topics).Str("bootstrap", cfg.BootstrapURLs).Str("group", cfg.GroupID). + Msgf("Starting %s consumer kafkaId=%s (subscribe to topic)", consumerName, kafkaId) kl := NewKafkaListener(cfg, handler) kl.Init() kl.Consume() @@ -95,9 +109,13 @@ func (k *KafkaListener) Init() { if err != nil { log.Panic().Err(err).Msg("Failed to create Kafka consumer.") } - err = consumer.SubscribeTopics([]string{k.kafkaConfig.Topics}, nil) + topics := splitAndTrimTopics(k.kafkaConfig.Topics) + if len(topics) == 0 { + topics = []string{strings.TrimSpace(k.kafkaConfig.Topics)} + } + err = consumer.SubscribeTopics(topics, nil) if err != nil { - log.Panic().Err(err).Msgf("Failed to subscribe to topic %s", k.kafkaConfig.Topics) + log.Panic().Err(err).Msgf("Failed to subscribe to topics %v", topics) } k.consumers = append(k.consumers, consumer) } @@ -147,7 +165,7 @@ func (k *KafkaListener) Consume() { case <-flushTimer.C: if msgCount > 0 { - log.Debug().Msgf("Processing %d messages due to timeout", msgCount) + log.Info().Int("msgCount", msgCount).Msg("Flushing batch due to timeout") k.processBatch(consumer, messages) msgCount = 0 messages = messages[:0] @@ -165,12 +183,13 @@ func (k *KafkaListener) Consume() { "group:" + k.kafkaConfig.GroupID, "client:" + k.kafkaConfig.ClientID, }) + log.Info().Str("topic", *e.TopicPartition.Topic).Int32("partition", e.TopicPartition.Partition).Msg("Kafka message received") messages = append(messages, e) msgCount++ if msgCount == k.kafkaConfig.BatchSize { - log.Debug().Msgf("Processing batch of %d messages", msgCount) + log.Info().Int("msgCount", msgCount).Msg("Processing batch (batch full)") k.processBatch(consumer, messages) msgCount = 0 messages = messages[:0] From 87df7848ccd483450ceec6fc3364f715202d1ae8 Mon Sep 17 00:00:00 2001 From: ansh-meesho Date: Mon, 16 Feb 2026 21:09:31 +0530 Subject: [PATCH 3/3] Bumped the version --- horizon/VERSION | 2 +- skye/VERSION | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/horizon/VERSION b/horizon/VERSION index 18fa8e74..0d0c52f8 100644 --- a/horizon/VERSION +++ b/horizon/VERSION @@ -1 +1 @@ -v1.3.0 +v1.4.0 diff --git a/skye/VERSION b/skye/VERSION index 60453e69..992977ad 100644 --- a/skye/VERSION +++ b/skye/VERSION @@ -1 +1 @@ -v1.0.0 \ No newline at end of file +v1.1.0 \ No newline at end of file