diff --git a/services/attendance-service/main.go b/services/attendance-service/main.go index 3324405..566f0fd 100644 --- a/services/attendance-service/main.go +++ b/services/attendance-service/main.go @@ -120,94 +120,102 @@ func consumeEmployeeDeletions(ctx context.Context) { rabbitURL = "amqp://guest:guest@rabbitmq:5672/" } - conn, err := amqp.Dial(rabbitURL) - if err != nil { - log.Printf("Warning: Could not connect to RabbitMQ for deletion consumer (%v)", err) - return - } - defer conn.Close() + backoff := 1 * time.Second + maxBackoff := 30 * time.Second - ch, err := conn.Channel() - if err != nil { - log.Printf("Warning: Could not open channel for deletion consumer (%v)", err) - return - } - defer ch.Close() + for { + select { + case <-ctx.Done(): + return + default: + } - err = ch.ExchangeDeclare( - "notifications_exchange", - "fanout", - true, - false, - false, - false, - nil, - ) - if err != nil { - log.Printf("Warning: Could not declare notifications_exchange (%v)", err) - return - } + conn, err := amqp.Dial(rabbitURL) + if err != nil { + log.Printf("Warning: Could not connect to RabbitMQ for deletion consumer (%v), retrying in %v", err, backoff) + time.Sleep(backoff) + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + continue + } - q, err := ch.QueueDeclare( - "", - false, - false, - true, - false, - nil, - ) - if err != nil { - log.Printf("Warning: Could not declare queue (%v)", err) - return - } + backoff = 1 * time.Second - err = ch.QueueBind( - q.Name, - "", - "notifications_exchange", - false, - nil, - ) - if err != nil { - log.Printf("Warning: Could not bind queue (%v)", err) - return - } + ch, err := conn.Channel() + if err != nil { + log.Printf("Warning: Could not open channel (%v), reconnecting", err) + conn.Close() + time.Sleep(backoff) + continue + } - msgs, err := ch.Consume( - q.Name, - "", - true, - false, - false, - false, - nil, - ) - if err != nil { - log.Printf("Warning: Could not start consuming (%v)", err) - return - } + err = ch.ExchangeDeclare("notifications_exchange", "fanout", true, false, false, false, nil) + if err != nil { + log.Printf("Warning: Could not declare exchange (%v), reconnecting", err) + ch.Close() + conn.Close() + time.Sleep(backoff) + continue + } - log.Printf("Employee deletion consumer started on notifications_exchange") - for { - select { - case <-ctx.Done(): - log.Println("Employee deletion consumer shutting down") - return - case msg, ok := <-msgs: - if !ok { + q, err := ch.QueueDeclare("", false, false, true, false, nil) + if err != nil { + log.Printf("Warning: Could not declare queue (%v), reconnecting", err) + ch.Close() + conn.Close() + time.Sleep(backoff) + continue + } + + err = ch.QueueBind(q.Name, "", "notifications_exchange", false, nil) + if err != nil { + log.Printf("Warning: Could not bind queue (%v), reconnecting", err) + ch.Close() + conn.Close() + time.Sleep(backoff) + continue + } + + msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil) + if err != nil { + log.Printf("Warning: Could not start consuming (%v), reconnecting", err) + ch.Close() + conn.Close() + time.Sleep(backoff) + continue + } + + log.Printf("Employee deletion consumer started on notifications_exchange") + + consumeLoop: + for { + select { + case <-ctx.Done(): + log.Println("Employee deletion consumer shutting down") + ch.Close() + conn.Close() return - } - var event struct { - Event string `json:"event"` - Email string `json:"email"` - TenantID string `json:"tenant_id"` - } - if err := json.Unmarshal(msg.Body, &event); err != nil { - log.Printf("Warning: Could not parse deletion event (%v)", err) - continue - } - if event.Event == "employee.deleted" { - handleEmployeeDeletion(event.TenantID, event.Email) + case msg, ok := <-msgs: + if !ok { + log.Printf("RabbitMQ connection lost for deletion consumer, reconnecting") + ch.Close() + conn.Close() + break consumeLoop + } + var event struct { + Event string `json:"event"` + Email string `json:"email"` + TenantID string `json:"tenant_id"` + } + if err := json.Unmarshal(msg.Body, &event); err != nil { + log.Printf("Warning: Could not parse deletion event (%v)", err) + continue + } + if event.Event == "employee.deleted" { + handleEmployeeDeletion(event.TenantID, event.Email) + } } } } diff --git a/services/auth-service/package.json b/services/auth-service/package.json index de58ccd..26359d4 100644 --- a/services/auth-service/package.json +++ b/services/auth-service/package.json @@ -26,6 +26,7 @@ "otplib": "^12.0.1", "pg": "^8.20.0", "prom-client": "^15.1.3", + "redis": "^4.6.0", "uuid": "^9.0.1", "xml-crypto": "^6.0.0" } diff --git a/services/lms-service/handlers/learning_analytics.go b/services/lms-service/handlers/learning_analytics.go index 55ec408..b4fd675 100644 --- a/services/lms-service/handlers/learning_analytics.go +++ b/services/lms-service/handlers/learning_analytics.go @@ -2,6 +2,7 @@ package handlers import ( "encoding/json" + "log" "net/http" "github.com/atlas-workforce/lms-service/middleware" @@ -19,35 +20,69 @@ func (h *LearningAnalyticsHandler) Overview(c *fiber.Ctx) error { tenantID := middleware.GetTenant(c) var totalCourses, pubCourses int64 - h.DB.Model(&models.Course{}).Where("tenant_id = ?", tenantID).Count(&totalCourses) - h.DB.Model(&models.Course{}).Where("tenant_id = ? AND status = ?", tenantID, "PUBLISHED").Count(&pubCourses) + if err := h.DB.Model(&models.Course{}).Where("tenant_id = ?", tenantID).Count(&totalCourses).Error; err != nil { + return c.Status(http.StatusInternalServerError).JSON(fiber.Map{"error": "Failed to query courses"}) + } + if err := h.DB.Model(&models.Course{}).Where("tenant_id = ? AND status = ?", tenantID, "PUBLISHED").Count(&pubCourses).Error; err != nil { + log.Printf("learning_analytics Overview pubCourses: %v", err) + } var totalEnrollments, completedEnrollments, inProgress int64 - h.DB.Model(&models.Enrollment{}).Where("tenant_id = ?", tenantID).Count(&totalEnrollments) - h.DB.Model(&models.Enrollment{}).Where("tenant_id = ? AND status = ?", tenantID, "COMPLETED").Count(&completedEnrollments) - h.DB.Model(&models.Enrollment{}).Where("tenant_id = ? AND status = ?", tenantID, "IN_PROGRESS").Count(&inProgress) + if err := h.DB.Model(&models.Enrollment{}).Where("tenant_id = ?", tenantID).Count(&totalEnrollments).Error; err != nil { + return c.Status(http.StatusInternalServerError).JSON(fiber.Map{"error": "Failed to query enrollments"}) + } + if err := h.DB.Model(&models.Enrollment{}).Where("tenant_id = ? AND status = ?", tenantID, "COMPLETED").Count(&completedEnrollments).Error; err != nil { + log.Printf("learning_analytics Overview completed: %v", err) + } + if err := h.DB.Model(&models.Enrollment{}).Where("tenant_id = ? AND status = ?", tenantID, "IN_PROGRESS").Count(&inProgress).Error; err != nil { + log.Printf("learning_analytics Overview inProgress: %v", err) + } var totalCerts, activeCerts, expiringCerts int64 - h.DB.Model(&models.Certification{}).Where("tenant_id = ?", tenantID).Count(&totalCerts) - h.DB.Model(&models.Certification{}).Where("tenant_id = ? AND status = ?", tenantID, "ACTIVE").Count(&activeCerts) - h.DB.Raw("SELECT COUNT(*) FROM certifications WHERE tenant_id = ? AND status = 'ACTIVE' AND expiry_date IS NOT NULL AND expiry_date BETWEEN NOW() AND NOW() + INTERVAL '30 days'", tenantID).Scan(&expiringCerts) + if err := h.DB.Model(&models.Certification{}).Where("tenant_id = ?", tenantID).Count(&totalCerts).Error; err != nil { + return c.Status(http.StatusInternalServerError).JSON(fiber.Map{"error": "Failed to query certifications"}) + } + if err := h.DB.Model(&models.Certification{}).Where("tenant_id = ? AND status = ?", tenantID, "ACTIVE").Count(&activeCerts).Error; err != nil { + log.Printf("learning_analytics Overview activeCerts: %v", err) + } + if err := h.DB.Raw("SELECT COUNT(*) FROM certifications WHERE tenant_id = ? AND status = 'ACTIVE' AND expiry_date IS NOT NULL AND expiry_date BETWEEN NOW() AND NOW() + INTERVAL '30 days'", tenantID).Scan(&expiringCerts).Error; err != nil { + log.Printf("learning_analytics Overview expiring: %v", err) + } var totalAssessments, totalAttempts, passedAttempts int64 - h.DB.Model(&models.Assessment{}).Where("tenant_id = ?", tenantID).Count(&totalAssessments) - h.DB.Model(&models.AssessmentAttempt{}).Where("tenant_id = ?", tenantID).Count(&totalAttempts) - h.DB.Model(&models.AssessmentAttempt{}).Where("tenant_id = ? AND passed = ?", tenantID, true).Count(&passedAttempts) + if err := h.DB.Model(&models.Assessment{}).Where("tenant_id = ?", tenantID).Count(&totalAssessments).Error; err != nil { + return c.Status(http.StatusInternalServerError).JSON(fiber.Map{"error": "Failed to query assessments"}) + } + if err := h.DB.Model(&models.AssessmentAttempt{}).Where("tenant_id = ?", tenantID).Count(&totalAttempts).Error; err != nil { + log.Printf("learning_analytics Overview totalAttempts: %v", err) + } + if err := h.DB.Model(&models.AssessmentAttempt{}).Where("tenant_id = ? AND passed = ?", tenantID, true).Count(&passedAttempts).Error; err != nil { + log.Printf("learning_analytics Overview passedAttempts: %v", err) + } var totalSkills, totalJourneys int64 - h.DB.Model(&models.SkillMatrix{}).Where("tenant_id = ?", tenantID).Count(&totalSkills) - h.DB.Model(&models.LearningJourney{}).Where("tenant_id = ?", tenantID).Count(&totalJourneys) + if err := h.DB.Model(&models.SkillMatrix{}).Where("tenant_id = ?", tenantID).Count(&totalSkills).Error; err != nil { + return c.Status(http.StatusInternalServerError).JSON(fiber.Map{"error": "Failed to query skills"}) + } + if err := h.DB.Model(&models.LearningJourney{}).Where("tenant_id = ?", tenantID).Count(&totalJourneys).Error; err != nil { + log.Printf("learning_analytics Overview totalJourneys: %v", err) + } var totalCompliance, completedCompliance int64 - h.DB.Model(&models.ComplianceTraining{}).Where("tenant_id = ?", tenantID).Count(&totalCompliance) - h.DB.Model(&models.ComplianceTraining{}).Where("tenant_id = ? AND status = ?", tenantID, "COMPLETED").Count(&completedCompliance) + if err := h.DB.Model(&models.ComplianceTraining{}).Where("tenant_id = ?", tenantID).Count(&totalCompliance).Error; err != nil { + return c.Status(http.StatusInternalServerError).JSON(fiber.Map{"error": "Failed to query compliance"}) + } + if err := h.DB.Model(&models.ComplianceTraining{}).Where("tenant_id = ? AND status = ?", tenantID, "COMPLETED").Count(&completedCompliance).Error; err != nil { + log.Printf("learning_analytics Overview completedCompliance: %v", err) + } var totalKnowledge, totalMarketplace int64 - h.DB.Model(&models.KnowledgeArticle{}).Where("tenant_id = ?", tenantID).Count(&totalKnowledge) - h.DB.Model(&models.MarketplaceListing{}).Where("tenant_id = ?", tenantID).Count(&totalMarketplace) + if err := h.DB.Model(&models.KnowledgeArticle{}).Where("tenant_id = ?", tenantID).Count(&totalKnowledge).Error; err != nil { + log.Printf("learning_analytics Overview totalKnowledge: %v", err) + } + if err := h.DB.Model(&models.MarketplaceListing{}).Where("tenant_id = ?", tenantID).Count(&totalMarketplace).Error; err != nil { + log.Printf("learning_analytics Overview totalMarketplace: %v", err) + } completionRate := 0.0 if totalEnrollments > 0 { @@ -84,13 +119,13 @@ func (h *LearningAnalyticsHandler) Department(c *fiber.Ctx) error { tenantID := middleware.GetTenant(c) var deptStats []struct { - Department string `json:"department"` + EmployeeID string `json:"employee_id"` TotalEnrolled int64 `json:"total_enrolled"` Completed int64 `json:"completed"` InProgress int64 `json:"in_progress"` } - h.DB.Raw(` - SELECT e.employee_id as department, + if err := h.DB.Raw(` + SELECT e.employee_id, COUNT(*) as total_enrolled, SUM(CASE WHEN e.status = 'COMPLETED' THEN 1 ELSE 0 END) as completed, SUM(CASE WHEN e.status = 'IN_PROGRESS' THEN 1 ELSE 0 END) as in_progress @@ -98,7 +133,9 @@ func (h *LearningAnalyticsHandler) Department(c *fiber.Ctx) error { WHERE e.tenant_id = ? GROUP BY e.employee_id ORDER BY total_enrolled DESC - LIMIT 10`, tenantID).Scan(&deptStats) + LIMIT 10`, tenantID).Scan(&deptStats).Error; err != nil { + return c.Status(http.StatusInternalServerError).JSON(fiber.Map{"error": "Failed to fetch department stats"}) + } return c.JSON(fiber.Map{"data": deptStats}) } @@ -117,14 +154,16 @@ func (h *LearningAnalyticsHandler) Trends(c *fiber.Ctx) error { trunc = "DATE_TRUNC('week', created_at)::date" } - h.DB.Raw(` + if err := h.DB.Raw(` SELECT `+trunc+` as period, COUNT(*) as enrollments, SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) as completions FROM enrollments WHERE tenant_id = ? AND created_at > NOW() - INTERVAL '12 months' GROUP BY period - ORDER BY period ASC`, tenantID).Scan(&trends) + ORDER BY period ASC`, tenantID).Scan(&trends).Error; err != nil { + return c.Status(http.StatusInternalServerError).JSON(fiber.Map{"error": "Failed to fetch trends"}) + } return c.JSON(fiber.Map{"data": trends}) } @@ -133,10 +172,14 @@ func (h *LearningAnalyticsHandler) CompetencyMatrix(c *fiber.Ctx) error { tenantID := middleware.GetTenant(c) var frameworks []models.CompetencyFramework - h.DB.Where("tenant_id = ? AND status = ?", tenantID, "ACTIVE").Find(&frameworks) + if err := h.DB.Where("tenant_id = ? AND status = ?", tenantID, "ACTIVE").Find(&frameworks).Error; err != nil { + return c.Status(http.StatusInternalServerError).JSON(fiber.Map{"error": "Failed to fetch frameworks"}) + } var allSkills []models.SkillMatrix - h.DB.Where("tenant_id = ?", tenantID).Find(&allSkills) + if err := h.DB.Where("tenant_id = ?", tenantID).Find(&allSkills).Error; err != nil { + return c.Status(http.StatusInternalServerError).JSON(fiber.Map{"error": "Failed to fetch skills"}) + } empSkillMap := make(map[string]map[string]models.SkillMatrix) for _, s := range allSkills { @@ -244,8 +287,10 @@ func (h *SkillEndorsementHandler) Create(c *fiber.Ctx) error { return c.Status(http.StatusInternalServerError).JSON(fiber.Map{"error": "Failed to create endorsement"}) } if req.SkillID != "" { - h.DB.Model(&models.SkillMatrix{}).Where("id = ? AND tenant_id = ?", req.SkillID, tenantID). - Update("endorsed_by", req.EndorsedBy) + if err := h.DB.Model(&models.SkillMatrix{}).Where("id = ? AND tenant_id = ?", req.SkillID, tenantID). + Update("endorsed_by", req.EndorsedBy).Error; err != nil { + log.Printf("SkillEndorsementHandler.Create update skill matrix: %v", err) + } } return c.Status(http.StatusCreated).JSON(fiber.Map{"data": endorsement}) } diff --git a/services/notification-go-service/main.go b/services/notification-go-service/main.go index c2a6e44..c5a17a5 100644 --- a/services/notification-go-service/main.go +++ b/services/notification-go-service/main.go @@ -88,6 +88,7 @@ type Client struct { conn *websocket.Conn send chan []byte tenantID string + clientIP string } type BroadcastMessage struct { @@ -141,11 +142,16 @@ func (s *NotificationStore) MarkRead(ids []string) { } } +const maxClients = 1000 +const maxConnsPerIP = 10 + var ( - store = &NotificationStore{} - clients = make(map[*Client]bool) - broadcast = make(chan BroadcastMessage) - mutex = &sync.Mutex{} + store = &NotificationStore{} + clients = make(map[*Client]bool) + broadcast = make(chan BroadcastMessage) + mutex = &sync.Mutex{} + ipConnections = make(map[string]int) + ipMutex = &sync.Mutex{} ) type wsClaims struct { @@ -257,6 +263,7 @@ func (c *Client) readPump() { delete(clients, c) close(c.send) mutex.Unlock() + releaseConnection(c.clientIP) c.conn.Close() }() @@ -279,6 +286,26 @@ func healthHandler(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(map[string]string{"status": "Notification Service is running"}) } +func canAcceptConnection(ip string) bool { + ipMutex.Lock() + defer ipMutex.Unlock() + count := ipConnections[ip] + if count >= maxConnsPerIP { + return false + } + ipConnections[ip] = count + 1 + return true +} + +func releaseConnection(ip string) { + ipMutex.Lock() + defer ipMutex.Unlock() + ipConnections[ip]-- + if ipConnections[ip] <= 0 { + delete(ipConnections, ip) + } +} + func handleConnections(w http.ResponseWriter, r *http.Request) { tokenStr := r.URL.Query().Get("token") if tokenStr == "" { @@ -297,13 +324,33 @@ func handleConnections(w http.ResponseWriter, r *http.Request) { tenantID = "default" } + clientIP := r.RemoteAddr + if idx := strings.LastIndex(clientIP, ":"); idx != -1 { + clientIP = clientIP[:idx] + } + + if !canAcceptConnection(clientIP) { + http.Error(w, `{"error":"Too many connections from this IP"}`, http.StatusTooManyRequests) + return + } + + mutex.Lock() + if len(clients) >= maxClients { + mutex.Unlock() + releaseConnection(clientIP) + http.Error(w, `{"error":"Server at maximum connection capacity"}`, http.StatusServiceUnavailable) + return + } + mutex.Unlock() + ws, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Printf("WebSocket Upgrade Error: %v", err) + releaseConnection(clientIP) return } - client := &Client{conn: ws, send: make(chan []byte, 256), tenantID: tenantID} + client := &Client{conn: ws, send: make(chan []byte, 256), tenantID: tenantID, clientIP: clientIP} mutex.Lock() clients[client] = true @@ -336,9 +383,10 @@ func handleMessages(ctx context.Context) { // REST API handlers func listNotificationsHandler(w http.ResponseWriter, r *http.Request) { - tenantID := r.URL.Query().Get("tenant_id") + tenantID := r.Header.Get("X-Tenant-Id") if tenantID == "" { - tenantID = r.Header.Get("X-Tenant-Id") + http.Error(w, `{"error":"Missing tenant context"}`, http.StatusForbidden) + return } notifications := store.List(tenantID) w.Header().Set("Content-Type", "application/json") @@ -368,93 +416,92 @@ func setupRabbitMQConsumer(ctx context.Context) { rabbitURL = "amqp://guest:guest@rabbitmq:5672/" } - var conn *amqp.Connection - var err error + backoff := 1 * time.Second + maxBackoff := 30 * time.Second - for i := 0; i < 5; i++ { - conn, err = amqp.Dial(rabbitURL) - if err == nil { - break + for { + select { + case <-ctx.Done(): + return + default: } - log.Printf("Failed to connect to RabbitMQ, retrying... (%v)", err) - time.Sleep(5 * time.Second) - } - if err != nil { - log.Printf("Could not connect to RabbitMQ: %v", err) - return - } + conn, err := amqp.Dial(rabbitURL) + if err != nil { + log.Printf("Failed to connect to RabbitMQ (%v), retrying in %v", err, backoff) + time.Sleep(backoff) + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + continue + } - ch, err := conn.Channel() - if err != nil { - log.Printf("Failed to open a channel: %v", err) - return - } + backoff = 1 * time.Second - err = ch.ExchangeDeclare( - "notifications_exchange", - "fanout", - true, - false, - false, - false, - nil, - ) - if err != nil { - log.Printf("Failed to declare an exchange: %v", err) - return - } + ch, err := conn.Channel() + if err != nil { + log.Printf("Failed to open channel (%v), reconnecting", err) + conn.Close() + time.Sleep(backoff) + continue + } - q, err := ch.QueueDeclare( - "", // empty name generates a unique temporary queue name - false, // non-durable - false, // delete when unused - true, // exclusive - false, // no-wait - nil, // arguments - ) - if err != nil { - log.Printf("Failed to declare a queue: %v", err) - return - } + err = ch.ExchangeDeclare("notifications_exchange", "fanout", true, false, false, false, nil) + if err != nil { + log.Printf("Failed to declare exchange (%v), reconnecting", err) + ch.Close() + conn.Close() + time.Sleep(backoff) + continue + } - err = ch.QueueBind( - q.Name, - "", - "notifications_exchange", - false, - nil, - ) - if err != nil { - log.Printf("Failed to bind queue: %v", err) - return - } + q, err := ch.QueueDeclare("", false, false, true, false, nil) + if err != nil { + log.Printf("Failed to declare queue (%v), reconnecting", err) + ch.Close() + conn.Close() + time.Sleep(backoff) + continue + } - msgs, err := ch.Consume( - q.Name, - "", - true, - false, - false, - false, - nil, - ) - if err != nil { - log.Printf("Failed to register a consumer: %v", err) - return - } + err = ch.QueueBind(q.Name, "", "notifications_exchange", false, nil) + if err != nil { + log.Printf("Failed to bind queue (%v), reconnecting", err) + ch.Close() + conn.Close() + time.Sleep(backoff) + continue + } - log.Println("Waiting for messages from RabbitMQ...") - for { - select { - case <-ctx.Done(): - log.Println("RabbitMQ consumer shutting down") - return - case d, ok := <-msgs: - if !ok { + msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil) + if err != nil { + log.Printf("Failed to register consumer (%v), reconnecting", err) + ch.Close() + conn.Close() + time.Sleep(backoff) + continue + } + + log.Println("Waiting for messages from RabbitMQ...") + + consumeLoop: + for { + select { + case <-ctx.Done(): + log.Println("RabbitMQ consumer shutting down") + ch.Close() + conn.Close() return + case d, ok := <-msgs: + if !ok { + log.Printf("RabbitMQ connection lost, reconnecting") + ch.Close() + conn.Close() + break consumeLoop + } + processMessage(d.Body) } - processMessage(d.Body) } } }