Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ jobs:
go tool cover -func=coverage.out

- name: Upload coverage results
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: coverage
name: coverage-${{ matrix.go-version }}
path: coverage.out

test-lint:
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 1.1.3

- Allow the deletion of a binding between an exchange and a queue, with `MQTTManager.UnbindExchangeFromQueueViaRoutingKey()`.

# 1.1.2

- Allow sending messages with a TTL via `PublishWithOptions` by adding a new `TTL` property in `PublishingOptions` ([PR](https://github.com/KardinalAI/gorabbit/pull/19)).
Expand Down
82 changes: 48 additions & 34 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,46 @@ Gorabbit is a wrapper that provides high level and robust RabbitMQ operations th

This wrapper depends on the official [Go RabbitMQ plugin](https://github.com/rabbitmq/amqp091-go).

* [Installation](#installation)
* [Go Module](#go-module)
* [Environment Variables](#environment-variables)
* [Always On Mechanism](#always-on-mechanism)
* [Client](#client)
* [Initialization](#client-initialization)
* [Options](#client-options)
* [Default Options](#client-with-default-options)
* [Custom Options](#client-with-custom-options)
* [Builder](#client-options-using-the-builder)
* [Struct](#client-options-using-struct-initialization)
* [Disconnection](#client-disconnection)
* [Publishing](#publishing)
* [Consuming](#consuming)
* [Ready and Health Checks](#ready-and-health-checks)
* [Manager](#manager)
* [Initialization](#manager-initialization)
* [Options](#manager-options)
* [Default Options](#manager-with-default-options)
* [Custom Options](#manager-with-custom-options)
* [Builder](#manager-options-using-the-builder)
* [Struct](#manager-options-using-struct-initialization)
* [Disconnection](#manager-disconnection)
* [Operations](#manager-operations)
* [Exchange Creation](#exchange-creation)
* [Queue Creation](#queue-creation)
* [Binding Creation](#binding-creation)
* [Message Count](#queue-messages-count)
* [Push Message](#push-message)
* [Pop Message](#pop-message)
* [Purge Queue](#purge-queue)
* [Delete Queue](#delete-queue)
* [Delete Exchange](#delete-exchange)
* [Setup From Definitions](#setup-from-schema-definition-file)
- [Gorabbit](#gorabbit)
- [Installation](#installation)
- [Go module](#go-module)
- [Environment variables](#environment-variables)
- [Always-on mechanism](#always-on-mechanism)
- [Client](#client)
- [Client initialization](#client-initialization)
- [Client options](#client-options)
- [Client with default options](#client-with-default-options)
- [Client with options from environment variables](#client-with-options-from-environment-variables)
- [Client with custom options](#client-with-custom-options)
- [Client options using the builder](#client-options-using-the-builder)
- [Client options using struct initialization](#client-options-using-struct-initialization)
- [Client disconnection](#client-disconnection)
- [Publishing](#publishing)
- [Consuming](#consuming)
- [Ready and Health checks](#ready-and-health-checks)
- [Manager](#manager)
- [Manager initialization](#manager-initialization)
- [Manager options](#manager-options)
- [Manager with default options](#manager-with-default-options)
- [Manager with options from environment variables](#manager-with-options-from-environment-variables)
- [Manager with custom options](#manager-with-custom-options)
- [Manager options using the builder](#manager-options-using-the-builder)
- [Manager options using struct initialization](#manager-options-using-struct-initialization)
- [Manager disconnection](#manager-disconnection)
- [Manager operations](#manager-operations)
- [Exchange creation](#exchange-creation)
- [Queue creation](#queue-creation)
- [Binding creation](#binding-creation)
- [Queue messages count](#queue-messages-count)
- [Push message](#push-message)
- [Pop message](#pop-message)
- [Delete binding](#delete-binding)
- [Purge queue](#purge-queue)
- [Delete queue](#delete-queue)
- [Delete exchange](#delete-exchange)
- [Setup from schema definition file](#setup-from-schema-definition-file)
- [Launch Local RabbitMQ Server](#launch-local-rabbitmq-server)
- [License](#license)

## Installation

Expand Down Expand Up @@ -475,6 +481,14 @@ Retrieves a single message from a given queue and auto acknowledges it if `autoA
message, err := manager.PopMessageFromQueue("events_queue", true)
```

#### Delete binding

Deletes a binding.

```go
err := manager.UnbindExchangeFromQueueViaRoutingKey("events_exchange", "events_queue", "event.foo.bar.created")
```

#### Purge queue

Deletes all messages from a given queue.
Expand Down
24 changes: 24 additions & 0 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type MQTTManager interface {
// Returns an error if the connection to the RabbitMQ server is down or if the exchange or queue does not exist.
BindExchangeToQueueViaRoutingKey(exchange, queue, routingKey string) error

// UnbindExchangeFromQueueViaRoutingKey will unbind an exchange from a queue via a given routingKey.
// Returns an error if the connection to the RabbitMQ server is down.
UnbindExchangeFromQueueViaRoutingKey(exchange, queue, routingKey string) error

// GetNumberOfMessages retrieves the number of messages currently sitting in a given queue.
// Returns an error if the connection to the RabbitMQ server is down or the queue does not exist.
GetNumberOfMessages(queue string) (int, error)
Expand Down Expand Up @@ -288,6 +292,26 @@ func (manager *mqttManager) BindExchangeToQueueViaRoutingKey(exchange, queue, ro
)
}

func (manager *mqttManager) UnbindExchangeFromQueueViaRoutingKey(exchange, queue, routingKey string) error {
// Manager is disabled, so we do nothing and return no error.
if manager.disabled {
return nil
}

// If the manager is not ready, we return its error.
if ready, err := manager.ready(); !ready {
return err
}

// We unbind the queue from a given exchange and routing key via the channel.
return manager.channel.QueueUnbind(
queue,
routingKey,
exchange,
nil,
)
}

func (manager *mqttManager) GetNumberOfMessages(queue string) (int, error) {
// Manager is disabled, so we do nothing and return no error.
if manager.disabled {
Expand Down
104 changes: 98 additions & 6 deletions manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,78 @@ func (suite *ManagerTestSuite) TestBindExchangeToQueueViaRoutingKey() {
require.NoError(t, manager.Disconnect())
}

func (suite *ManagerTestSuite) TestUnbindExchangeFromQueueViaRoutingKey() {
t := suite.T()

t.Setenv("RABBITMQ_HOST", suite.rabbitMQContainer.ContainerHost)
t.Setenv("RABBITMQ_PORT", strconv.Itoa(int(suite.rabbitMQContainer.ContainerPort)))
t.Setenv("RABBITMQ_USERNAME", suite.rabbitMQContainer.Username)
t.Setenv("RABBITMQ_PASSWORD", suite.rabbitMQContainer.Password)

manager, err := gorabbit.NewManagerFromEnv()

require.NoError(t, err)
assert.NotNil(t, manager)

queueConfig := gorabbit.QueueConfig{
Name: "test_queue",
Durable: true,
Exclusive: false,
}

err = manager.CreateQueue(queueConfig)

require.NoError(t, err)

exchangeConfig := gorabbit.ExchangeConfig{
Name: "test_exchange",
Type: "topic",
}

err = manager.CreateExchange(exchangeConfig)

require.NoError(t, err)

err = manager.BindExchangeToQueueViaRoutingKey(exchangeConfig.Name, queueConfig.Name, "routing_key")

require.NoError(t, err)

t.Run("Unbinding existing exchange from existing queue via routing key", func(t *testing.T) {
err = manager.UnbindExchangeFromQueueViaRoutingKey(exchangeConfig.Name, queueConfig.Name, "routing_key")

require.NoError(t, err)
})

t.Run("Unbinding again existing exchange from existing queue via routing key", func(t *testing.T) {
err = manager.UnbindExchangeFromQueueViaRoutingKey(exchangeConfig.Name, queueConfig.Name, "routing_key")

require.NoError(t, err)
})

t.Run("Unbinding existing exchange from existing queue via unknown routing key", func(t *testing.T) {
err = manager.UnbindExchangeFromQueueViaRoutingKey(exchangeConfig.Name, queueConfig.Name, "unk_routing_key")

require.NoError(t, err)
})

t.Run("Unbinding non-existing exchange from existing queue via routing key", func(t *testing.T) {
err = manager.UnbindExchangeFromQueueViaRoutingKey("non_existing_exchange", queueConfig.Name, "routing_key")

require.NoError(t, err)
})

t.Run("Unbinding existing exchange from non-existing queue via routing key", func(t *testing.T) {
err = manager.UnbindExchangeFromQueueViaRoutingKey(exchangeConfig.Name, "non_existing_queue", "routing_key")

require.NoError(t, err)
})

require.NoError(t, manager.DeleteQueue(exchangeConfig.Name))
require.NoError(t, manager.DeleteQueue(queueConfig.Name))

require.NoError(t, manager.Disconnect())
}

func (suite *ManagerTestSuite) TestGetNumberOfMessages() {
t := suite.T()

Expand Down Expand Up @@ -565,20 +637,40 @@ func (suite *ManagerTestSuite) TestPushMessageToExchange() {

require.NoError(t, err)

// Push message when the binding exists
err = manager.PushMessageToExchange(exchangeConfig.Name, "routing_key", "Some message")

require.NoError(t, err)

// Small sleep for allowing message to be sent.
time.Sleep(50 * time.Millisecond)

require.NoError(t, err)

count, countErr := manager.GetNumberOfMessages(queueConfig.Name)

require.NoError(t, countErr)
assert.Equal(t, 1, count)

require.NoError(t, manager.PurgeQueue(queueConfig.Name))

err = manager.UnbindExchangeFromQueueViaRoutingKey(exchangeConfig.Name, queueConfig.Name, "routing_key")

require.NoError(t, err)

// Push message when the binding no longer exists
err = manager.PushMessageToExchange(exchangeConfig.Name, "routing_key", "Some message")

require.NoError(t, err)

// Small sleep for allowing message to be sent.
time.Sleep(50 * time.Millisecond)

count, countErr = manager.GetNumberOfMessages(queueConfig.Name)

require.NoError(t, countErr)
assert.Zero(t, count)

require.NoError(t, manager.PurgeQueue(queueConfig.Name))

count, countErr = manager.GetNumberOfMessages(queueConfig.Name)

require.NoError(t, countErr)
Expand All @@ -591,10 +683,10 @@ func (suite *ManagerTestSuite) TestPushMessageToExchange() {
t.Run("Pushing message to non-existing exchange should still work", func(t *testing.T) {
err = manager.PushMessageToExchange("non_existing_exchange", "routing_key", "Some message")

require.NoError(t, err)

// Small sleep for allowing message to be sent.
time.Sleep(50 * time.Millisecond)

require.NoError(t, err)
})

require.NoError(t, manager.Disconnect())
Expand Down Expand Up @@ -639,11 +731,11 @@ func (suite *ManagerTestSuite) TestPopMessageFromQueue() {

err = manager.PushMessageToExchange(exchangeConfig.Name, "routing_key", "Some message")

require.NoError(t, err)

// Small sleep for allowing message to be sent.
time.Sleep(50 * time.Millisecond)

require.NoError(t, err)

count, countErr := manager.GetNumberOfMessages(queueConfig.Name)

require.NoError(t, countErr)
Expand Down