From 1a7429c7f8aad7bd9b65a72b079c4c7f63fdd9bd Mon Sep 17 00:00:00 2001 From: Jack Ding Date: Thu, 9 Apr 2026 14:17:10 -0400 Subject: [PATCH] Fix EventOutCh deadlock by making publish non-blocking PublishEventViaAPI sends to EventOutCh with a blocking channel send. When ProcessOutChannel (the consumer) is slow due to HTTP POST failures (e.g., unreachable subscribers), the 100-slot buffer fills up and the next publish blocks forever. This deadlocks the processMessages goroutine, freezing socket I/O and causing cloud-event-proxy to hang silently. Fix by using select with a 5-second send timeout (time.After), so that an event is dropped when the channel stays full for the whole timeout instead of blocking indefinitely. Dropped events are logged as warnings and counted as failures in metrics. Signed-off-by: Jack Ding --- cmd/main.go | 4 +- pkg/common/common.go | 29 +++++++++++--- pkg/common/common_test.go | 43 ++++++++++++++++++++- plugins/ptp_operator/metrics/manager.go | 3 +- plugins/ptp_operator/ptp_operator_plugin.go | 14 ++++--- 5 files changed, 79 insertions(+), 14 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 9663e6fe..00f411a8 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -417,7 +417,9 @@ func ProcessInChannel(wg *sync.WaitGroup, scConfig *common.SCConfiguration) { out.Status = channel.SUCCESS } } - scConfig.EventOutCh <- &out + if !common.SendToChannel(scConfig.EventOutCh, &out) { + log.Warningf("EventOutCh full, dropping ack for %s", d.Address) + } } else if d.Type == channel.STATUS && d.Status == channel.NEW { log.Warnf("event disabled,no action taken(can't send to a destination): logging new status check %v\n", d) out := channel.DataChan{ diff --git a/pkg/common/common.go b/pkg/common/common.go index ffd73c73..cb3adc30 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -45,6 +45,20 @@ import ( log "github.com/sirupsen/logrus" ) +const sendTimeout = 5 * time.Second + +// SendToChannel sends data to a channel with a timeout. 
+// Returns true if the send succeeded, false if the channel was full for the +// entire timeout duration. +func SendToChannel(ch chan *channel.DataChan, d *channel.DataChan) bool { + select { + case ch <- d: + return true + case <-time.After(sendTimeout): + return false + } +} + // TransportType defines transport type supported type TransportType int @@ -309,18 +323,21 @@ func PublishEvent(scConfig *SCConfiguration, e ceevent.Event) error { // PublishEventViaAPI ... publish events by not using http request but direct api func PublishEventViaAPI(scConfig *SCConfiguration, cneEvent ceevent.Event, resourceAddress string) error { if ceEvent, err := GetPublishingCloudEvent(scConfig, cneEvent); err == nil { - scConfig.EventOutCh <- &channel.DataChan{ + d := &channel.DataChan{ Type: channel.EVENT, Status: channel.NEW, Data: ceEvent, Address: resourceAddress, // this is the publishing address ClientID: scConfig.ClientID(), } - - log.Debugf("event source %s sent to queue to process", ceEvent.Source()) - log.Debugf("event sent %s", cneEvent.JSONString()) - - localmetrics.UpdateEventPublishedCount(ceEvent.Source(), localmetrics.SUCCESS, 1) + if SendToChannel(scConfig.EventOutCh, d) { + log.Debugf("event source %s sent to queue to process", ceEvent.Source()) + log.Debugf("event sent %s", cneEvent.JSONString()) + localmetrics.UpdateEventPublishedCount(ceEvent.Source(), localmetrics.SUCCESS, 1) + } else { + log.Warningf("EventOutCh full for %s, dropping event for %s", sendTimeout, resourceAddress) + localmetrics.UpdateEventPublishedCount(ceEvent.Source(), localmetrics.FAIL, 1) + } } return nil } diff --git a/pkg/common/common_test.go b/pkg/common/common_test.go index 2e5785bf..b4bc649b 100644 --- a/pkg/common/common_test.go +++ b/pkg/common/common_test.go @@ -17,9 +17,12 @@ package common_test import ( "net" "testing" + "time" + "github.com/redhat-cne/sdk-go/pkg/channel" + ceevent "github.com/redhat-cne/sdk-go/pkg/event" "github.com/redhat-cne/sdk-go/pkg/types" - + v1pubsub 
"github.com/redhat-cne/sdk-go/v1/pubsub" log "github.com/sirupsen/logrus" "github.com/redhat-cne/cloud-event-proxy/pkg/common" @@ -61,3 +64,41 @@ func TestTransportHost_ParseTransportHost(t *testing.T) { } } } + +func TestPublishEventViaAPI_NonBlockingWhenChannelFull(t *testing.T) { + // Create a channel with buffer size 1 and fill it + eventOutCh := make(chan *channel.DataChan, 1) + eventOutCh <- &channel.DataChan{} // fill the buffer + + pubSubAPI := v1pubsub.GetAPIInstance("/tmp/test-store") + pub, _ := pubSubAPI.CreatePublisher(v1pubsub.NewPubSub( + types.ParseURI("http://localhost/dummy"), + "/test/resource", + )) + + scConfig := &common.SCConfiguration{ + EventOutCh: eventOutCh, + PubSubAPI: pubSubAPI, + } + + // Create event with matching publisher ID + event := ceevent.Event{ID: pub.ID} + + // PublishEventViaAPI should return after the 5s timeout (not block forever) + // even though the channel is full + done := make(chan struct{}) + go func() { + _ = common.PublishEventViaAPI(scConfig, event, "/test/resource") + close(done) + }() + + select { + case <-done: + // success — returned after timeout, did not block forever + case <-time.After(10 * time.Second): + t.Fatal("PublishEventViaAPI blocked on full EventOutCh — should return after 5s timeout") + } + + // Channel should still have exactly 1 item (the original, not the new one) + assert.Equal(t, 1, len(eventOutCh)) +} diff --git a/plugins/ptp_operator/metrics/manager.go b/plugins/ptp_operator/metrics/manager.go index a19dca3e..f5487912 100644 --- a/plugins/ptp_operator/metrics/manager.go +++ b/plugins/ptp_operator/metrics/manager.go @@ -783,7 +783,8 @@ func (p *PTPEventManager) SetInitalMetrics() { } func (p *PTPEventManager) TriggerLogs() error { - resp, err := http.Get(logsEndpoint) + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Get(logsEndpoint) if err != nil { return err } diff --git a/plugins/ptp_operator/ptp_operator_plugin.go b/plugins/ptp_operator/ptp_operator_plugin.go 
index d3e2c8cd..fd2379d1 100644 --- a/plugins/ptp_operator/ptp_operator_plugin.go +++ b/plugins/ptp_operator/ptp_operator_plugin.go @@ -466,12 +466,16 @@ func listenToSocket(wg *sync.WaitGroup) { } func processMessages(c net.Conn) { - // A new socket connection means the daemon (re)connected. - // Request a full state re-emit so metrics are populated after restart. + // Request a full state re-emit in a separate goroutine so the scanner + // can start reading immediately. TriggerLogs writes emit data back through + // this same socket connection; if we block here waiting for the HTTP response, + // nobody reads the socket, the kernel buffer fills, and the emit handler blocks. if eventManager != nil { - if err := eventManager.TriggerLogs(); err != nil { - log.Warnf("failed to trigger logs on new connection: %v", err) - } + go func() { + if err := eventManager.TriggerLogs(); err != nil { + log.Warnf("failed to trigger logs on new connection: %v", err) + } + }() } scanner := bufio.NewScanner(c) for {