diff --git a/common/broker/fsqueue/adapter.go b/common/broker/fsqueue/adapter.go new file mode 100644 index 0000000000..85a5f01474 --- /dev/null +++ b/common/broker/fsqueue/adapter.go @@ -0,0 +1,246 @@ +/* + * Copyright (c) 2024. Abstrium SAS + * This file is part of Pydio Cells. + * + * Pydio Cells is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Pydio Cells is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Pydio Cells. If not, see . + * + * The latest code can be found at . + */ + +// Package filepubsub provides an AsyncQueue adapter that wraps the gocloud pubsub +// file driver, enabling sync endpoints to use filesystem-backed pubsub queues. +// +// # URLs +// +// The queue registers the "fpub" scheme. 
The URL format is: +// +// fpub:///path/to/spool?name=streamname&ackdeadline=1m +// +// Query parameters: +// - name: Required stream name (used for directory naming) +// - ackdeadline: Optional ack deadline (defaults to 1m) +// +// Example: +// +// fpub:///shared/broker?name=syncro +package filepubsub + +import ( + "context" + "errors" + "net/url" + "path/filepath" + "sync" + "time" + + "go.uber.org/zap" + "gocloud.dev/pubsub" + "google.golang.org/protobuf/proto" + + "github.com/pydio/cells/v5/common/broker" + "github.com/pydio/cells/v5/common/telemetry/log" +) + +var ( + errMissingStreamName = errors.New("fpub: please provide a stream name via ?name=") + errQueueClosed = errors.New("fpub: queue is closed") +) + +func init() { + broker.RegisterAsyncQueue("fpub", &fpubQueue{}) +} + +// fpubQueue implements broker.AsyncQueue using the filepubsub gocloud driver. +type fpubQueue struct { + ctx context.Context + cancel context.CancelFunc + topic *pubsub.Topic + sub *pubsub.Subscription + basePath string + name string + + closeMu sync.Mutex + closed bool + closeErr error +} + +// OpenURL implements broker.AsyncQueueOpener. 
+func (f *fpubQueue) OpenURL(ctx context.Context, u *url.URL) (broker.AsyncQueue, error) {
+	streamName := u.Query().Get("name")
+	if streamName == "" {
+		return nil, errMissingStreamName
+	}
+
+	// Parse the optional ack deadline; invalid values silently fall back to 1m.
+	ackDeadline := time.Minute
+	if ad := u.Query().Get("ackdeadline"); ad != "" {
+		if d, err := time.ParseDuration(ad); err == nil {
+			ackDeadline = d
+		}
+	}
+
+	// Build base path: /path/from/url/fpub-streamname.
+	// Include u.Host for consistency with OpenTopicURL/OpenSubscriptionURL in
+	// this package, which use path.Join(u.Host, u.Path): URLs of the form
+	// "fpub://dir/sub?name=x" would otherwise silently drop "dir".
+	// filepath.Join skips empty components, so "fpub:///abs/path" is unchanged.
+	basePath := filepath.Join(u.Host, u.Path, "fpub-"+streamName)
+
+	// Create the filesystem-backed topic (creates spool directories).
+	topic, err := NewTopic(basePath)
+	if err != nil {
+		return nil, err
+	}
+
+	// Create the subscription; on failure, release the topic we just opened.
+	sub, err := NewSubscription(topic, ackDeadline)
+	if err != nil {
+		_ = topic.Shutdown(ctx)
+		return nil, err
+	}
+
+	// qCtx governs the lifetime of the consumer goroutine started by Consume.
+	qCtx, cancel := context.WithCancel(ctx)
+	return &fpubQueue{
+		ctx:      qCtx,
+		cancel:   cancel,
+		topic:    topic,
+		sub:      sub,
+		basePath: basePath,
+		name:     streamName,
+	}, nil
+}
+
+// Push implements broker.AsyncQueue.Push.
+// Encodes the protobuf message with context and sends via pubsub.Topic.
+// Returns errQueueClosed after Close has been called.
+func (f *fpubQueue) Push(ctx context.Context, msg proto.Message) error {
+	f.closeMu.Lock()
+	if f.closed {
+		f.closeMu.Unlock()
+		return errQueueClosed
+	}
+	f.closeMu.Unlock()
+
+	body := broker.EncodeProtoWithContext(ctx, msg)
+	return f.topic.Send(ctx, &pubsub.Message{
+		Body: body,
+	})
+}
+
+// PushRaw implements broker.AsyncQueue.PushRaw.
+// Sends a pre-encoded broker.Message. Returns errQueueClosed after Close.
+func (f *fpubQueue) PushRaw(ctx context.Context, message broker.Message) error {
+	f.closeMu.Lock()
+	if f.closed {
+		f.closeMu.Unlock()
+		return errQueueClosed
+	}
+	f.closeMu.Unlock()
+
+	body := broker.EncodeBrokerMessage(message)
+	return f.topic.Send(ctx, &pubsub.Message{
+		Body: body,
+	})
+}
+
+// Consume implements broker.AsyncQueue.Consume.
+// Starts a goroutine that receives messages from the subscription
+// and invokes the callback with decoded broker.Messages. 
+func (f *fpubQueue) Consume(callback func(context.Context, ...broker.Message)) error {
+	go func() {
+		// Loop until the queue context is cancelled or the queue is closed.
+		for f.ctx.Err() == nil {
+			msg, rcvErr := f.sub.Receive(f.ctx)
+			if rcvErr != nil {
+				f.closeMu.Lock()
+				closing := f.closed
+				f.closeMu.Unlock()
+				if closing {
+					log.Logger(f.ctx).Debug("[fpubQueue] Consumer stopping: queue closed", zap.String("name", f.name))
+					return
+				}
+				if f.ctx.Err() != nil {
+					log.Logger(f.ctx).Debug("[fpubQueue] Consumer stopping: context error", zap.String("name", f.name), zap.Error(f.ctx.Err()))
+					return
+				}
+				log.Logger(f.ctx).Error("[fpubQueue] Error receiving message", zap.String("name", f.name), zap.Error(rcvErr))
+				// Back off briefly before retrying, bailing out on cancellation.
+				select {
+				case <-time.After(100 * time.Millisecond):
+				case <-f.ctx.Done():
+					return
+				}
+				continue
+			}
+
+			// Decode into a broker.Message; corrupt payloads are acked and dropped.
+			brokerMsg, decErr := broker.DecodeToBrokerMessage(msg.Body)
+			if decErr != nil {
+				log.Logger(f.ctx).Error("[fpubQueue] Failed to decode message", zap.String("name", f.name), zap.Error(decErr))
+				msg.Ack()
+				continue
+			}
+
+			// Deliver to the callback, then acknowledge (at-least-once: a crash
+			// between callback and Ack leads to redelivery).
+			callback(f.ctx, brokerMsg)
+			msg.Ack()
+		}
+		log.Logger(f.ctx).Debug("[fpubQueue] Consumer stopping: context done", zap.String("name", f.name))
+	}()
+	return nil
+}
+
+// Close implements broker.AsyncQueue.Close.
+// Shuts down both subscription and topic. 
+func (f *fpubQueue) Close(ctx context.Context) error { + f.closeMu.Lock() + defer f.closeMu.Unlock() + + if f.closed { + return f.closeErr + } + f.closed = true + + // Cancel consumer context + if f.cancel != nil { + f.cancel() + } + + var errs []error + + // Shutdown subscription first + if f.sub != nil { + if err := f.sub.Shutdown(ctx); err != nil { + errs = append(errs, err) + } + } + + // Then shutdown topic + if f.topic != nil { + if err := f.topic.Shutdown(ctx); err != nil { + errs = append(errs, err) + } + } + + if len(errs) > 0 { + f.closeErr = errs[0] + } + return f.closeErr +} diff --git a/common/broker/fsqueue/fspubsub.go b/common/broker/fsqueue/fspubsub.go new file mode 100644 index 0000000000..1c316e452f --- /dev/null +++ b/common/broker/fsqueue/fspubsub.go @@ -0,0 +1,746 @@ +/* + * Copyright (c) 2024. Abstrium SAS + * This file is part of Pydio Cells. + * + * Pydio Cells is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Pydio Cells is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Pydio Cells. If not, see . + * + * The latest code can be found at . + */ + +// Package filepubsub provides a filesystem-backed pubsub implementation using gocloud.dev/pubsub. +// It writes messages as .pb files to a directory and uses filesystem notifications (notify) +// for efficient subscription instead of polling. +// +// Use NewTopic to construct a *pubsub.Topic, and NewSubscription to construct a *pubsub.Subscription. +// +// filepubsub provides at-least-once delivery semantics. 
Messages that are not acknowledged +// within the ack deadline will be redelivered. +// +// # URLs +// +// For pubsub.OpenTopic and pubsub.OpenSubscription, filepubsub registers +// for the scheme "file". +// The host+path is used as the directory path for message storage. +// Query parameters: +// - ackdeadline: The ack deadline for OpenSubscription, in time.ParseDuration formats. +// Defaults to 1m. +// +// Examples: +// - file:///shared/broker/mytopic +// - file:///var/spool/pubsub/events?ackdeadline=30s +// +// # As +// +// filepubsub does not support any types for As. +package filepubsub + +import ( + "bytes" + "context" + "encoding/gob" + "errors" + "fmt" + "net/url" + "os" + "path" + "path/filepath" + "sort" + "strings" + "sync" + "time" + + "github.com/google/uuid" + "github.com/rjeczalik/notify" + "gocloud.dev/gcerrors" + "gocloud.dev/pubsub" + "gocloud.dev/pubsub/batcher" + "gocloud.dev/pubsub/driver" +) + +func init() { + o := new(URLOpener) + pubsub.DefaultURLMux().RegisterTopic(Scheme, o) + pubsub.DefaultURLMux().RegisterSubscription(Scheme, o) +} + +// Scheme is the URL scheme filepubsub registers its URLOpeners under on pubsub.DefaultMux. 
+const Scheme = "file"
+
+// Subdirectories under a topic's base path, one per message lifecycle state.
+// A message file moves .tmp -> pending (publish) -> processing (claimed) and
+// is deleted on ack or renamed back to pending on nack/recovery.
+// NOTE(review): registering the generic "file" scheme on the default mux could
+// collide with another pubsub driver choosing the same scheme — confirm.
+const (
+	// pendingDir holds published messages waiting to be delivered.
+	pendingDir = "pending"
+	// processingDir holds messages claimed by a subscriber, not yet acked.
+	processingDir = "processing"
+	// tmpDir is the staging area for atomic publishes (write, then rename).
+	tmpDir = ".tmp"
+)
+
+// Sentinel errors for filepubsub package; compare with errors.Is.
+var (
+	ErrTopicNotExist      = errors.New("filepubsub: topic does not exist")
+	ErrTopicClosed        = errors.New("filepubsub: topic is closed")
+	ErrSubscriptionClosed = errors.New("filepubsub: subscription is closed")
+	ErrInvalidPath        = errors.New("filepubsub: path is required")
+	ErrInvalidParam       = errors.New("filepubsub: invalid query parameter")
+	ErrInvalidAckDeadline = errors.New("filepubsub: invalid ackdeadline")
+	ErrCreateDir          = errors.New("filepubsub: failed to create directory")
+	ErrWriteFile          = errors.New("filepubsub: failed to write file")
+	ErrMoveFile           = errors.New("filepubsub: failed to move file")
+)
+
+// recvBatcherOpts configures batching for receives.
+// Like NATS, we use MaxBatchSize=1 and MaxHandlers=1 for strict ordering.
+var recvBatcherOpts = &batcher.Options{
+	MaxBatchSize: 1,
+	MaxHandlers:  1,
+}
+
+// URLOpener opens filepubsub URLs like "file:///path/to/topic".
+//
+// The URL's host+path is used as the directory for message storage.
+//
+// Query parameters:
+//   - ackdeadline: The ack deadline for OpenSubscription, in time.ParseDuration formats.
+//     Defaults to 1m.
+type URLOpener struct {
+	mu sync.Mutex
+	// topics caches one *pubsub.Topic per base path so that a topic and its
+	// subscriptions opened via URL share the same in-process instance.
+	topics map[string]*pubsub.Topic
+}
+
+// OpenTopicURL opens a pubsub.Topic based on u. 
+func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error) {
+	if err := ctx.Err(); err != nil {
+		return nil, err
+	}
+	// Reject unknown query parameters early.
+	for param := range u.Query() {
+		if param != "ackdeadline" {
+			return nil, fmt.Errorf("%w: %q in %v", ErrInvalidParam, param, u)
+		}
+	}
+	topicPath := path.Join(u.Host, u.Path)
+	if topicPath == "" {
+		return nil, fmt.Errorf("%w: %v", ErrInvalidPath, u)
+	}
+
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	if o.topics == nil {
+		o.topics = map[string]*pubsub.Topic{}
+	}
+	t := o.topics[topicPath]
+	if t == nil {
+		var err error
+		t, err = NewTopic(topicPath)
+		if err != nil {
+			return nil, err
+		}
+		// NOTE(review): topics are cached for the process lifetime; a topic
+		// that has been Shutdown is still returned from this cache — confirm
+		// callers never reopen a URL after shutting its topic down.
+		o.topics[topicPath] = t
+	}
+	return t, nil
+}
+
+// OpenSubscriptionURL opens a pubsub.Subscription based on u.
+// It creates the underlying topic on demand and shares it with any topic
+// previously opened for the same path.
+func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error) {
+	if err := ctx.Err(); err != nil {
+		return nil, err
+	}
+	q := u.Query()
+	ackDeadline := 1 * time.Minute
+	if s := q.Get("ackdeadline"); s != "" {
+		var err error
+		ackDeadline, err = time.ParseDuration(s)
+		if err != nil {
+			return nil, fmt.Errorf("%w %q: %v", ErrInvalidAckDeadline, s, err)
+		}
+		q.Del("ackdeadline")
+	}
+	// Anything left in q after removing ackdeadline is unsupported.
+	for param := range q {
+		return nil, fmt.Errorf("%w: %q in %v", ErrInvalidParam, param, u)
+	}
+	topicPath := path.Join(u.Host, u.Path)
+	if topicPath == "" {
+		return nil, fmt.Errorf("%w: %v", ErrInvalidPath, u)
+	}
+
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	if o.topics == nil {
+		o.topics = map[string]*pubsub.Topic{}
+	}
+	t := o.topics[topicPath]
+	if t == nil {
+		// Create topic if it doesn't exist.
+		var err error
+		t, err = NewTopic(topicPath)
+		if err != nil {
+			return nil, err
+		}
+		o.topics[topicPath] = t
+	}
+	return NewSubscription(t, ackDeadline)
+}
+
+// TopicOptions contains configuration options for topics.
+type TopicOptions struct {
+	// BatcherOptions adds constraints to the default batching done for sends.
+	BatcherOptions batcher.Options
+}
+
+// topic is the driver.Topic implementation: a spool directory with
+// pending/processing/.tmp subdirectories. All fields are guarded by mu.
+type topic struct {
+	mu        sync.Mutex
+	basePath  string
+	subs      []*subscription // subscriptions to notify on new messages
+	nextAckID int             // monotonically increasing per-topic ack id
+	closed    bool
+}
+
+// NewTopic creates a new filesystem-backed topic.
+func NewTopic(basePath string) (*pubsub.Topic, error) {
+	return NewTopicWithOptions(basePath, nil)
+}
+
+// NewTopicWithOptions is similar to NewTopic, but supports TopicOptions.
+// It creates the pending/processing/.tmp subdirectories if needed.
+func NewTopicWithOptions(basePath string, opts *TopicOptions) (*pubsub.Topic, error) {
+	if opts == nil {
+		opts = &TopicOptions{}
+	}
+
+	// Create the spool directories up-front so Send/Receive never have to.
+	for _, dir := range []string{pendingDir, processingDir, tmpDir} {
+		dirPath := filepath.Join(basePath, dir)
+		if err := os.MkdirAll(dirPath, 0o755); err != nil {
+			return nil, fmt.Errorf("filepubsub: failed to create directory %s: %w", dirPath, err)
+		}
+	}
+
+	t := &topic{
+		basePath: basePath,
+	}
+	return pubsub.NewTopic(t, &opts.BatcherOptions), nil
+}
+
+// SendBatch implements driver.Topic.SendBatch.
+// Each message is written to .tmp and then renamed into pending so that
+// readers never observe a partially written file. If the batch fails before
+// all files are published, unpublished temp files are removed (previously
+// they were leaked in .tmp).
+func (t *topic) SendBatch(ctx context.Context, ms []*driver.Message) error {
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+	if t == nil {
+		return ErrTopicNotExist
+	}
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	if t.closed {
+		return ErrTopicClosed
+	}
+
+	tmpPath := filepath.Join(t.basePath, tmpDir)
+	pendPath := filepath.Join(t.basePath, pendingDir)
+
+	filenames := make([]string, 0, len(ms))
+	// cleanupTmp removes every temp file written so far; called on any failure
+	// during the staging phase so aborted batches leave no orphans in .tmp.
+	cleanupTmp := func() {
+		for _, fn := range filenames {
+			_ = os.Remove(filepath.Join(tmpPath, fn))
+		}
+	}
+
+	for i, m := range ms {
+		m.AckID = t.nextAckID + i
+		m.LoggableID = fmt.Sprintf("msg #%d", m.AckID)
+		m.AsFunc = func(any) bool { return false }
+
+		if m.BeforeSend != nil {
+			if err := m.BeforeSend(func(any) bool { return false }); err != nil {
+				cleanupTmp()
+				return err
+			}
+		}
+
+		// Encode message payload (gob of wireMessage).
+		payload, err := encodeMessage(m)
+		if err != nil {
+			cleanupTmp()
+			return err
+		}
+
+		// Timestamp-prefixed unique filename: lexical order == send order.
+		filename := fmt.Sprintf("%020d-%08d-%s.pb", time.Now().UnixNano(), m.AckID, uuid.New().String()[:8])
+
+		// Stage atomically: write into .tmp first.
+		tmpFile := filepath.Join(tmpPath, filename)
+		if err := os.WriteFile(tmpFile, payload, 0o644); err != nil {
+			cleanupTmp()
+			return fmt.Errorf("filepubsub: failed to write temp file: %w", err)
+		}
+		filenames = append(filenames, filename)
+	}
+
+	// Publish: rename every staged file into pending.
+	for idx, filename := range filenames {
+		tmpFile := filepath.Join(tmpPath, filename)
+		finalFile := filepath.Join(pendPath, filename)
+		if err := os.Rename(tmpFile, finalFile); err != nil {
+			// Remove the remaining unpublished temp files. Files renamed in
+			// earlier iterations stay published (partial batch, as before).
+			for _, fn := range filenames[idx:] {
+				_ = os.Remove(filepath.Join(tmpPath, fn))
+			}
+			return fmt.Errorf("filepubsub: failed to move file to pending: %w", err)
+		}
+	}
+
+	t.nextAckID += len(ms)
+
+	// Wake up all subscriptions so they pick the new files up immediately.
+	for _, s := range t.subs {
+		s.notifyNewMessages()
+	}
+	return nil
+}
+
+// IsRetryable implements driver.Topic.IsRetryable.
+func (*topic) IsRetryable(error) bool { return false }
+
+// As implements driver.Topic.As; exposes the concrete *topic.
+func (t *topic) As(i any) bool {
+	p, ok := i.(**topic)
+	if !ok {
+		return false
+	}
+	*p = t
+	return true
+}
+
+// ErrorAs implements driver.Topic.ErrorAs
+func (*topic) ErrorAs(error, any) bool {
+	return false
+}
+
+// ErrorCode implements driver.Topic.ErrorCode, mapping sentinel and context
+// errors to gcerrors codes.
+func (*topic) ErrorCode(err error) gcerrors.ErrorCode {
+	switch {
+	case errors.Is(err, ErrTopicNotExist), errors.Is(err, ErrTopicClosed), errors.Is(err, ErrSubscriptionClosed):
+		return gcerrors.NotFound
+	case errors.Is(err, ErrInvalidPath), errors.Is(err, ErrInvalidParam), errors.Is(err, ErrInvalidAckDeadline):
+		return gcerrors.InvalidArgument
+	case errors.Is(err, context.Canceled):
+		return gcerrors.Canceled
+	case errors.Is(err, context.DeadlineExceeded):
+		return gcerrors.DeadlineExceeded
+	default:
+		return gcerrors.Unknown
+	}
+}
+
+// Close implements driver.Topic.Close. Idempotent; spool files are kept on disk.
+func (t *topic) Close() error {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.closed = true
+	return nil
+}
+
+// SubscriptionOptions sets options for constructing a *pubsub.Subscription
+// backed by filepubsub. 
+type SubscriptionOptions struct {
+	// ReceiveBatcherOptions adds constraints to the default batching done for receives.
+	// NOTE(review): this field is currently never read — NewSubscriptionWithOptions
+	// always passes recvBatcherOpts to pubsub.NewSubscription. Confirm whether it
+	// should be honored or removed.
+	ReceiveBatcherOptions batcher.Options
+	// AckBatcherOptions adds constraints to the default batching done for acks.
+	AckBatcherOptions batcher.Options
+}
+
+// subscription is the driver.Subscription implementation. It claims pending
+// files by renaming them into processing, tracks unacked messages in msgs,
+// and redelivers messages whose ack deadline has expired. mu guards msgs and
+// closed.
+type subscription struct {
+	mu          sync.Mutex
+	topic       *topic
+	ackDeadline time.Duration
+	msgs        map[driver.AckID]*message // all unacknowledged messages
+	notifyChan  chan notify.EventInfo     // raw filesystem events (buffered)
+	newMsgChan  chan struct{}             // signaled when new messages arrive (capacity 1, coalescing)
+	stopWatcher chan struct{}             // closed by Close to stop the watcher goroutine
+	watcherDone chan struct{}             // closed by the watcher goroutine on exit
+	closed      bool
+}
+
+// message pairs an in-flight driver.Message with its on-disk filename and the
+// deadline after which it becomes eligible for redelivery.
+type message struct {
+	msg        *driver.Message
+	filename   string
+	expiration time.Time
+}
+
+// NewSubscription creates a new subscription for the given topic.
+// It panics if the given topic did not come from filepubsub.
+func NewSubscription(pstopic *pubsub.Topic, ackDeadline time.Duration) (*pubsub.Subscription, error) {
+	return NewSubscriptionWithOptions(pstopic, ackDeadline, nil)
+}
+
+// NewSubscriptionWithOptions is similar to NewSubscription, but supports SubscriptionOptions.
+func NewSubscriptionWithOptions(pstopic *pubsub.Topic, ackDeadline time.Duration, opts *SubscriptionOptions) (*pubsub.Subscription, error) {
+	if opts == nil {
+		opts = &SubscriptionOptions{}
+	}
+	// Unwrap the concrete *topic; panics on a foreign topic, by contract.
+	var t *topic
+	if !pstopic.As(&t) {
+		panic("filepubsub: NewSubscription passed a Topic not from filepubsub")
+	}
+
+	s, err := newSubscription(t, ackDeadline)
+	if err != nil {
+		return nil, err
+	}
+	// recvBatcherOpts (MaxBatchSize=1) is always used for receives to keep
+	// strict ordering; only ack batching is user-configurable here.
+	return pubsub.NewSubscription(s, recvBatcherOpts, &opts.AckBatcherOptions), nil
+}
+
+// newSubscription builds the driver subscription, registers it on the topic,
+// recovers messages stranded in processing, and starts the directory watcher.
+// NOTE(review): subscriptions are appended to t.subs but never removed on
+// Close — confirm this is acceptable for long-lived topics.
+func newSubscription(t *topic, ackDeadline time.Duration) (*subscription, error) {
+	s := &subscription{
+		topic:       t,
+		ackDeadline: ackDeadline,
+		msgs:        map[driver.AckID]*message{},
+		notifyChan:  make(chan notify.EventInfo, 100),
+		newMsgChan:  make(chan struct{}, 1),
+		stopWatcher: make(chan struct{}),
+		watcherDone: make(chan struct{}),
+	}
+
+	if t != nil {
+		t.mu.Lock()
+		t.subs = append(t.subs, s)
+		t.mu.Unlock()
+
+		// Recover any messages stuck in processing (e.g. after a crash).
+		if err := s.recoverProcessing(); err != nil {
+			return nil, err
+		}
+
+		// Start filesystem watcher on the pending directory.
+		if err := s.startWatcher(); err != nil {
+			return nil, err
+		}
+	}
+	return s, nil
+}
+
+// startWatcher sets up filesystem notifications on the pending directory and
+// runs a goroutine translating raw events into newMsgChan signals. The
+// goroutine exits when stopWatcher is closed and then closes watcherDone.
+func (s *subscription) startWatcher() error {
+	pendPath := filepath.Join(s.topic.basePath, pendingDir)
+
+	// Watch for new files in pending directory (both direct creates and
+	// renames from .tmp).
+	if err := notify.Watch(pendPath, s.notifyChan, notify.Create, notify.Rename); err != nil {
+		return fmt.Errorf("filepubsub: failed to watch directory: %w", err)
+	}
+
+	go func() {
+		defer close(s.watcherDone)
+		for {
+			select {
+			case <-s.stopWatcher:
+				notify.Stop(s.notifyChan)
+				return
+			case ev := <-s.notifyChan:
+				// Only care about .pb files; ignore editor/temp artifacts.
+				if strings.HasSuffix(ev.Path(), ".pb") {
+					s.notifyNewMessages()
+				}
+			}
+		}
+	}()
+
+	return nil
+}
+
+// notifyNewMessages signals that new messages are available. Non-blocking:
+// the capacity-1 channel coalesces bursts into a single wakeup.
+func (s *subscription) notifyNewMessages() {
+	select {
+	case s.newMsgChan <- struct{}{}:
+	default:
+		// Channel already has a signal
+	}
+}
+
+// recoverProcessing moves stuck messages from processing back to pending so a
+// new subscription can redeliver work left behind by a crashed consumer.
+func (s *subscription) recoverProcessing() error {
+	procPath := filepath.Join(s.topic.basePath, processingDir)
+	pendPath := filepath.Join(s.topic.basePath, pendingDir)
+
+	entries, err := os.ReadDir(procPath)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return nil
+		}
+		return err
+	}
+
+	// Collect candidate filenames first, then move them.
+	filenames := make([]string, 0, len(entries))
+	for _, entry := range entries {
+		if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".pb") {
+			continue
+		}
+		filenames = append(filenames, entry.Name())
+	}
+
+	// Best-effort rename: a failure (e.g. concurrent claim) is ignored.
+	for _, filename := range filenames {
+		src := filepath.Join(procPath, filename)
+		dst := filepath.Join(pendPath, filename)
+		_ = os.Rename(src, dst)
+	}
+	return nil
+}
+
+// receiveNoWait collects up to max messages available for delivery: first any
+// in-flight message whose ack deadline expired (redelivery), then new files
+// claimed from pending via atomic rename into processing.
+func (s *subscription) receiveNoWait(now time.Time, max int) ([]*driver.Message, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// First, check for expired messages that need redelivery. At most one
+	// expired message is returned per call; map iteration order is random.
+	for _, m := range s.msgs {
+		if now.After(m.expiration) {
+			// Reset expiration for redelivery
+			m.expiration = now.Add(s.ackDeadline)
+			return []*driver.Message{m.msg}, nil
+		}
+	}
+
+	// Read new messages from pending directory
+	pendPath := filepath.Join(s.topic.basePath, pendingDir)
+	procPath := filepath.Join(s.topic.basePath, processingDir)
+
+	entries, err := os.ReadDir(pendPath)
+	if err != nil {
+		return nil, err
+	}
+
+	var msgs []*driver.Message
+	msgs = make([]*driver.Message, 0, max)
+
+	// Sort by filename: the timestamp prefix makes lexical order == send order.
+	sort.Slice(entries, func(i, j int) bool {
+		return entries[i].Name() < entries[j].Name()
+	})
+
+	for _, entry := range entries {
+		if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".pb") {
+			continue
+		}
+
+		filename := entry.Name()
+		src := filepath.Join(pendPath, filename)
+		dst := filepath.Join(procPath, filename)
+
+		// Try to claim the message via atomic rename; a failure means another
+		// consumer got it first.
+		if err := os.Rename(src, dst); err != nil {
+			// Another consumer claimed it
			continue
+		}
+
+		// Read the message
+		data, err := os.ReadFile(dst)
+		if err != nil {
+			// Move back to pending for retry
+			_ = os.Rename(dst, src)
+			continue
+		}
+
+		dm, err := decodeMessage(data)
+		if err != nil {
+			// Corrupt message, remove it
+			_ = os.Remove(dst)
+			continue
+		}
+
+		// Track the message for ack/nack and deadline-based redelivery.
+		m := &message{
+			msg:        dm,
+			filename:   filename,
+			expiration: now.Add(s.ackDeadline),
+		}
+		s.msgs[dm.AckID] = m
+
+		msgs = append(msgs, dm)
+		if len(msgs) >= max {
+			break
+		}
+	}
+
+	return msgs, nil
+}
+
+// ReceiveBatch implements driver.ReceiveBatch. It returns immediately when
+// messages are available, otherwise blocks on the watcher signal, a 100ms
+// poll tick (which also drives ack-deadline redelivery), or ctx cancellation.
+// An empty, nil-error return tells the portable layer to call again.
+func (s *subscription) ReceiveBatch(ctx context.Context, maxMessages int) ([]*driver.Message, error) {
+	if s == nil || s.topic == nil {
+		return nil, ErrTopicNotExist
+	}
+
+	s.mu.Lock()
+	if s.closed {
+		s.mu.Unlock()
+		return nil, ErrSubscriptionClosed
+	}
+	s.mu.Unlock()
+
+	// Check for messages
+	msgs, err := s.receiveNoWait(time.Now(), maxMessages)
+	if err != nil {
+		return nil, err
+	}
+	if len(msgs) > 0 {
+		return msgs, nil
+	}
+
+	// Wait for new messages or timeout
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case <-s.newMsgChan:
+		// New messages available, try again
+		return s.receiveNoWait(time.Now(), maxMessages)
+	case <-time.After(100 * time.Millisecond):
+		// Periodic check for redelivery
+		return nil, nil
+	}
+}
+
+// SendAcks implements driver.SendAcks. 
+func (s *subscription) SendAcks(ctx context.Context, ackIDs []driver.AckID) error {
+	if s.topic == nil {
+		return ErrTopicNotExist
+	}
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+
+	procPath := filepath.Join(s.topic.basePath, processingDir)
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Acknowledging a message means deleting its file from processing and
+	// dropping it from the in-flight map. Unknown IDs are ignored.
+	for _, id := range ackIDs {
+		m, ok := s.msgs[id]
+		if !ok {
+			continue
+		}
+		_ = os.Remove(filepath.Join(procPath, m.filename))
+		delete(s.msgs, id)
+	}
+	return nil
+}
+
+// CanNack implements driver.CanNack.
+func (s *subscription) CanNack() bool { return true }
+
+// SendNacks implements driver.SendNacks.
+// A nacked message's file is moved from processing back to pending so it
+// becomes immediately eligible for redelivery.
+func (s *subscription) SendNacks(ctx context.Context, ackIDs []driver.AckID) error {
+	if s.topic == nil {
+		return ErrTopicNotExist
+	}
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+
+	procPath := filepath.Join(s.topic.basePath, processingDir)
+	pendPath := filepath.Join(s.topic.basePath, pendingDir)
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	for _, id := range ackIDs {
+		m, ok := s.msgs[id]
+		if !ok {
+			continue
+		}
+		_ = os.Rename(filepath.Join(procPath, m.filename), filepath.Join(pendPath, m.filename))
+		delete(s.msgs, id)
+	}
+	return nil
+}
+
+// IsRetryable implements driver.Subscription.IsRetryable.
+func (*subscription) IsRetryable(error) bool { return false }
+
+// As implements driver.Subscription.As. 
+func (s *subscription) As(i any) bool { return false } + +// ErrorAs implements driver.Subscription.ErrorAs +func (*subscription) ErrorAs(error, any) bool { + return false +} + +// Close implements driver.Subscription.Close. +func (s *subscription) Close() error { + s.mu.Lock() + s.closed = true + s.mu.Unlock() + + // Stop the watcher + close(s.stopWatcher) + <-s.watcherDone + + return nil +} + +// ErrorCode implements driver.Subscription.ErrorCode. +func (*subscription) ErrorCode(err error) gcerrors.ErrorCode { + switch { + case errors.Is(err, ErrTopicNotExist), errors.Is(err, ErrTopicClosed), errors.Is(err, ErrSubscriptionClosed): + return gcerrors.NotFound + case errors.Is(err, ErrInvalidPath), errors.Is(err, ErrInvalidParam), errors.Is(err, ErrInvalidAckDeadline): + return gcerrors.InvalidArgument + case errors.Is(err, context.Canceled): + return gcerrors.Canceled + case errors.Is(err, context.DeadlineExceeded): + return gcerrors.DeadlineExceeded + default: + return gcerrors.Unknown + } +} + +// wireMessage is a serializable representation of driver.Message +type wireMessage struct { + Metadata map[string]string + Body []byte + AckID int // concrete type instead of driver.AckID (any) +} + +// encodeMessage encodes a driver.Message to bytes using gob. +func encodeMessage(dm *driver.Message) ([]byte, error) { + wm := wireMessage{ + Metadata: dm.Metadata, + Body: dm.Body, + AckID: dm.AckID.(int), // we always set AckID as int in SendBatch + } + var buf bytes.Buffer + // Pre-allocate buffer to reduce memory allocations + buf.Grow(1024) // Start with 1KB buffer + if err := gob.NewEncoder(&buf).Encode(wm); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +// decodeMessage decodes bytes to a driver.Message using gob. 
+func decodeMessage(data []byte) (*driver.Message, error) { + var wm wireMessage + if err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(&wm); err != nil { + return nil, err + } + return &driver.Message{ + Metadata: wm.Metadata, + Body: wm.Body, + AckID: wm.AckID, + AsFunc: func(any) bool { return false }, + }, nil +} diff --git a/common/broker/fsqueue/fspubsub_test.go b/common/broker/fsqueue/fspubsub_test.go new file mode 100644 index 0000000000..50c4fb250e --- /dev/null +++ b/common/broker/fsqueue/fspubsub_test.go @@ -0,0 +1,331 @@ +/* + * Copyright (c) 2024. Abstrium SAS + * This file is part of Pydio Cells. + * + * Pydio Cells is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Pydio Cells is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Pydio Cells. If not, see . + * + * The latest code can be found at . + */ + +package filepubsub + +import ( + "context" + "net/url" + "os" + "path/filepath" + "testing" + "time" + + "gocloud.dev/pubsub" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/pydio/cells/v5/common/broker" + . 
"github.com/smartystreets/goconvey/convey" +) + +func TestFilePubSub(t *testing.T) { + Convey("Test FilePubSub Topic and Subscription", t, func() { + // Create temp directory + testDir, err := os.MkdirTemp("", "filepubsub-test-*") + So(err, ShouldBeNil) + defer os.RemoveAll(testDir) + + topicPath := filepath.Join(testDir, "test-topic") + + Convey("Create Topic", func() { + topic, err := NewTopic(topicPath) + So(err, ShouldBeNil) + So(topic, ShouldNotBeNil) + defer topic.Shutdown(context.Background()) + + // Verify directories created + _, err = os.Stat(filepath.Join(topicPath, pendingDir)) + So(err, ShouldBeNil) + _, err = os.Stat(filepath.Join(topicPath, processingDir)) + So(err, ShouldBeNil) + _, err = os.Stat(filepath.Join(topicPath, tmpDir)) + So(err, ShouldBeNil) + }) + + Convey("Send and Receive Message", func() { + topic, err := NewTopic(topicPath) + So(err, ShouldBeNil) + defer topic.Shutdown(context.Background()) + + sub, err := NewSubscription(topic, 1*time.Minute) + So(err, ShouldBeNil) + defer sub.Shutdown(context.Background()) + + ctx := context.Background() + + // Send message + err = topic.Send(ctx, &pubsub.Message{ + Body: []byte("hello world"), + Metadata: map[string]string{"key": "value"}, + }) + So(err, ShouldBeNil) + + // Receive message + msg, err := sub.Receive(ctx) + So(err, ShouldBeNil) + So(msg, ShouldNotBeNil) + So(string(msg.Body), ShouldEqual, "hello world") + So(msg.Metadata["key"], ShouldEqual, "value") + + // Ack the message + msg.Ack() + + // Verify file is deleted + time.Sleep(100 * time.Millisecond) + entries, _ := os.ReadDir(filepath.Join(topicPath, processingDir)) + So(len(entries), ShouldEqual, 0) + }) + + Convey("Nack and Redeliver Message", func() { + topic, err := NewTopic(topicPath) + So(err, ShouldBeNil) + defer topic.Shutdown(context.Background()) + + sub, err := NewSubscription(topic, 1*time.Minute) + So(err, ShouldBeNil) + defer sub.Shutdown(context.Background()) + + ctx := context.Background() + + // Send message + err 
= topic.Send(ctx, &pubsub.Message{
				Body: []byte("nack test"),
			})
			So(err, ShouldBeNil)

			// Receive and nack the message.
			msg, err := sub.Receive(ctx)
			So(err, ShouldBeNil)
			So(msg.Nackable(), ShouldBeTrue)
			msg.Nack()

			// A nacked message should be redelivered after a short delay.
			time.Sleep(100 * time.Millisecond)
			msg2, err := sub.Receive(ctx)
			So(err, ShouldBeNil)
			So(string(msg2.Body), ShouldEqual, "nack test")
			msg2.Ack()
		})

		Convey("Message Ordering", func() {
			topic, err := NewTopic(topicPath)
			So(err, ShouldBeNil)
			defer topic.Shutdown(context.Background())

			sub, err := NewSubscription(topic, 1*time.Minute)
			So(err, ShouldBeNil)
			defer sub.Shutdown(context.Background())

			ctx := context.Background()

			// Send multiple messages, spaced out so their timestamps differ.
			for i := 0; i < 5; i++ {
				err = topic.Send(ctx, &pubsub.Message{
					Body: []byte{byte('A' + i)},
				})
				So(err, ShouldBeNil)
				time.Sleep(10 * time.Millisecond) // Ensure different timestamps
			}

			// Messages must come back in the order they were sent.
			for i := 0; i < 5; i++ {
				msg, err := sub.Receive(ctx)
				So(err, ShouldBeNil)
				So(msg.Body[0], ShouldEqual, byte('A'+i))
				msg.Ack()
			}
		})
	})
}

// TestURLOpener verifies that topics and subscriptions can be opened through
// the gocloud "file://" URL scheme registered by this package, including the
// optional ackdeadline query parameter.
func TestURLOpener(t *testing.T) {
	Convey("Test URL Opener", t, func() {
		tmpDir, err := os.MkdirTemp("", "filepubsub-url-test-*")
		So(err, ShouldBeNil)
		defer os.RemoveAll(tmpDir)

		ctx := context.Background()

		Convey("Open Topic via URL", func() {
			// Named "addr" rather than "url" so the net/url package
			// (used by mustParseURL below) is not shadowed in this file.
			addr := "file://" + filepath.Join(tmpDir, "url-topic")
			topic, err := pubsub.OpenTopic(ctx, addr)
			So(err, ShouldBeNil)
			So(topic, ShouldNotBeNil)
			defer topic.Shutdown(ctx)
		})

		Convey("Open Subscription via URL", func() {
			addr := "file://" + filepath.Join(tmpDir, "url-sub") + "?ackdeadline=30s"
			sub, err := pubsub.OpenSubscription(ctx, addr)
			So(err, ShouldBeNil)
			So(sub, ShouldNotBeNil)
			defer sub.Shutdown(ctx)
		})
	})
}

// mustParseURL parses s and panics on failure, per the Must* convention.
// The previous version discarded the parse error, which could hand a nil
// *url.URL to the code under test and surface as a confusing nil-pointer
// panic far from the broken fixture.
func mustParseURL(s string) *url.URL {
	u, err := url.Parse(s)
	if err != nil {
		panic("mustParseURL: " + err.Error())
	}
	return u
}

// TestFpubQueue exercises the broker.AsyncQueue adapter (fpubQueue) end to
// end: URL opening, parameter validation, push/consume round-trip, and the
// closed-queue error path.
func TestFpubQueue(t *testing.T) {
	Convey("Test fpubQueue AsyncQueue", t, func() {
		testDir, err := os.MkdirTemp("", "fpubqueue-test-*")
		So(err, ShouldBeNil)
		defer os.RemoveAll(testDir)

		ctx := context.Background()

		Convey("OpenURL with valid params", func() {
			q := &fpubQueue{}
			queue, err := q.OpenURL(ctx, mustParseURL("fpub://"+testDir+"?name=test"))
			So(err, ShouldBeNil)
			So(queue, ShouldNotBeNil)
			defer queue.Close(ctx)
		})

		Convey("OpenURL missing name param", func() {
			q := &fpubQueue{}
			_, err := q.OpenURL(ctx, mustParseURL("fpub://"+testDir))
			So(err, ShouldEqual, errMissingStreamName)
		})

		Convey("Push and Consume", func() {
			q := &fpubQueue{}
			queue, err := q.OpenURL(ctx, mustParseURL("fpub://"+testDir+"?name=pushtest"))
			So(err, ShouldBeNil)
			defer queue.Close(ctx)

			// Buffered so the consumer callback never blocks on the channel.
			received := make(chan broker.Message, 1)
			err = queue.Consume(func(ctx context.Context, msgs ...broker.Message) {
				for _, m := range msgs {
					received <- m
				}
			})
			So(err, ShouldBeNil)

			// Push a proto message and wait for it to be consumed.
			err = queue.Push(ctx, &emptypb.Empty{})
			So(err, ShouldBeNil)

			select {
			case <-received:
				// OK
			case <-time.After(2 * time.Second):
				t.Fatal("timeout waiting for message")
			}
		})

		Convey("Push after Close returns error", func() {
			q := &fpubQueue{}
			queue, err := q.OpenURL(ctx, mustParseURL("fpub://"+testDir+"?name=closetest"))
			So(err, ShouldBeNil)
			queue.Close(ctx)

			err = queue.Push(ctx, &emptypb.Empty{})
			So(err, ShouldEqual, errQueueClosed)
		})
	})
}

// TestRecovery simulates a crash between Receive and Ack and verifies that a
// freshly opened subscription recovers the stuck (in-processing) message.
func TestRecovery(t *testing.T) {
	Convey("Test recovery of processing messages", t, func() {
		testDir, err := os.MkdirTemp("", "filepubsub-recovery-*")
		So(err, ShouldBeNil)
		defer os.RemoveAll(testDir)

		topicPath := filepath.Join(testDir, "recovery-topic")
		ctx := context.Background()

		// Create topic and send message.
		topic, err := NewTopic(topicPath)
		So(err, ShouldBeNil)

		err = topic.Send(ctx, &pubsub.Message{Body: []byte("recover me")})
		So(err, ShouldBeNil)

		sub, err := NewSubscription(topic, time.Minute)
		So(err, ShouldBeNil)

		// Receive but don't ack (simulate crash).
		msg, err := sub.Receive(ctx)
		So(err, ShouldBeNil)
		_ = msg // intentionally not acking

		// Shutdown without ack.
		sub.Shutdown(ctx)
		topic.Shutdown(ctx)

		// Re-open - should recover stuck message.
		topic2, err := NewTopic(topicPath)
		So(err, ShouldBeNil)
		defer topic2.Shutdown(ctx)

		sub2, err := NewSubscription(topic2, time.Minute)
		So(err, ShouldBeNil)
		defer sub2.Shutdown(ctx)

		msg2, err := sub2.Receive(ctx)
		So(err, ShouldBeNil)
		So(string(msg2.Body), ShouldEqual, "recover me")
		msg2.Ack()
	})
}

// TestContextCancellation verifies Receive honors an already-cancelled
// context instead of blocking forever.
func TestContextCancellation(t *testing.T) {
	Convey("Test context cancellation", t, func() {
		testDir, err := os.MkdirTemp("", "filepubsub-ctx-*")
		So(err, ShouldBeNil)
		defer os.RemoveAll(testDir)

		topicPath := filepath.Join(testDir, "ctx-topic")

		topic, err := NewTopic(topicPath)
		So(err, ShouldBeNil)
		defer topic.Shutdown(context.Background())

		sub, err := NewSubscription(topic, time.Minute)
		So(err, ShouldBeNil)
		defer sub.Shutdown(context.Background())

		ctx, cancel := context.WithCancel(context.Background())
		cancel() // Cancel immediately

		_, err = sub.Receive(ctx)
		So(err, ShouldNotBeNil) // Should return context canceled
	})
}

// TestErrorCases covers URL-opener rejection of unknown query parameters and
// of an empty path.
func TestErrorCases(t *testing.T) {
	Convey("Test error cases", t, func() {
		ctx := context.Background()

		Convey("Invalid URL param", func() {
			_, err := pubsub.OpenTopic(ctx, "file:///tmp/test?invalid=param")
			So(err, ShouldNotBeNil)
		})

		Convey("Empty path", func() {
			_, err := pubsub.OpenTopic(ctx, "file://")
			So(err, ShouldNotBeNil)
		})
	})
}
diff --git a/main.go b/main.go
index 4b6e98f566..11d4596ad0 100644
--- a/main.go
+++ b/main.go
@@ -9,13 +9,16 @@ import (
 	_ "github.com/pydio/cells/v5/common/telemetry/log/otlp"
 	_ "github.com/pydio/cells/v5/common/telemetry/log/service"
 	_ "github.com/pydio/cells/v5/common/telemetry/log/stdout"
+
 	// Tracing
 	_ "github.com/pydio/cells/v5/common/telemetry/tracing/jaeger"
 	_ "github.com/pydio/cells/v5/common/telemetry/tracing/otlp"
 	_ "github.com/pydio/cells/v5/common/telemetry/tracing/stdout"
+
 	// Metrics
 	_ "github.com/pydio/cells/v5/common/telemetry/metrics/otlp"
 	_ "github.com/pydio/cells/v5/common/telemetry/metrics/prometheus"
+
 	// Profiling
 	_ "github.com/pydio/cells/v5/common/telemetry/profile/http_pull"
 	_ "github.com/pydio/cells/v5/common/telemetry/profile/pyroscope"
@@ -105,6 +108,7 @@ import (
 	// Cache
 	_ "github.com/pydio/cells/v5/common/broker/debounce"
+	_ "github.com/pydio/cells/v5/common/broker/fsqueue"
 	_ "github.com/pydio/cells/v5/common/broker/goque"
 	_ "github.com/pydio/cells/v5/common/broker/jetstream"
 	_ "github.com/pydio/cells/v5/common/utils/cache/bigcache"
@@ -175,6 +179,7 @@ import (
 	_ "github.com/pydio/cells/v5/common/broker/grpcpubsub"
 	_ "github.com/pydio/cells/v5/common/broker/nats"
 	_ "gocloud.dev/pubsub/mempubsub"
+
 	// _ "gocloud.dev/pubsub/natspubsub"
 	_ "gocloud.dev/pubsub/rabbitpubsub"
@@ -183,6 +188,7 @@ import (
 	_ "github.com/pydio/cells/v5/common/config/file"
 	_ "github.com/pydio/cells/v5/common/config/memory"
 	_ "github.com/pydio/cells/v5/common/config/vault"
+
 	// _ "github.com/pydio/cells/v5/common/config/sql"
 	//_ "github.com/pydio/cells/v5/common/config/viper"