Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion horizon/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v1.3.0
v1.4.0
2 changes: 1 addition & 1 deletion horizon/internal/skye/handler/skye.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func (s *skyeConfig) RegisterVariant(request VariantRegisterRequest) (RequestSta
return fmt.Errorf("model with name '%s' does not exist for entity '%s'", variantPayload.Model, variantPayload.Entity)
}

if models.Models[variantPayload.Model].ModelType != enums.ModelType(enums.DELTA) && variantPayload.OTDTrainingDataPath == "" {
if models.Models[variantPayload.Model].ModelType == enums.ModelType(enums.DELTA) && variantPayload.OTDTrainingDataPath == "" {
return fmt.Errorf("otd_training_data_path is required for DELTA model type")
}

Expand Down
35 changes: 28 additions & 7 deletions quick-start/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ services:
# Variants list
- VARIANTS_LIST=ads_gold,ads_mall,ads_new_catalog,ads,ct_gst,ct_high_asp,ct_non_gst,organic_gold,organic,organic_gst,organic_high_asp,organic_mall,organic_melp,organic_non_gst,widget_ads,organic_gst_pd
# MQ configuration
- MQ_ID_TOPICS_MAPPING=2450:skye.embeddings-1
- MQ_ID_TOPICS_MAPPING=2450:skye.embedding
# Horizon to Skye ScyllaDB configuration
- HORIZON_TO_SKYE_SCYLLA_CONF_ID_MAP=2:1
- SCYLLA_2_CONTACT_POINTS=scylla
Expand Down Expand Up @@ -651,9 +651,14 @@ services:
onfs-network:
ipv4_address: 172.18.0.20
depends_on:
- etcd
- kafka
- qdrant
etcd:
condition: service_healthy
qdrant:
condition: service_healthy
kafka:
condition: service_healthy
kafka-init:
condition: service_completed_successfully
restart: unless-stopped

skye-admin-healthcheck:
Expand Down Expand Up @@ -714,18 +719,25 @@ services:
# Etcd
- ETCD_SERVER=etcd:2379
- ETCD_WATCHER_ENABLED=true
# Admin HTTP client (used by embedding consumer for e.g. trigger indexing)
- ADMIN_HOST=skye-admin
- ADMIN_PORT=8092
- ADMIN_TIMEOUT_IN_MS=30000
- ADMIN_MAX_IDLE_CONNS=100
- ADMIN_MAX_IDLE_CONNS_PER_HOST=100
- ADMIN_IDLE_CONN_TIMEOUT_IN_MS=30000
# Embedding consumer (ID=2)
- EMBEDDING_CONSUMER_KAFKA_IDS=2
- KAFKA_2_TOPICS=skye.embedding
- KAFKA_2_BOOTSTRAP_SERVERS=broker:29092
- KAFKA_2_BASIC_AUTH_CREDENTIAL_SOURCE=NONE
- KAFKA_2_GROUP_ID=skye-embedding-consumer
- KAFKA_2_GROUP_ID=skye-embedding-consumer-v2
- KAFKA_2_AUTO_OFFSET_RESET=earliest
- KAFKA_2_AUTO_COMMIT_INTERVAL_MS=5000
- KAFKA_2_ENABLE_AUTO_COMMIT=false
- KAFKA_2_LISTENER_CONCURRENCY=1
- KAFKA_2_CLIENT_ID=skye-embedding-consumer
- KAFKA_2_BATCH_SIZE=10
- KAFKA_2_CLIENT_ID=skye-embedding-consumer-v2
- KAFKA_2_BATCH_SIZE=1
- KAFKA_2_POLL_TIMEOUT=1000
# Embedding sequence consumer (ID=3)
- EMBEDDING_CONSUMER_SEQUENCE_KAFKA_IDS=3
Expand Down Expand Up @@ -776,6 +788,11 @@ services:
- KAFKA_7_CLIENT_ID=skye-realtime-delta-consumer
- KAFKA_7_BATCH_SIZE=10
- KAFKA_7_POLL_TIMEOUT=1000
# Aggregator DB (Scylla) - required for embedding consumer when entity store uses conf_id=1
- STORAGE_AGGREGATOR_DB_COUNT=1
- STORAGE_AGGREGATOR_DB_1_CONTACT_POINTS=scylla
- STORAGE_AGGREGATOR_DB_1_PORT=9042
- STORAGE_AGGREGATOR_DB_1_KEYSPACE=skye
networks:
onfs-network:
ipv4_address: 172.18.0.23
Expand All @@ -788,6 +805,10 @@ services:
condition: service_healthy
kafka-init:
condition: service_completed_successfully
skye-admin:
condition: service_started
scylla:
condition: service_healthy
restart: unless-stopped

skye-consumers-healthcheck:
Expand Down
20 changes: 17 additions & 3 deletions quick-start/stop.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,24 @@ remove_containers() {
echo "βœ… Containers removed"
}

# clean_skye_etcd removes all Skye configuration keys (prefix /config/skye) from
# etcd so a subsequent fresh start re-registers cleanly.
# Best-effort: prints a warning and returns 0 when the etcd container or the
# onfs-network docker network is absent, or when etcd is unreachable.
clean_skye_etcd() {
    echo "🧹 Clearing Skye config from etcd..."
    # BUG FIX: `docker network ls -q` prints network IDs, not names, so grepping
    # the literal name "onfs-network" against it never matched and the cleanup
    # was always skipped. Filter by name and test for any output instead.
    if docker ps -q -f name=^etcd$ | grep -q . 2>/dev/null && docker network ls -q -f name=onfs-network | grep -q . 2>/dev/null; then
        # Run etcdctl in a throwaway container on the compose network; deletes by prefix.
        if docker run --rm --network onfs-network quay.io/coreos/etcd:v3.5.12 etcdctl --endpoints=http://etcd:2379 del --prefix "/config/skye" 2>/dev/null; then
            echo "βœ… Skye etcd keys removed"
        else
            echo "⚠️ Skye etcd cleanup skipped (etcd not reachable or no keys)"
        fi
    else
        echo "⚠️ etcd not running or network missing, skipping Skye etcd cleanup"
    fi
}

remove_volumes() {
echo "πŸ’Ύ Removing persistent volumes..."

# Remove named volumes
VOLUMES=("scylla-data" "mysql-data" "kafka-data")
# Remove named volumes (includes Skye: mysql Horizon/Skye tables, scylla skye keyspace, qdrant)
VOLUMES=("scylla-data" "mysql-data" "kafka-data" "qdrant-data")
for volume in "${VOLUMES[@]}"; do
if docker volume ls -q | grep -q "^${volume}$"; then
echo "πŸ—‘οΈ Removing volume: $volume"
Expand Down Expand Up @@ -120,7 +133,7 @@ show_status() {
fi

# Check for volumes
EXISTING_VOLUMES=$(docker volume ls -q | grep -E "scylla-data|mysql-data|kafka-data" 2>/dev/null || true)
EXISTING_VOLUMES=$(docker volume ls -q | grep -E "scylla-data|mysql-data|kafka-data|qdrant-data" 2>/dev/null || true)
if [ -n "$EXISTING_VOLUMES" ]; then
echo "πŸ’Ύ Existing volumes:"
echo "$EXISTING_VOLUMES" | sed 's/^/ β€’ /'
Expand All @@ -144,6 +157,7 @@ if [ "$PURGE_FLAG" = "--purge" ]; then
echo "⏳ Starting in 3 seconds... (Ctrl+C to cancel)"
sleep 3

clean_skye_etcd
stop_services
remove_containers
remove_images
Expand Down
49 changes: 38 additions & 11 deletions quick-start/test-skye.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@
# Skye End-to-End Test Script
# =============================================================================
# Tests the full Skye flow:
# 1. Health checks on all Skye services
# 1. Health checks on all Skye services (admin, serving, qdrant, consumers)
# 2. Register store, frequency, entity, model, variant via skye-admin
# 3. Create Qdrant collection directly
# 4. Insert test vectors into Qdrant
# 5. Query similar candidates via skye-serving (gRPC)
# 5. Send 3 embedding events to Kafka (skye-consumers consume them)
# 6. Verify Qdrant search
# 7. Query similar candidates via skye-serving (gRPC)
# =============================================================================

ADMIN_URL="http://localhost:8092"
SERVING_URL="localhost:8094"
QDRANT_URL="http://localhost:6333"
CONSUMERS_URL="http://localhost:8093"
BROKER_CONTAINER="${BROKER_CONTAINER:-broker}"
SKYE_EMBEDDING_TOPIC="${SKYE_EMBEDDING_TOPIC:-skye.embedding}"

GREEN='\033[0;32m'
RED='\033[0;31m'
Expand Down Expand Up @@ -52,6 +57,7 @@ info "Checking service health..."
curl -sf "${ADMIN_URL}/health" > /dev/null && pass "skye-admin is healthy" || fail "skye-admin is not reachable at ${ADMIN_URL}"
curl -sf "http://${SERVING_URL}/health/self" > /dev/null && pass "skye-serving is healthy" || fail "skye-serving is not reachable at ${SERVING_URL}"
curl -sf "${QDRANT_URL}/healthz" > /dev/null && pass "Qdrant is healthy" || fail "Qdrant is not reachable at ${QDRANT_URL}"
curl -sf "${CONSUMERS_URL}/health" > /dev/null && pass "skye-consumers is healthy" || fail "skye-consumers is not reachable at ${CONSUMERS_URL}"

echo ""

Expand Down Expand Up @@ -94,7 +100,7 @@ admin_post "Register model" "/api/v1/model/register-model" '{
},
"model_type": "RESET",
"kafka_id": 0,
"training_data_path": "",
"training_data_path": "gs://test",
"metadata": {
"entity": "test-products",
"key-type": "product_id"
Expand All @@ -114,8 +120,8 @@ admin_post "Register variant" "/api/v1/model/register-variant" '{
"variant": "v1",
"vector_db_type": "QDRANT",
"vector_db_config": {
"read_host": "qdrant:6334",
"write_host": "qdrant:6334",
"read_host": "172.18.0.3",
"write_host": "172.18.0.3",
"port": "6334",
"http2config": {
"deadline": 5000,
Expand Down Expand Up @@ -182,7 +188,31 @@ echo " Response: ${RESP}"
pass "Test vectors inserted (upserted)"

# ---------------------------------------------------------------------------
# Step 8: Verify vectors via Qdrant search (sanity check)
# Step 8: Send 3 embedding events to Kafka (skye-consumers will consume them)
# ---------------------------------------------------------------------------
info "Sending 3 embedding events to topic ${SKYE_EMBEDDING_TOPIC}..."
# Minimal valid event JSON (one line each) for entity=test-products, model=product-embeddings, vector_dim=4
EVT1='{"candidate_id":"1","entity":"test-products","model_name":"product-embeddings","environment":"local","embedding_store_version":1,"partition":"","index_space":{"embedding":[0.1,0.2,0.3,0.4],"variants_version_map":{"v1":1},"variants_index_map":{"v1":true},"operation":"ADD","payload":{"portfolio_id":"0"}},"search_space":{"embedding":[0.1,0.2,0.3,0.4]}}'
EVT2='{"candidate_id":"2","entity":"test-products","model_name":"product-embeddings","environment":"local","embedding_store_version":1,"partition":"","index_space":{"embedding":[0.2,0.3,0.4,0.5],"variants_version_map":{"v1":1},"variants_index_map":{"v1":true},"operation":"ADD","payload":{"portfolio_id":"0"}},"search_space":{"embedding":[0.2,0.3,0.4,0.5]}}'
EVT3='{"candidate_id":"3","entity":"test-products","model_name":"product-embeddings","environment":"local","embedding_store_version":1,"partition":"","index_space":{"embedding":[0.9,0.8,0.7,0.6],"variants_version_map":{"v1":1},"variants_index_map":{"v1":true},"operation":"ADD","payload":{"portfolio_id":"0"}},"search_space":{"embedding":[0.9,0.8,0.7,0.6]}}'

if docker exec -i "${BROKER_CONTAINER}" /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server broker:29092 --topic "${SKYE_EMBEDDING_TOPIC}" 2>/dev/null <<EOF
${EVT1}
${EVT2}
${EVT3}
EOF
then
pass "Produced 3 messages to ${SKYE_EMBEDDING_TOPIC}"
else
fail "Failed to produce messages (is container ${BROKER_CONTAINER} running?)"
fi

info "Waiting 5s for skye-consumers to process..."
sleep 5
pass "Check skye-consumers logs for 'Processing N embedding events' to confirm consumption"

# ---------------------------------------------------------------------------
# Step 9: Verify vectors via Qdrant search (sanity check)
# ---------------------------------------------------------------------------
info "Verifying Qdrant search works..."
RESP=$(curl -s -X POST "${QDRANT_URL}/collections/${COLLECTION_NAME}/points/search" \
Expand All @@ -196,11 +226,10 @@ echo " Response: ${RESP}"
pass "Qdrant search verified"

# ---------------------------------------------------------------------------
# Step 9: Query via skye-serving gRPC
# Step 10: Query via skye-serving gRPC
# ---------------------------------------------------------------------------
info "Querying similar candidates via skye-serving gRPC..."

# Check if grpcurl is available
if command -v grpcurl &> /dev/null; then
RESP=$(grpcurl -plaintext \
-H "skye-caller-id: test-script" \
Expand All @@ -217,9 +246,7 @@ if command -v grpcurl &> /dev/null; then
echo " Response: ${RESP}"
pass "gRPC query completed"
else
echo -e "${YELLOW} ⚠️ grpcurl not installed. Install it to test gRPC:${NC}"
echo " brew install grpcurl"
echo ""
warn "grpcurl not installed. Install it to test gRPC: brew install grpcurl"
echo " Then run manually:"
echo ' grpcurl -plaintext -H "skye-caller-id: test" -H "skye-auth-token: test" -d '"'"'{"entity":"test-products","modelName":"product-embeddings","variant":"v1","limit":3,"embeddings":[{"embedding":[0.1,0.2,0.3,0.4]}]}'"'"' localhost:8094 SkyeSimilarCandidateService/getSimilarCandidates'
fi
Expand Down
2 changes: 1 addition & 1 deletion skye/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v1.0.0
v1.1.0
1 change: 1 addition & 0 deletions skye/internal/consumers/listener/embedding.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func ProcessEmbeddingEvents(record []skafka.ConsumerRecord[string, []byte], c *k
"environment", event.Environment})
events = append(events, event)
}
log.Info().Msgf("Processing %d embedding events", len(events))

err := embeddingConsumer.Process(events)
if err != nil {
Expand Down
19 changes: 17 additions & 2 deletions skye/internal/consumers/listener/embedding/embedding.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func (e *EmbeddingConsumer) produceFailureEvents(failedEvents []Event) {
continue
}
failureProducerKafkaId := modelConf.FailureProducerKafkaId
if failureProducerKafkaId == 0 {
log.Debug().Msg("Skipping failure topic produce (failure_producer_kafka_id=0)")
continue
}
skafka.InitProducer(failureProducerKafkaId) // idempotent β€” ensures producer exists for this dynamic ID
jsonBytes, err := json.Marshal(failedEvent)
if err != nil {
Expand Down Expand Up @@ -301,15 +305,23 @@ func (e *EmbeddingConsumer) shouldIndex(event Event, variant string, aggregatorD
log.Error().Msgf("Error getting variant config for entity %s, model %s, variant %s: %v", event.Entity, event.Model, variant, err)
return false, err
}
if variantConfig == nil {
return false, fmt.Errorf("variant config is nil for entity %s model %s variant %s", event.Entity, event.Model, variant)
}
aggregatorFilters := variantConfig.Filter
if aggregatorFilters == nil {
return true, nil
}
if aggregatorData == nil {
aggregatorData = make(map[string]interface{})
}
for _, criteria := range aggregatorFilters {
for _, filter := range criteria {
filterData := filter.DefaultValue
if dataValue, exists := aggregatorData[filter.ColumnName]; exists {
filterData = dataValue.(string)
if dataValue, exists := aggregatorData[filter.ColumnName]; exists && dataValue != nil {
if s, ok := dataValue.(string); ok {
filterData = s
}
}
if filterData != filter.FilterValue {
return false, nil
Expand All @@ -325,6 +337,9 @@ func (e *EmbeddingConsumer) preparePayloadIndexMap(event Event, rtColumns map[st
log.Error().Msgf("Error getting variant config for entity %s, model %s, variant %s: %v", event.Entity, event.Model, variant, err)
return nil, err
}
if variantConfig == nil || variantConfig.VectorDbConfig.Payload == nil {
return make(map[string]interface{}), nil
}
variantPayload := variantConfig.VectorDbConfig.Payload
payloadIndexMap := make(map[string]interface{})
for key, variantPayloadValue := range variantPayload {
Expand Down
27 changes: 23 additions & 4 deletions skye/pkg/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ const (
clientId = "client.id"
)

// splitAndTrimTopics breaks a comma-separated topic list into individual topic
// names. Each piece has surrounding whitespace trimmed, and pieces that are
// empty after trimming are dropped (e.g. "a, b," -> ["a", "b"]).
func splitAndTrimTopics(topicsStr string) []string {
	pieces := strings.Split(topicsStr, ",")
	topics := make([]string, 0, len(pieces))
	for _, piece := range pieces {
		trimmed := strings.TrimSpace(piece)
		if trimmed == "" {
			continue
		}
		topics = append(topics, trimmed)
	}
	return topics
}

// BatchHandler processes a batch of raw Kafka messages.
// Return nil on success (processBatch will commit); return error to trigger seek-back.
type BatchHandler func(msgs []*kafka.Message, c *kafka.Consumer) error
Expand All @@ -53,6 +65,8 @@ func StartConsumers(kafkaIds string, consumerName string, handler BatchHandler)
log.Error().Err(err).Msgf("Failed to build kafka config for %s (kafkaId=%s)", consumerName, kafkaId)
continue
}
log.Info().Str("topic", cfg.Topics).Str("bootstrap", cfg.BootstrapURLs).Str("group", cfg.GroupID).
Msgf("Starting %s consumer kafkaId=%s (subscribe to topic)", consumerName, kafkaId)
kl := NewKafkaListener(cfg, handler)
kl.Init()
kl.Consume()
Expand Down Expand Up @@ -95,9 +109,13 @@ func (k *KafkaListener) Init() {
if err != nil {
log.Panic().Err(err).Msg("Failed to create Kafka consumer.")
}
err = consumer.SubscribeTopics([]string{k.kafkaConfig.Topics}, nil)
topics := splitAndTrimTopics(k.kafkaConfig.Topics)
if len(topics) == 0 {
topics = []string{strings.TrimSpace(k.kafkaConfig.Topics)}
}
err = consumer.SubscribeTopics(topics, nil)
if err != nil {
log.Panic().Err(err).Msgf("Failed to subscribe to topic %s", k.kafkaConfig.Topics)
log.Panic().Err(err).Msgf("Failed to subscribe to topics %v", topics)
}
k.consumers = append(k.consumers, consumer)
}
Expand Down Expand Up @@ -147,7 +165,7 @@ func (k *KafkaListener) Consume() {

case <-flushTimer.C:
if msgCount > 0 {
log.Debug().Msgf("Processing %d messages due to timeout", msgCount)
log.Info().Int("msgCount", msgCount).Msg("Flushing batch due to timeout")
k.processBatch(consumer, messages)
msgCount = 0
messages = messages[:0]
Expand All @@ -165,12 +183,13 @@ func (k *KafkaListener) Consume() {
"group:" + k.kafkaConfig.GroupID,
"client:" + k.kafkaConfig.ClientID,
})
log.Info().Str("topic", *e.TopicPartition.Topic).Int32("partition", e.TopicPartition.Partition).Msg("Kafka message received")

messages = append(messages, e)
msgCount++

if msgCount == k.kafkaConfig.BatchSize {
log.Debug().Msgf("Processing batch of %d messages", msgCount)
log.Info().Int("msgCount", msgCount).Msg("Processing batch (batch full)")
k.processBatch(consumer, messages)
msgCount = 0
messages = messages[:0]
Expand Down
Loading