Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,9 @@ func ProcessInChannel(wg *sync.WaitGroup, scConfig *common.SCConfiguration) {
out.Status = channel.SUCCESS
}
}
scConfig.EventOutCh <- &out
if !common.SendToChannel(scConfig.EventOutCh, &out) {
log.Warningf("EventOutCh full, dropping ack for %s", d.Address)
}
} else if d.Type == channel.STATUS && d.Status == channel.NEW {
log.Warnf("event disabled,no action taken(can't send to a destination): logging new status check %v\n", d)
out := channel.DataChan{
Expand Down
29 changes: 23 additions & 6 deletions pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ import (
log "github.com/sirupsen/logrus"
)

const sendTimeout = 5 * time.Second

// SendToChannel sends data to a channel with a timeout.
// Returns true if the send succeeded, false if the channel was full for the
// entire timeout duration.
func SendToChannel(ch chan *channel.DataChan, d *channel.DataChan) bool {
select {
case ch <- d:
return true
case <-time.After(sendTimeout):
return false
}
}

// TransportType defines transport type supported
type TransportType int

Expand Down Expand Up @@ -309,18 +323,21 @@ func PublishEvent(scConfig *SCConfiguration, e ceevent.Event) error {
// PublishEventViaAPI ... publish events by not using http request but direct api
func PublishEventViaAPI(scConfig *SCConfiguration, cneEvent ceevent.Event, resourceAddress string) error {
if ceEvent, err := GetPublishingCloudEvent(scConfig, cneEvent); err == nil {
scConfig.EventOutCh <- &channel.DataChan{
d := &channel.DataChan{
Type: channel.EVENT,
Status: channel.NEW,
Data: ceEvent,
Address: resourceAddress, // this is the publishing address
ClientID: scConfig.ClientID(),
}

log.Debugf("event source %s sent to queue to process", ceEvent.Source())
log.Debugf("event sent %s", cneEvent.JSONString())

localmetrics.UpdateEventPublishedCount(ceEvent.Source(), localmetrics.SUCCESS, 1)
if SendToChannel(scConfig.EventOutCh, d) {
log.Debugf("event source %s sent to queue to process", ceEvent.Source())
log.Debugf("event sent %s", cneEvent.JSONString())
localmetrics.UpdateEventPublishedCount(ceEvent.Source(), localmetrics.SUCCESS, 1)
} else {
log.Warningf("EventOutCh full for %s, dropping event for %s", sendTimeout, resourceAddress)
localmetrics.UpdateEventPublishedCount(ceEvent.Source(), localmetrics.FAIL, 1)
}
}
return nil
}
Expand Down
43 changes: 42 additions & 1 deletion pkg/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ package common_test
import (
"net"
"testing"
"time"

"github.com/redhat-cne/sdk-go/pkg/channel"
ceevent "github.com/redhat-cne/sdk-go/pkg/event"
"github.com/redhat-cne/sdk-go/pkg/types"

v1pubsub "github.com/redhat-cne/sdk-go/v1/pubsub"
log "github.com/sirupsen/logrus"

"github.com/redhat-cne/cloud-event-proxy/pkg/common"
Expand Down Expand Up @@ -61,3 +64,41 @@ func TestTransportHost_ParseTransportHost(t *testing.T) {
}
}
}

func TestPublishEventViaAPI_NonBlockingWhenChannelFull(t *testing.T) {
// Create a channel with buffer size 1 and fill it
eventOutCh := make(chan *channel.DataChan, 1)
eventOutCh <- &channel.DataChan{} // fill the buffer

pubSubAPI := v1pubsub.GetAPIInstance("/tmp/test-store")
pub, _ := pubSubAPI.CreatePublisher(v1pubsub.NewPubSub(
types.ParseURI("http://localhost/dummy"),
"/test/resource",
))

scConfig := &common.SCConfiguration{
EventOutCh: eventOutCh,
PubSubAPI: pubSubAPI,
}

// Create event with matching publisher ID
event := ceevent.Event{ID: pub.ID}

// PublishEventViaAPI should return after the 5s timeout (not block forever)
// even though the channel is full
done := make(chan struct{})
go func() {
_ = common.PublishEventViaAPI(scConfig, event, "/test/resource")
close(done)
}()

select {
case <-done:
// success — returned after timeout, did not block forever
case <-time.After(10 * time.Second):
t.Fatal("PublishEventViaAPI blocked on full EventOutCh — should return after 5s timeout")
}

// Channel should still have exactly 1 item (the original, not the new one)
assert.Equal(t, 1, len(eventOutCh))
}
3 changes: 2 additions & 1 deletion plugins/ptp_operator/metrics/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,8 @@ func (p *PTPEventManager) SetInitalMetrics() {
}

func (p *PTPEventManager) TriggerLogs() error {
resp, err := http.Get(logsEndpoint)
client := &http.Client{Timeout: 5 * time.Second}
resp, err := client.Get(logsEndpoint)
if err != nil {
return err
}
Expand Down
14 changes: 9 additions & 5 deletions plugins/ptp_operator/ptp_operator_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,12 +466,16 @@ func listenToSocket(wg *sync.WaitGroup) {
}

func processMessages(c net.Conn) {
// A new socket connection means the daemon (re)connected.
// Request a full state re-emit so metrics are populated after restart.
// Request a full state re-emit in a separate goroutine so the scanner
// can start reading immediately. TriggerLogs writes emit data back through
// this same socket connection; if we block here waiting for the HTTP response,
// nobody reads the socket, the kernel buffer fills, and the emit handler blocks.
if eventManager != nil {
if err := eventManager.TriggerLogs(); err != nil {
log.Warnf("failed to trigger logs on new connection: %v", err)
}
go func() {
if err := eventManager.TriggerLogs(); err != nil {
log.Warnf("failed to trigger logs on new connection: %v", err)
}
}()
}
scanner := bufio.NewScanner(c)
for {
Expand Down
Loading