diff --git a/cmd/main.go b/cmd/main.go index 9663e6fe..00f411a8 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -417,7 +417,9 @@ func ProcessInChannel(wg *sync.WaitGroup, scConfig *common.SCConfiguration) { out.Status = channel.SUCCESS } } - scConfig.EventOutCh <- &out + if !common.SendToChannel(scConfig.EventOutCh, &out) { + log.Warningf("EventOutCh full, dropping ack for %s", d.Address) + } } else if d.Type == channel.STATUS && d.Status == channel.NEW { log.Warnf("event disabled,no action taken(can't send to a destination): logging new status check %v\n", d) out := channel.DataChan{ diff --git a/pkg/common/common.go b/pkg/common/common.go index ffd73c73..cb3adc30 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -45,6 +45,20 @@ import ( log "github.com/sirupsen/logrus" ) +const sendTimeout = 5 * time.Second + +// SendToChannel sends data to a channel with a timeout. +// Returns true if the send succeeded, false if the channel was full for the +// entire timeout duration. +func SendToChannel(ch chan *channel.DataChan, d *channel.DataChan) bool { + select { + case ch <- d: + return true + case <-time.After(sendTimeout): + return false + } +} + // TransportType defines transport type supported type TransportType int @@ -309,18 +323,21 @@ func PublishEvent(scConfig *SCConfiguration, e ceevent.Event) error { // PublishEventViaAPI ... publish events by not using http request but direct api func PublishEventViaAPI(scConfig *SCConfiguration, cneEvent ceevent.Event, resourceAddress string) error { if ceEvent, err := GetPublishingCloudEvent(scConfig, cneEvent); err == nil { - scConfig.EventOutCh <- &channel.DataChan{ + d := &channel.DataChan{ Type: channel.EVENT, Status: channel.NEW, Data: ceEvent, Address: resourceAddress, // this is the publishing address ClientID: scConfig.ClientID(), } - - log.Debugf("event source %s sent to queue to process", ceEvent.Source()) - log.Debugf("event sent %s", cneEvent.JSONString()) - - localmetrics.UpdateEventPublishedCount(ceEvent.Source(), localmetrics.SUCCESS, 1) + if SendToChannel(scConfig.EventOutCh, d) { + log.Debugf("event source %s sent to queue to process", ceEvent.Source()) + log.Debugf("event sent %s", cneEvent.JSONString()) + localmetrics.UpdateEventPublishedCount(ceEvent.Source(), localmetrics.SUCCESS, 1) + } else { + log.Warningf("EventOutCh full for %s, dropping event for %s", sendTimeout, resourceAddress) + localmetrics.UpdateEventPublishedCount(ceEvent.Source(), localmetrics.FAIL, 1) + } } return nil } diff --git a/pkg/common/common_test.go b/pkg/common/common_test.go index 2e5785bf..b4bc649b 100644 --- a/pkg/common/common_test.go +++ b/pkg/common/common_test.go @@ -17,9 +17,12 @@ package common_test import ( "net" "testing" + "time" + "github.com/redhat-cne/sdk-go/pkg/channel" + ceevent "github.com/redhat-cne/sdk-go/pkg/event" "github.com/redhat-cne/sdk-go/pkg/types" - + v1pubsub "github.com/redhat-cne/sdk-go/v1/pubsub" log "github.com/sirupsen/logrus" "github.com/redhat-cne/cloud-event-proxy/pkg/common" @@ -61,3 +64,41 @@ func TestTransportHost_ParseTransportHost(t *testing.T) { } } } + +func TestPublishEventViaAPI_NonBlockingWhenChannelFull(t *testing.T) { + // Create a channel with buffer size 1 and fill it + eventOutCh := make(chan *channel.DataChan, 1) + eventOutCh <- &channel.DataChan{} // fill the buffer + + pubSubAPI := v1pubsub.GetAPIInstance("/tmp/test-store") + pub, _ := pubSubAPI.CreatePublisher(v1pubsub.NewPubSub( + types.ParseURI("http://localhost/dummy"), + "/test/resource", + )) + + scConfig := &common.SCConfiguration{ + EventOutCh: eventOutCh, + PubSubAPI: pubSubAPI, + } + + // Create event with matching publisher ID + event := ceevent.Event{ID: pub.ID} + + // PublishEventViaAPI should return after the 5s timeout (not block forever) + // even though the channel is full + done := make(chan struct{}) + go func() { + _ = common.PublishEventViaAPI(scConfig, event, "/test/resource") + close(done) + }() + + select { + case <-done: + // success — returned after timeout, did not block forever + case <-time.After(10 * time.Second): + t.Fatal("PublishEventViaAPI blocked on full EventOutCh — should return after 5s timeout") + } + + // Channel should still have exactly 1 item (the original, not the new one) + assert.Equal(t, 1, len(eventOutCh)) +} diff --git a/plugins/ptp_operator/metrics/manager.go b/plugins/ptp_operator/metrics/manager.go index 09c88996..b900b882 100644 --- a/plugins/ptp_operator/metrics/manager.go +++ b/plugins/ptp_operator/metrics/manager.go @@ -776,7 +776,8 @@ func (p *PTPEventManager) SetInitalMetrics() { } func (p *PTPEventManager) TriggerLogs() error { - resp, err := http.Get(logsEndpoint) + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Get(logsEndpoint) if err != nil { return err } diff --git a/plugins/ptp_operator/ptp_operator_plugin.go b/plugins/ptp_operator/ptp_operator_plugin.go index 4d5a58f6..df8f9c08 100644 --- a/plugins/ptp_operator/ptp_operator_plugin.go +++ b/plugins/ptp_operator/ptp_operator_plugin.go @@ -665,12 +665,16 @@ func listenToSocket(wg *sync.WaitGroup) { } func processMessages(c net.Conn) { - // A new socket connection means the daemon (re)connected. - // Request a full state re-emit so metrics are populated after restart. + // Request a full state re-emit in a separate goroutine so the scanner + // can start reading immediately. TriggerLogs writes emit data back through + // this same socket connection; if we block here waiting for the HTTP response, + // nobody reads the socket, the kernel buffer fills, and the emit handler blocks. if eventManager != nil { - if err := eventManager.TriggerLogs(); err != nil { - log.Warnf("failed to trigger logs on new connection: %v", err) - } + go func() { + if err := eventManager.TriggerLogs(); err != nil { + log.Warnf("failed to trigger logs on new connection: %v", err) + } + }() } scanner := bufio.NewScanner(c) for {