diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 234827c..fc097cb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,9 +36,9 @@ jobs: go tool cover -func=coverage.out - name: Upload coverage results - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: - name: coverage + name: coverage-${{ matrix.go-version }} path: coverage.out test-lint: diff --git a/CHANGELOG.md b/CHANGELOG.md index b055b06..7c908ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 1.1.3 + +- Allow the deletion of a binding between an exchange and a queue, with `MQTTManager.UnbindExchangeFromQueueViaRoutingKey()`. + # 1.1.2 - Allow sending messages with a TTL via `PublishWithOptions` by adding a new `TTL` property in `PublishingOptions` ([PR](https://github.com/KardinalAI/gorabbit/pull/19)). diff --git a/README.md b/README.md index 08d9afd..47480b9 100644 --- a/README.md +++ b/README.md @@ -8,40 +8,46 @@ Gorabbit is a wrapper that provides high level and robust RabbitMQ operations th This wrapper depends on the official [Go RabbitMQ plugin](https://github.com/rabbitmq/amqp091-go). -* [Installation](#installation) - * [Go Module](#go-module) - * [Environment Variables](#environment-variables) -* [Always On Mechanism](#always-on-mechanism) -* [Client](#client) - * [Initialization](#client-initialization) - * [Options](#client-options) - * [Default Options](#client-with-default-options) - * [Custom Options](#client-with-custom-options) - * [Builder](#client-options-using-the-builder) - * [Struct](#client-options-using-struct-initialization) - * [Disconnection](#client-disconnection) - * [Publishing](#publishing) - * [Consuming](#consuming) - * [Ready and Health Checks](#ready-and-health-checks) -* [Manager](#manager) - * [Initialization](#manager-initialization) - * [Options](#manager-options) - * [Default Options](#manager-with-default-options) - * [Custom Options](#manager-with-custom-options) - * [Builder](#manager-options-using-the-builder) - * [Struct](#manager-options-using-struct-initialization) - * [Disconnection](#manager-disconnection) - * [Operations](#manager-operations) - * [Exchange Creation](#exchange-creation) - * [Queue Creation](#queue-creation) - * [Binding Creation](#binding-creation) - * [Message Count](#queue-messages-count) - * [Push Message](#push-message) - * [Pop Message](#pop-message) - * [Purge Queue](#purge-queue) - * [Delete Queue](#delete-queue) - * [Delete Exchange](#delete-exchange) - * [Setup From Definitions](#setup-from-schema-definition-file) +- [Gorabbit](#gorabbit) + - [Installation](#installation) + - [Go module](#go-module) + - [Environment variables](#environment-variables) + - [Always-on mechanism](#always-on-mechanism) + - [Client](#client) + - [Client initialization](#client-initialization) + - [Client options](#client-options) + - [Client with default options](#client-with-default-options) + - [Client with options from environment variables](#client-with-options-from-environment-variables) + - [Client with custom options](#client-with-custom-options) + - [Client options using the builder](#client-options-using-the-builder) + - [Client options using struct initialization](#client-options-using-struct-initialization) + - [Client disconnection](#client-disconnection) + - [Publishing](#publishing) + - [Consuming](#consuming) + - [Ready and Health checks](#ready-and-health-checks) + - [Manager](#manager) + - [Manager initialization](#manager-initialization) + - [Manager options](#manager-options) + - [Manager with default options](#manager-with-default-options) + - [Manager with options from environment variables](#manager-with-options-from-environment-variables) + - [Manager with custom options](#manager-with-custom-options) + - [Manager options using the builder](#manager-options-using-the-builder) + - [Manager options using struct initialization](#manager-options-using-struct-initialization) + - [Manager disconnection](#manager-disconnection) + - [Manager operations](#manager-operations) + - [Exchange creation](#exchange-creation) + - [Queue creation](#queue-creation) + - [Binding creation](#binding-creation) + - [Queue messages count](#queue-messages-count) + - [Push message](#push-message) + - [Pop message](#pop-message) + - [Delete binding](#delete-binding) + - [Purge queue](#purge-queue) + - [Delete queue](#delete-queue) + - [Delete exchange](#delete-exchange) + - [Setup from schema definition file](#setup-from-schema-definition-file) + - [Launch Local RabbitMQ Server](#launch-local-rabbitmq-server) + - [License](#license) ## Installation @@ -475,6 +481,14 @@ Retrieves a single message from a given queue and auto acknowledges it if `autoA message, err := manager.PopMessageFromQueue("events_queue", true) ``` +#### Delete binding + +Deletes a binding. + +```go +err := manager.UnbindExchangeFromQueueViaRoutingKey("events_exchange", "events_queue", "event.foo.bar.created") +``` + #### Purge queue Deletes all messages from a given queue. diff --git a/manager.go b/manager.go index 54e9a8e..1e11cfe 100644 --- a/manager.go +++ b/manager.go @@ -31,6 +31,10 @@ type MQTTManager interface { // Returns an error if the connection to the RabbitMQ server is down or if the exchange or queue does not exist. BindExchangeToQueueViaRoutingKey(exchange, queue, routingKey string) error + // UnbindExchangeFromQueueViaRoutingKey will unbind an exchange from a queue via a given routingKey. + // Returns an error if the connection to the RabbitMQ server is down. + UnbindExchangeFromQueueViaRoutingKey(exchange, queue, routingKey string) error + // GetNumberOfMessages retrieves the number of messages currently sitting in a given queue. // Returns an error if the connection to the RabbitMQ server is down or the queue does not exist. GetNumberOfMessages(queue string) (int, error) @@ -288,6 +292,26 @@ func (manager *mqttManager) BindExchangeToQueueViaRoutingKey(exchange, queue, ro ) } +func (manager *mqttManager) UnbindExchangeFromQueueViaRoutingKey(exchange, queue, routingKey string) error { + // Manager is disabled, so we do nothing and return no error. + if manager.disabled { + return nil + } + + // If the manager is not ready, we return its error. + if ready, err := manager.ready(); !ready { + return err + } + + // We unbind the queue from a given exchange and routing key via the channel. + return manager.channel.QueueUnbind( + queue, + routingKey, + exchange, + nil, + ) +} + func (manager *mqttManager) GetNumberOfMessages(queue string) (int, error) { // Manager is disabled, so we do nothing and return no error. if manager.disabled { diff --git a/manager_test.go b/manager_test.go index 7d42601..1fb3cd8 100644 --- a/manager_test.go +++ b/manager_test.go @@ -471,6 +471,78 @@ func (suite *ManagerTestSuite) TestBindExchangeToQueueViaRoutingKey() { require.NoError(t, manager.Disconnect()) } +func (suite *ManagerTestSuite) TestUnbindExchangeFromQueueViaRoutingKey() { + t := suite.T() + + t.Setenv("RABBITMQ_HOST", suite.rabbitMQContainer.ContainerHost) + t.Setenv("RABBITMQ_PORT", strconv.Itoa(int(suite.rabbitMQContainer.ContainerPort))) + t.Setenv("RABBITMQ_USERNAME", suite.rabbitMQContainer.Username) + t.Setenv("RABBITMQ_PASSWORD", suite.rabbitMQContainer.Password) + + manager, err := gorabbit.NewManagerFromEnv() + + require.NoError(t, err) + assert.NotNil(t, manager) + + queueConfig := gorabbit.QueueConfig{ + Name: "test_queue", + Durable: true, + Exclusive: false, + } + + err = manager.CreateQueue(queueConfig) + + require.NoError(t, err) + + exchangeConfig := gorabbit.ExchangeConfig{ + Name: "test_exchange", + Type: "topic", + } + + err = manager.CreateExchange(exchangeConfig) + + require.NoError(t, err) + + err = manager.BindExchangeToQueueViaRoutingKey(exchangeConfig.Name, queueConfig.Name, "routing_key") + + require.NoError(t, err) + + t.Run("Unbinding existing exchange from existing queue via routing key", func(t *testing.T) { + err = manager.UnbindExchangeFromQueueViaRoutingKey(exchangeConfig.Name, queueConfig.Name, "routing_key") + + require.NoError(t, err) + }) + + t.Run("Unbinding again existing exchange from existing queue via routing key", func(t *testing.T) { + err = manager.UnbindExchangeFromQueueViaRoutingKey(exchangeConfig.Name, queueConfig.Name, "routing_key") + + require.NoError(t, err) + }) + + t.Run("Unbinding existing exchange from existing queue via unknown routing key", func(t *testing.T) { + err = manager.UnbindExchangeFromQueueViaRoutingKey(exchangeConfig.Name, queueConfig.Name, "unk_routing_key") + + require.NoError(t, err) + }) + + t.Run("Unbinding non-existing exchange from existing queue via routing key", func(t *testing.T) { + err = manager.UnbindExchangeFromQueueViaRoutingKey("non_existing_exchange", queueConfig.Name, "routing_key") + + require.NoError(t, err) + }) + + t.Run("Unbinding existing exchange from non-existing queue via routing key", func(t *testing.T) { + err = manager.UnbindExchangeFromQueueViaRoutingKey(exchangeConfig.Name, "non_existing_queue", "routing_key") + + require.NoError(t, err) + }) + + require.NoError(t, manager.DeleteQueue(exchangeConfig.Name)) + require.NoError(t, manager.DeleteQueue(queueConfig.Name)) + + require.NoError(t, manager.Disconnect()) +} + func (suite *ManagerTestSuite) TestGetNumberOfMessages() { t := suite.T() @@ -565,13 +637,14 @@ func (suite *ManagerTestSuite) TestPushMessageToExchange() { require.NoError(t, err) + // Push message when the binding exists err = manager.PushMessageToExchange(exchangeConfig.Name, "routing_key", "Some message") + require.NoError(t, err) + // Small sleep for allowing message to be sent. time.Sleep(50 * time.Millisecond) - require.NoError(t, err) - count, countErr := manager.GetNumberOfMessages(queueConfig.Name) require.NoError(t, countErr) @@ -579,6 +652,25 @@ func (suite *ManagerTestSuite) TestPushMessageToExchange() { require.NoError(t, manager.PurgeQueue(queueConfig.Name)) + err = manager.UnbindExchangeFromQueueViaRoutingKey(exchangeConfig.Name, queueConfig.Name, "routing_key") + + require.NoError(t, err) + + // Push message when the binding no longer exists + err = manager.PushMessageToExchange(exchangeConfig.Name, "routing_key", "Some message") + + require.NoError(t, err) + + // Small sleep for allowing message to be sent. + time.Sleep(50 * time.Millisecond) + + count, countErr = manager.GetNumberOfMessages(queueConfig.Name) + + require.NoError(t, countErr) + assert.Zero(t, count) + + require.NoError(t, manager.PurgeQueue(queueConfig.Name)) + count, countErr = manager.GetNumberOfMessages(queueConfig.Name) require.NoError(t, countErr) @@ -591,10 +683,10 @@ func (suite *ManagerTestSuite) TestPushMessageToExchange() { t.Run("Pushing message to non-existing exchange should still work", func(t *testing.T) { err = manager.PushMessageToExchange("non_existing_exchange", "routing_key", "Some message") + require.NoError(t, err) + // Small sleep for allowing message to be sent. time.Sleep(50 * time.Millisecond) - - require.NoError(t, err) }) require.NoError(t, manager.Disconnect()) @@ -639,11 +731,11 @@ func (suite *ManagerTestSuite) TestPopMessageFromQueue() { err = manager.PushMessageToExchange(exchangeConfig.Name, "routing_key", "Some message") + require.NoError(t, err) + // Small sleep for allowing message to be sent. time.Sleep(50 * time.Millisecond) - require.NoError(t, err) - count, countErr := manager.GetNumberOfMessages(queueConfig.Name) require.NoError(t, countErr)