diff --git a/Makefile b/Makefile index 4016bf24..0c141809 100644 --- a/Makefile +++ b/Makefile @@ -120,7 +120,7 @@ local-init-schemas: ## Manually apply all database schemas docker exec -i $(LOCAL_PROJECT)-mysql-app-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \ done @echo "Applying queue schema to mysql-queue..." - @for file in extension/queue/sql/schema/*.sql; do \ + @for file in extension/queue/mysql/schema/*.sql; do \ echo " - Applying $$(basename $$file)..."; \ docker exec -i $(LOCAL_PROJECT)-mysql-queue-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \ done diff --git a/doc/howto/TESTING.md b/doc/howto/TESTING.md index a430da68..87de0513 100644 --- a/doc/howto/TESTING.md +++ b/doc/howto/TESTING.md @@ -29,7 +29,7 @@ SubmitQueue uses **two separate databases** to demonstrate proper architectural ### 2. Queue Database - **Purpose**: Messaging infrastructure (queue messages, offsets, partition leases) -- **Schema**: `extension/queue/sql/schema` +- **Schema**: `extension/queue/mysql/schema` - **Used by**: Gateway (publishes), Orchestrator (consumes) - **Connection**: `QUEUE_MYSQL_DSN` diff --git a/doc/rfc/sql-queue-rfc.md b/doc/rfc/sql-queue-rfc.md index 0e166a3f..c490f813 100644 --- a/doc/rfc/sql-queue-rfc.md +++ b/doc/rfc/sql-queue-rfc.md @@ -159,7 +159,7 @@ We chose **custom database-backed queue** because: - `(topic, partition_key, invisible_until, offset)`: Core fetch query - find visible messages in partition ordered by offset - `(topic, partition_key, id)`: Unique constraint and fast lookup for Ack/Nack -See `extension/queue/sql/schema/queue_messages.sql` for full schema. +See `extension/queue/mysql/schema/queue_messages.sql` for full schema. ### Partition Leases Table @@ -172,7 +172,7 @@ See `extension/queue/sql/schema/queue_messages.sql` for full schema. - `(leased_by)`: Find all partitions owned by a worker - `(lease_renewed_at)`: Detect stale leases across workers -See `extension/queue/sql/schema/queue_partition_leases.sql` for full schema. +See `extension/queue/mysql/schema/queue_partition_leases.sql` for full schema. ### Consumer Offsets Table @@ -185,7 +185,7 @@ See `extension/queue/sql/schema/queue_partition_leases.sql` for full schema. - `(consumer_group)`: Monitor all offsets for a consumer group - `(topic)`: Find all consumers for a topic -See `extension/queue/sql/schema/queue_offsets.sql` for full schema. +See `extension/queue/mysql/schema/queue_offsets.sql` for full schema. ### Dead Letter Queue Table @@ -201,7 +201,7 @@ See `extension/queue/sql/schema/queue_offsets.sql` for full schema. - `(failed_at)`: Time-based queries and cleanup - `(topic, partition_key, id)`: Unique constraint, prevents duplicates -See `extension/queue/sql/schema/queue_dlq.sql` for full schema. +See `extension/queue/mysql/schema/queue_dlq.sql` for full schema. ## Message Flow diff --git a/example/server/gateway/BUILD.bazel b/example/server/gateway/BUILD.bazel index bf84c820..801eae7f 100644 --- a/example/server/gateway/BUILD.bazel +++ b/example/server/gateway/BUILD.bazel @@ -13,7 +13,7 @@ go_library( deps = [ "//core/consumer", "//extension/counter/mysql", - "//extension/queue/sql", + "//extension/queue/mysql", "//extension/storage/mysql", "//gateway/controller", "//gateway/protopb", diff --git a/example/server/gateway/main.go b/example/server/gateway/main.go index ef18ef71..fe8dc021 100644 --- a/example/server/gateway/main.go +++ b/example/server/gateway/main.go @@ -15,7 +15,7 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" - queueSQL "github.com/uber/submitqueue/extension/queue/sql" + queueMySQL "github.com/uber/submitqueue/extension/queue/mysql" "github.com/uber/submitqueue/extension/storage/mysql" "github.com/uber/submitqueue/gateway/controller" pb "github.com/uber/submitqueue/gateway/protopb" @@ -122,7 +122,7 @@ func run() error { defer queueDB.Close() // Initialize queue - sqlQueue, err := queueSQL.NewQueue(queueSQL.Params{ + mysqlQueue, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: queueDB, Logger: logger, MetricsScope: scope.SubScope("queue"), @@ -130,7 +130,7 @@ func run() error { if err != nil { return fmt.Errorf("failed to create queue: %w", err) } - defer sqlQueue.Close() + defer mysqlQueue.Close() logger.Info("initialized dependencies", zap.String("app_dsn", appDSN), @@ -142,7 +142,7 @@ func run() error { // Create controllers and wrap them for gRPC pingController := controller.NewPingController(logger, scope) - landController := controller.NewLandController(logger.Sugar(), scope, store, cnt, sqlQueue.Publisher(), consumer.TopicRequest.String()) + landController := controller.NewLandController(logger.Sugar(), scope, store, cnt, mysqlQueue.Publisher(), consumer.TopicRequest.String()) gatewayServer := &GatewayServer{ pingController: pingController, landController: landController, diff --git a/example/server/orchestrator/BUILD.bazel b/example/server/orchestrator/BUILD.bazel index 3809c40e..42f081af 100644 --- a/example/server/orchestrator/BUILD.bazel +++ b/example/server/orchestrator/BUILD.bazel @@ -13,7 +13,7 @@ go_library( deps = [ "//core/consumer", "//extension/queue", - "//extension/queue/sql", + "//extension/queue/mysql", "//orchestrator/controller", "//orchestrator/controller/request", "//orchestrator/protopb", diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index 84e45bad..5f229c5a 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -15,7 +15,7 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" extqueue "github.com/uber/submitqueue/extension/queue" - queueSQL "github.com/uber/submitqueue/extension/queue/sql" + queueMySQL "github.com/uber/submitqueue/extension/queue/mysql" "github.com/uber/submitqueue/orchestrator/controller" "github.com/uber/submitqueue/orchestrator/controller/request" pb "github.com/uber/submitqueue/orchestrator/protopb" @@ -98,7 +98,7 @@ func run() error { defer queueDB.Close() // Initialize queue - sqlQueue, err := queueSQL.NewQueue(queueSQL.Params{ + mysqlQueue, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: queueDB, Logger: logger, MetricsScope: scope.SubScope("queue"), @@ -106,7 +106,7 @@ func run() error { if err != nil { return fmt.Errorf("failed to create queue: %w", err) } - defer sqlQueue.Close() + defer mysqlQueue.Close() logger.Info("initialized queue", zap.String("dsn", queueDSN)) @@ -118,8 +118,8 @@ func run() error { registry := consumer.NewTopicRegistry( []consumer.TopicConfig{ - {Topic: consumer.TopicRequest, Queue: sqlQueue}, - {Topic: consumer.TopicToBatch, Queue: sqlQueue}, + {Topic: consumer.TopicRequest, Queue: mysqlQueue}, + {Topic: consumer.TopicToBatch, Queue: mysqlQueue}, }, []extqueue.SubscriptionConfig{ extqueue.DefaultSubscriptionConfig( diff --git a/extension/queue/sql/BUILD.bazel b/extension/queue/mysql/BUILD.bazel similarity index 89% rename from extension/queue/sql/BUILD.bazel rename to extension/queue/mysql/BUILD.bazel index 54cd048b..6ce04969 100644 --- a/extension/queue/sql/BUILD.bazel +++ b/extension/queue/mysql/BUILD.bazel @@ -1,7 +1,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( - name = "sql", + name = "mysql", srcs = [ "constants.go", "errors.go", @@ -15,7 +15,7 @@ go_library( "subscriber.go", "validation.go", ], - importpath = "github.com/uber/submitqueue/extension/queue/sql", + importpath = "github.com/uber/submitqueue/extension/queue/mysql", visibility = ["//visibility:public"], deps = [ "//entity/queue", @@ -27,7 +27,7 @@ go_library( ) go_test( - name = "sql_test", + name = "mysql_test", srcs = [ "message_store_test.go", "offset_store_test.go", @@ -36,7 +36,7 @@ go_test( "sql_test.go", "subscriber_test.go", ], - embed = [":sql"], + embed = [":mysql"], deps = [ "//entity/queue", "//extension/queue", diff --git a/extension/queue/sql/README.md b/extension/queue/mysql/README.md similarity index 97% rename from extension/queue/sql/README.md rename to extension/queue/mysql/README.md index 06eeeecc..6b0af6e9 100644 --- a/extension/queue/sql/README.md +++ b/extension/queue/mysql/README.md @@ -15,14 +15,14 @@ MySQL-based distributed queue with partition leasing, visibility timeout, and at import ( "database/sql" _ "github.com/go-sql-driver/mysql" - queueSQL "github.com/uber/submitqueue/extension/queue/sql" + queueMySQL "github.com/uber/submitqueue/extension/queue/mysql" extqueue "github.com/uber/submitqueue/extension/queue" "github.com/uber/submitqueue/entity/queue" ) // Setup db, _ := sql.Open("mysql", "user:pass@tcp(localhost:3306)/db") -q, _ := queueSQL.NewQueue(queueSQL.Params{ +q, _ := queueMySQL.NewQueue(queueMySQL.Params{ DB: db, Logger: logger, MetricsScope: metrics, diff --git a/extension/queue/sql/constants.go b/extension/queue/mysql/constants.go similarity index 96% rename from extension/queue/sql/constants.go rename to extension/queue/mysql/constants.go index 7591dde8..195ee0ab 100644 --- a/extension/queue/sql/constants.go +++ b/extension/queue/mysql/constants.go @@ -1,4 +1,4 @@ -package sql +package mysql // Common constants for frequently repeated strings across stores diff --git a/extension/queue/sql/errors.go b/extension/queue/mysql/errors.go similarity index 95% rename from extension/queue/sql/errors.go rename to extension/queue/mysql/errors.go index 57887595..345940b9 100644 --- a/extension/queue/sql/errors.go +++ b/extension/queue/mysql/errors.go @@ -1,4 +1,4 @@ -package sql +package mysql import "fmt" diff --git a/extension/queue/sql/message_store.go b/extension/queue/mysql/message_store.go similarity index 99% rename from extension/queue/sql/message_store.go rename to extension/queue/mysql/message_store.go index ba501ec7..96afec08 100644 --- a/extension/queue/sql/message_store.go +++ b/extension/queue/mysql/message_store.go @@ -1,4 +1,4 @@ -package sql +package mysql import ( "context" diff --git a/extension/queue/sql/message_store_test.go b/extension/queue/mysql/message_store_test.go similarity index 99% rename from extension/queue/sql/message_store_test.go rename to extension/queue/mysql/message_store_test.go index 561857a6..203ff53d 100644 --- a/extension/queue/sql/message_store_test.go +++ b/extension/queue/mysql/message_store_test.go @@ -1,4 +1,4 @@ -package sql +package mysql import ( "context" diff --git a/extension/queue/sql/mock_stores.go b/extension/queue/mysql/mock_stores.go similarity index 99% rename from extension/queue/sql/mock_stores.go rename to extension/queue/mysql/mock_stores.go index 4c57d1e8..92e659b9 100644 --- a/extension/queue/sql/mock_stores.go +++ b/extension/queue/mysql/mock_stores.go @@ -7,7 +7,7 @@ // // Package sql is a generated GoMock package. -package sql +package mysql import ( context "context" diff --git a/extension/queue/sql/offset_store.go b/extension/queue/mysql/offset_store.go similarity index 99% rename from extension/queue/sql/offset_store.go rename to extension/queue/mysql/offset_store.go index f0daeedf..6e834f71 100644 --- a/extension/queue/sql/offset_store.go +++ b/extension/queue/mysql/offset_store.go @@ -1,4 +1,4 @@ -package sql +package mysql import ( "context" diff --git a/extension/queue/sql/offset_store_test.go b/extension/queue/mysql/offset_store_test.go similarity index 99% rename from extension/queue/sql/offset_store_test.go rename to extension/queue/mysql/offset_store_test.go index 9cba032d..f2a036a0 100644 --- a/extension/queue/sql/offset_store_test.go +++ b/extension/queue/mysql/offset_store_test.go @@ -1,4 +1,4 @@ -package sql +package mysql import ( "context" diff --git a/extension/queue/sql/partition_lease_store.go b/extension/queue/mysql/partition_lease_store.go similarity index 99% rename from extension/queue/sql/partition_lease_store.go rename to extension/queue/mysql/partition_lease_store.go index 7c9097e5..f7717770 100644 --- a/extension/queue/sql/partition_lease_store.go +++ b/extension/queue/mysql/partition_lease_store.go @@ -1,4 +1,4 @@ -package sql +package mysql import ( "context" diff --git a/extension/queue/sql/partition_lease_store_test.go b/extension/queue/mysql/partition_lease_store_test.go similarity index 99% rename from extension/queue/sql/partition_lease_store_test.go rename to extension/queue/mysql/partition_lease_store_test.go index 6142b3d9..f0860f94 100644 --- a/extension/queue/sql/partition_lease_store_test.go +++ b/extension/queue/mysql/partition_lease_store_test.go @@ -1,4 +1,4 @@ -package sql +package mysql import ( "context" diff --git a/extension/queue/sql/publisher.go b/extension/queue/mysql/publisher.go similarity index 99% rename from extension/queue/sql/publisher.go rename to extension/queue/mysql/publisher.go index ac8d3a5c..ce3a24b0 100644 --- a/extension/queue/sql/publisher.go +++ b/extension/queue/mysql/publisher.go @@ -1,4 +1,4 @@ -package sql +package mysql import ( "context" diff --git a/extension/queue/sql/publisher_test.go b/extension/queue/mysql/publisher_test.go similarity index 99% rename from extension/queue/sql/publisher_test.go rename to extension/queue/mysql/publisher_test.go index fb11a1ae..2d61a23d 100644 --- a/extension/queue/sql/publisher_test.go +++ b/extension/queue/mysql/publisher_test.go @@ -1,4 +1,4 @@ -package sql +package mysql import ( "context" diff --git a/extension/queue/sql/schema/BUILD.bazel b/extension/queue/mysql/schema/BUILD.bazel similarity index 100% rename from extension/queue/sql/schema/BUILD.bazel rename to extension/queue/mysql/schema/BUILD.bazel diff --git a/extension/queue/sql/schema/queue_messages.sql b/extension/queue/mysql/schema/queue_messages.sql similarity index 100% rename from extension/queue/sql/schema/queue_messages.sql rename to extension/queue/mysql/schema/queue_messages.sql diff --git a/extension/queue/sql/schema/queue_offsets.sql b/extension/queue/mysql/schema/queue_offsets.sql similarity index 100% rename from extension/queue/sql/schema/queue_offsets.sql rename to extension/queue/mysql/schema/queue_offsets.sql diff --git a/extension/queue/sql/schema/queue_partition_leases.sql b/extension/queue/mysql/schema/queue_partition_leases.sql similarity index 100% rename from extension/queue/sql/schema/queue_partition_leases.sql rename to extension/queue/mysql/schema/queue_partition_leases.sql diff --git a/extension/queue/sql/sql.go b/extension/queue/mysql/sql.go similarity index 99% rename from extension/queue/sql/sql.go rename to extension/queue/mysql/sql.go index 274ea404..72ff28d4 100644 --- a/extension/queue/sql/sql.go +++ b/extension/queue/mysql/sql.go @@ -1,4 +1,4 @@ -package sql +package mysql import ( "database/sql" diff --git a/extension/queue/sql/sql_test.go b/extension/queue/mysql/sql_test.go similarity index 99% rename from extension/queue/sql/sql_test.go rename to extension/queue/mysql/sql_test.go index e165af93..91f2f494 100644 --- a/extension/queue/sql/sql_test.go +++ b/extension/queue/mysql/sql_test.go @@ -1,4 +1,4 @@ -package sql +package mysql import ( "database/sql" diff --git a/extension/queue/sql/stores.go b/extension/queue/mysql/stores.go similarity index 99% rename from extension/queue/sql/stores.go rename to extension/queue/mysql/stores.go index 5ab8be98..a4267bc0 100644 --- a/extension/queue/sql/stores.go +++ b/extension/queue/mysql/stores.go @@ -1,4 +1,4 @@ -package sql +package mysql //go:generate mockgen -source=stores.go -destination=mock_stores.go -package=sql diff --git a/extension/queue/sql/subscriber.go b/extension/queue/mysql/subscriber.go similarity index 99% rename from extension/queue/sql/subscriber.go rename to extension/queue/mysql/subscriber.go index c9f4b807..576a22b5 100644 --- a/extension/queue/sql/subscriber.go +++ b/extension/queue/mysql/subscriber.go @@ -1,4 +1,4 @@ -package sql +package mysql import ( "context" diff --git a/extension/queue/sql/subscriber_test.go b/extension/queue/mysql/subscriber_test.go similarity index 99% rename from extension/queue/sql/subscriber_test.go rename to extension/queue/mysql/subscriber_test.go index 577d5d71..440f39ce 100644 --- a/extension/queue/sql/subscriber_test.go +++ b/extension/queue/mysql/subscriber_test.go @@ -1,4 +1,4 @@ -package sql +package mysql import ( "context" diff --git a/extension/queue/sql/validation.go b/extension/queue/mysql/validation.go similarity index 97% rename from extension/queue/sql/validation.go rename to extension/queue/mysql/validation.go index b021aae2..d792d711 100644 --- a/extension/queue/sql/validation.go +++ b/extension/queue/mysql/validation.go @@ -1,4 +1,4 @@ -package sql +package mysql import "fmt" diff --git a/test/e2e/BUILD.bazel b/test/e2e/BUILD.bazel index 7c20d409..a6ac7943 100644 --- a/test/e2e/BUILD.bazel +++ b/test/e2e/BUILD.bazel @@ -8,7 +8,7 @@ go_test( "//:go.mod", "//example/server:docker-compose.yml", "//extension/counter/mysql/schema", - "//extension/queue/sql/schema", + "//extension/queue/mysql/schema", "//extension/storage/mysql/schema", ], tags = [ diff --git a/test/e2e/suite_test.go b/test/e2e/suite_test.go index 86f8573d..97539b4e 100644 --- a/test/e2e/suite_test.go +++ b/test/e2e/suite_test.go @@ -73,7 +73,7 @@ func (s *E2EIntegrationSuite) SetupSuite() { testutil.ApplySchema(t, s.log, s.db, testutil.SchemaDir("extension/counter/mysql/schema")) // Apply schemas programmatically to queue database - testutil.ApplySchema(t, s.log, s.queueDB, testutil.SchemaDir("extension/queue/sql/schema")) + testutil.ApplySchema(t, s.log, s.queueDB, testutil.SchemaDir("extension/queue/mysql/schema")) s.log.Logf("Schemas applied successfully") diff --git a/test/integration/extension/queue/sql/BUILD.bazel b/test/integration/extension/queue/mysql/BUILD.bazel similarity index 84% rename from test/integration/extension/queue/sql/BUILD.bazel rename to test/integration/extension/queue/mysql/BUILD.bazel index 63a71d10..812d8a65 100644 --- a/test/integration/extension/queue/sql/BUILD.bazel +++ b/test/integration/extension/queue/mysql/BUILD.bazel @@ -1,17 +1,17 @@ load("@rules_go//go:def.bzl", "go_test") go_test( - name = "sql_test", + name = "mysql_test", srcs = ["queue_test.go"], data = [ "docker-compose.yml", - "//extension/queue/sql/schema", + "//extension/queue/mysql/schema", ], tags = ["integration"], deps = [ "//entity/queue", "//extension/queue", - "//extension/queue/sql", + "//extension/queue/mysql", "//test/testutil", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_stretchr_testify//assert", diff --git a/test/integration/extension/queue/sql/docker-compose.yml b/test/integration/extension/queue/mysql/docker-compose.yml similarity index 100% rename from test/integration/extension/queue/sql/docker-compose.yml rename to test/integration/extension/queue/mysql/docker-compose.yml diff --git a/test/integration/extension/queue/sql/queue_test.go b/test/integration/extension/queue/mysql/queue_test.go similarity index 97% rename from test/integration/extension/queue/sql/queue_test.go rename to test/integration/extension/queue/mysql/queue_test.go index 71a048e5..2a005820 100644 --- a/test/integration/extension/queue/sql/queue_test.go +++ b/test/integration/extension/queue/mysql/queue_test.go @@ -1,4 +1,4 @@ -package sql +package mysql import ( "context" @@ -18,7 +18,7 @@ import ( "github.com/uber/submitqueue/entity/queue" extqueue "github.com/uber/submitqueue/extension/queue" - queueSQL "github.com/uber/submitqueue/extension/queue/sql" + queueMySQL "github.com/uber/submitqueue/extension/queue/mysql" "github.com/uber/submitqueue/test/testutil" ) @@ -63,7 +63,7 @@ func (s *SQLQueueIntegrationSuite) SetupSuite() { s.log.Logf("Connected to MySQL for queue testing") // Apply schemas programmatically from directory (queue has 3 schema files) - schemaDir := testutil.SchemaDir("extension/queue/sql/schema") + schemaDir := testutil.SchemaDir("extension/queue/mysql/schema") testutil.ApplySchema(t, s.log, s.db, schemaDir) s.log.Logf("Schemas applied successfully") @@ -121,7 +121,7 @@ func (s *SQLQueueIntegrationSuite) TestPublishAndSubscribe() { t := s.T() // Create queue - q, err := queueSQL.NewQueue(queueSQL.Params{ + q, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, @@ -191,7 +191,7 @@ func (s *SQLQueueIntegrationSuite) TestPublishAndSubscribe() { func (s *SQLQueueIntegrationSuite) TestMultiplePartitions() { t := s.T() - q, err := queueSQL.NewQueue(queueSQL.Params{ + q, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, @@ -236,7 +236,7 @@ func (s *SQLQueueIntegrationSuite) TestMultiplePartitions() { func (s *SQLQueueIntegrationSuite) TestVisibilityTimeoutAndRetry() { t := s.T() - q, err := queueSQL.NewQueue(queueSQL.Params{ + q, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, @@ -323,7 +323,7 @@ func (s *SQLQueueIntegrationSuite) TestVisibilityTimeoutAndRetry() { func (s *SQLQueueIntegrationSuite) TestNackWithDelay() { t := s.T() - q, err := queueSQL.NewQueue(queueSQL.Params{ + q, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, @@ -375,7 +375,7 @@ func (s *SQLQueueIntegrationSuite) TestNackWithDelay() { func (s *SQLQueueIntegrationSuite) TestIdempotentPublish() { t := s.T() - q, err := queueSQL.NewQueue(queueSQL.Params{ + q, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, @@ -422,7 +422,7 @@ func (s *SQLQueueIntegrationSuite) TestIdempotentPublish() { func (s *SQLQueueIntegrationSuite) TestConcurrentPublishers() { t := s.T() - q, err := queueSQL.NewQueue(queueSQL.Params{ + q, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, @@ -478,7 +478,7 @@ func (s *SQLQueueIntegrationSuite) TestConcurrentPublishers() { func (s *SQLQueueIntegrationSuite) TestCrashRecovery() { t := s.T() - q1, err := queueSQL.NewQueue(queueSQL.Params{ + q1, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, @@ -520,7 +520,7 @@ func (s *SQLQueueIntegrationSuite) TestCrashRecovery() { time.Sleep(waitTime) // Start worker 2 with same consumer group - q2, err := queueSQL.NewQueue(queueSQL.Params{ + q2, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, @@ -556,7 +556,7 @@ func (s *SQLQueueIntegrationSuite) TestMultipleConsumerGroups() { topic := "multi_group_topic" // Create two different consumer groups - q1, err := queueSQL.NewQueue(queueSQL.Params{ + q1, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, @@ -564,7 +564,7 @@ func (s *SQLQueueIntegrationSuite) TestMultipleConsumerGroups() { require.NoError(t, err) defer q1.Close() - q2, err := queueSQL.NewQueue(queueSQL.Params{ + q2, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, @@ -635,7 +635,7 @@ func (s *SQLQueueIntegrationSuite) TestMultipleWorkersInConsumerGroup() { consumerGroup := "shared-group" // Create two workers in same consumer group - q1, err := queueSQL.NewQueue(queueSQL.Params{ + q1, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, @@ -643,7 +643,7 @@ func (s *SQLQueueIntegrationSuite) TestMultipleWorkersInConsumerGroup() { require.NoError(t, err) defer q1.Close() - q2, err := queueSQL.NewQueue(queueSQL.Params{ + q2, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, @@ -752,7 +752,7 @@ func (s *SQLQueueIntegrationSuite) TestConcurrentSubscribers() { totalMessages := numSubscribers * messagesPerSubscriber // Create publisher - pubQueue, err := queueSQL.NewQueue(queueSQL.Params{ + pubQueue, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, @@ -767,7 +767,7 @@ func (s *SQLQueueIntegrationSuite) TestConcurrentSubscribers() { var deliveryChans []<-chan extqueue.Delivery for i := 0; i < numSubscribers; i++ { - q, err := queueSQL.NewQueue(queueSQL.Params{ + q, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, @@ -857,7 +857,7 @@ func (s *SQLQueueIntegrationSuite) TestDeadLetterQueue() { topic := "dlq_topic" - q, err := queueSQL.NewQueue(queueSQL.Params{ + q, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, @@ -953,7 +953,7 @@ func (s *SQLQueueIntegrationSuite) TestMessageOrderingWithinPartition() { topic := "ordering_topic" partitionKey := "ordered-partition" - q, err := queueSQL.NewQueue(queueSQL.Params{ + q, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, @@ -1004,7 +1004,7 @@ func (s *SQLQueueIntegrationSuite) TestLateSubscriber() { topic := "late_subscriber_topic" - q, err := queueSQL.NewQueue(queueSQL.Params{ + q, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, @@ -1055,7 +1055,7 @@ func (s *SQLQueueIntegrationSuite) TestEmptyTopicSubscribe() { topic := "empty_topic" - q, err := queueSQL.NewQueue(queueSQL.Params{ + q, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, @@ -1099,7 +1099,7 @@ func (s *SQLQueueIntegrationSuite) TestGracefulShutdownDuringProcessing() { topic := "shutdown_topic" - q, err := queueSQL.NewQueue(queueSQL.Params{ + q, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, @@ -1165,7 +1165,7 @@ drainLoop: // Start new subscriber to verify all messages are redelivered t.Logf("Starting new subscriber to verify message recovery...") - q2, err := queueSQL.NewQueue(queueSQL.Params{ + q2, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, diff --git a/test/integration/gateway/BUILD.bazel b/test/integration/gateway/BUILD.bazel index 72f5c2a6..555782d5 100644 --- a/test/integration/gateway/BUILD.bazel +++ b/test/integration/gateway/BUILD.bazel @@ -8,7 +8,7 @@ go_test( "//:go.mod", "//example/server/gateway:docker-compose.yml", "//extension/counter/mysql/schema", - "//extension/queue/sql/schema", + "//extension/queue/mysql/schema", "//extension/storage/mysql/schema", ], tags = ["integration"], diff --git a/test/integration/gateway/suite_test.go b/test/integration/gateway/suite_test.go index 740433ce..c16cf931 100644 --- a/test/integration/gateway/suite_test.go +++ b/test/integration/gateway/suite_test.go @@ -74,7 +74,7 @@ func (s *GatewayIntegrationSuite) SetupSuite() { testutil.ApplySchema(t, s.log, s.db, testutil.SchemaDir("extension/counter/mysql/schema")) // Apply schemas programmatically to queue database - testutil.ApplySchema(t, s.log, s.queueDB, testutil.SchemaDir("extension/queue/sql/schema")) + testutil.ApplySchema(t, s.log, s.queueDB, testutil.SchemaDir("extension/queue/mysql/schema")) s.log.Logf("Schemas applied successfully") diff --git a/test/integration/orchestrator/BUILD.bazel b/test/integration/orchestrator/BUILD.bazel index 86103413..a0390db7 100644 --- a/test/integration/orchestrator/BUILD.bazel +++ b/test/integration/orchestrator/BUILD.bazel @@ -8,7 +8,7 @@ go_test( "//:go.mod", "//example/server/orchestrator:docker-compose.yml", "//extension/counter/mysql/schema", - "//extension/queue/sql/schema", + "//extension/queue/mysql/schema", "//extension/storage/mysql/schema", ], tags = ["integration"], diff --git a/test/integration/orchestrator/suite_test.go b/test/integration/orchestrator/suite_test.go index fd59a686..fe419494 100644 --- a/test/integration/orchestrator/suite_test.go +++ b/test/integration/orchestrator/suite_test.go @@ -74,7 +74,7 @@ func (s *OrchestratorIntegrationSuite) SetupSuite() { testutil.ApplySchema(t, s.log, s.db, testutil.SchemaDir("extension/counter/mysql/schema")) // Apply schemas programmatically to queue database - testutil.ApplySchema(t, s.log, s.queueDB, testutil.SchemaDir("extension/queue/sql/schema")) + testutil.ApplySchema(t, s.log, s.queueDB, testutil.SchemaDir("extension/queue/mysql/schema")) s.log.Logf("Schemas applied successfully") diff --git a/test/testutil/schema.go b/test/testutil/schema.go index 595220b0..68e08ee4 100644 --- a/test/testutil/schema.go +++ b/test/testutil/schema.go @@ -14,7 +14,7 @@ import ( // SchemaDir returns the path to a schema directory. // It checks for both Bazel runfiles and direct go test paths. -// relativePath should be like "extension/storage/mysql/schema" or "extension/queue/sql/schema" +// relativePath should be like "extension/storage/mysql/schema" or "extension/queue/mysql/schema" func SchemaDir(relativePath string) string { // Bazel runfiles path if dir := os.Getenv("TEST_SRCDIR"); dir != "" {