Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ local-init-schemas: ## Manually apply all database schemas
docker exec -i $(LOCAL_PROJECT)-mysql-app-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \
done
@echo "Applying queue schema to mysql-queue..."
@for file in extension/queue/sql/schema/*.sql; do \
@for file in extension/queue/mysql/schema/*.sql; do \
echo " - Applying $$(basename $$file)..."; \
docker exec -i $(LOCAL_PROJECT)-mysql-queue-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \
done
Expand Down
2 changes: 1 addition & 1 deletion doc/howto/TESTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ SubmitQueue uses **two separate databases** to demonstrate proper architectural

### 2. Queue Database
- **Purpose**: Messaging infrastructure (queue messages, offsets, partition leases)
- **Schema**: `extension/queue/sql/schema`
- **Schema**: `extension/queue/mysql/schema`
- **Used by**: Gateway (publishes), Orchestrator (consumes)
- **Connection**: `QUEUE_MYSQL_DSN`

Expand Down
8 changes: 4 additions & 4 deletions doc/rfc/sql-queue-rfc.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ We chose **custom database-backed queue** because:
- `(topic, partition_key, invisible_until, offset)`: Core fetch query - find visible messages in partition ordered by offset
- `(topic, partition_key, id)`: Unique constraint and fast lookup for Ack/Nack

See `extension/queue/sql/schema/queue_messages.sql` for full schema.
See `extension/queue/mysql/schema/queue_messages.sql` for full schema.

### Partition Leases Table

Expand All @@ -172,7 +172,7 @@ See `extension/queue/sql/schema/queue_messages.sql` for full schema.
- `(leased_by)`: Find all partitions owned by a worker
- `(lease_renewed_at)`: Detect stale leases across workers

See `extension/queue/sql/schema/queue_partition_leases.sql` for full schema.
See `extension/queue/mysql/schema/queue_partition_leases.sql` for full schema.

### Consumer Offsets Table

Expand All @@ -185,7 +185,7 @@ See `extension/queue/sql/schema/queue_partition_leases.sql` for full schema.
- `(consumer_group)`: Monitor all offsets for a consumer group
- `(topic)`: Find all consumers for a topic

See `extension/queue/sql/schema/queue_offsets.sql` for full schema.
See `extension/queue/mysql/schema/queue_offsets.sql` for full schema.

### Dead Letter Queue Table

Expand All @@ -201,7 +201,7 @@ See `extension/queue/sql/schema/queue_offsets.sql` for full schema.
- `(failed_at)`: Time-based queries and cleanup
- `(topic, partition_key, id)`: Unique constraint, prevents duplicates

See `extension/queue/sql/schema/queue_dlq.sql` for full schema.
See `extension/queue/mysql/schema/queue_dlq.sql` for full schema.

## Message Flow

Expand Down
2 changes: 1 addition & 1 deletion example/server/gateway/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ go_library(
deps = [
"//core/consumer",
"//extension/counter/mysql",
"//extension/queue/sql",
"//extension/queue/mysql",
"//extension/storage/mysql",
"//gateway/controller",
"//gateway/protopb",
Expand Down
8 changes: 4 additions & 4 deletions example/server/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/consumer"
mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql"
queueSQL "github.com/uber/submitqueue/extension/queue/sql"
queueMySQL "github.com/uber/submitqueue/extension/queue/mysql"
"github.com/uber/submitqueue/extension/storage/mysql"
"github.com/uber/submitqueue/gateway/controller"
pb "github.com/uber/submitqueue/gateway/protopb"
Expand Down Expand Up @@ -122,15 +122,15 @@ func run() error {
defer queueDB.Close()

// Initialize queue
sqlQueue, err := queueSQL.NewQueue(queueSQL.Params{
mysqlQueue, err := queueMySQL.NewQueue(queueMySQL.Params{
DB: queueDB,
Logger: logger,
MetricsScope: scope.SubScope("queue"),
})
if err != nil {
return fmt.Errorf("failed to create queue: %w", err)
}
defer sqlQueue.Close()
defer mysqlQueue.Close()

logger.Info("initialized dependencies",
zap.String("app_dsn", appDSN),
Expand All @@ -142,7 +142,7 @@ func run() error {

// Create controllers and wrap them for gRPC
pingController := controller.NewPingController(logger, scope)
landController := controller.NewLandController(logger.Sugar(), scope, store, cnt, sqlQueue.Publisher(), consumer.TopicRequest.String())
landController := controller.NewLandController(logger.Sugar(), scope, store, cnt, mysqlQueue.Publisher(), consumer.TopicRequest.String())
gatewayServer := &GatewayServer{
pingController: pingController,
landController: landController,
Expand Down
2 changes: 1 addition & 1 deletion example/server/orchestrator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ go_library(
deps = [
"//core/consumer",
"//extension/queue",
"//extension/queue/sql",
"//extension/queue/mysql",
"//orchestrator/controller",
"//orchestrator/controller/request",
"//orchestrator/protopb",
Expand Down
10 changes: 5 additions & 5 deletions example/server/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/consumer"
extqueue "github.com/uber/submitqueue/extension/queue"
queueSQL "github.com/uber/submitqueue/extension/queue/sql"
queueMySQL "github.com/uber/submitqueue/extension/queue/mysql"
"github.com/uber/submitqueue/orchestrator/controller"
"github.com/uber/submitqueue/orchestrator/controller/request"
pb "github.com/uber/submitqueue/orchestrator/protopb"
Expand Down Expand Up @@ -98,15 +98,15 @@ func run() error {
defer queueDB.Close()

// Initialize queue
sqlQueue, err := queueSQL.NewQueue(queueSQL.Params{
mysqlQueue, err := queueMySQL.NewQueue(queueMySQL.Params{
DB: queueDB,
Logger: logger,
MetricsScope: scope.SubScope("queue"),
})
if err != nil {
return fmt.Errorf("failed to create queue: %w", err)
}
defer sqlQueue.Close()
defer mysqlQueue.Close()

logger.Info("initialized queue", zap.String("dsn", queueDSN))

Expand All @@ -118,8 +118,8 @@ func run() error {

registry := consumer.NewTopicRegistry(
[]consumer.TopicConfig{
{Topic: consumer.TopicRequest, Queue: sqlQueue},
{Topic: consumer.TopicToBatch, Queue: sqlQueue},
{Topic: consumer.TopicRequest, Queue: mysqlQueue},
{Topic: consumer.TopicToBatch, Queue: mysqlQueue},
},
[]extqueue.SubscriptionConfig{
extqueue.DefaultSubscriptionConfig(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "sql",
name = "mysql",
srcs = [
"constants.go",
"errors.go",
Expand All @@ -15,7 +15,7 @@ go_library(
"subscriber.go",
"validation.go",
],
importpath = "github.com/uber/submitqueue/extension/queue/sql",
importpath = "github.com/uber/submitqueue/extension/queue/mysql",
visibility = ["//visibility:public"],
deps = [
"//entity/queue",
Expand All @@ -27,7 +27,7 @@ go_library(
)

go_test(
name = "sql_test",
name = "mysql_test",
srcs = [
"message_store_test.go",
"offset_store_test.go",
Expand All @@ -36,7 +36,7 @@ go_test(
"sql_test.go",
"subscriber_test.go",
],
embed = [":sql"],
embed = [":mysql"],
deps = [
"//entity/queue",
"//extension/queue",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ MySQL-based distributed queue with partition leasing, visibility timeout, and at
import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
queueSQL "github.com/uber/submitqueue/extension/queue/sql"
queueMySQL "github.com/uber/submitqueue/extension/queue/mysql"
extqueue "github.com/uber/submitqueue/extension/queue"
"github.com/uber/submitqueue/entity/queue"
)

// Setup
db, _ := sql.Open("mysql", "user:pass@tcp(localhost:3306)/db")
q, _ := queueSQL.NewQueue(queueSQL.Params{
q, _ := queueMySQL.NewQueue(queueMySQL.Params{
DB: db,
Logger: logger,
MetricsScope: metrics,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sql
package mysql

// Common constants for frequently repeated strings across stores

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sql
package mysql

import "fmt"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sql
package mysql

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sql
package mysql

import (
"context"
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sql
package mysql

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sql
package mysql

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sql
package mysql

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sql
package mysql

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sql
package mysql

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sql
package mysql

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sql
package mysql

import (
"database/sql"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sql
package mysql

import (
"database/sql"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sql
package mysql

//go:generate mockgen -source=stores.go -destination=mock_stores.go -package=sql

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sql
package mysql

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sql
package mysql

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sql
package mysql

import "fmt"

Expand Down
2 changes: 1 addition & 1 deletion test/e2e/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"//:go.mod",
"//example/server:docker-compose.yml",
"//extension/counter/mysql/schema",
"//extension/queue/sql/schema",
"//extension/queue/mysql/schema",
"//extension/storage/mysql/schema",
],
tags = [
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *E2EIntegrationSuite) SetupSuite() {
testutil.ApplySchema(t, s.log, s.db, testutil.SchemaDir("extension/counter/mysql/schema"))

// Apply schemas programmatically to queue database
testutil.ApplySchema(t, s.log, s.queueDB, testutil.SchemaDir("extension/queue/sql/schema"))
testutil.ApplySchema(t, s.log, s.queueDB, testutil.SchemaDir("extension/queue/mysql/schema"))

s.log.Logf("Schemas applied successfully")

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
load("@rules_go//go:def.bzl", "go_test")

go_test(
name = "sql_test",
name = "mysql_test",
srcs = ["queue_test.go"],
data = [
"docker-compose.yml",
"//extension/queue/sql/schema",
"//extension/queue/mysql/schema",
],
tags = ["integration"],
deps = [
"//entity/queue",
"//extension/queue",
"//extension/queue/sql",
"//extension/queue/mysql",
"//test/testutil",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_stretchr_testify//assert",
Expand Down
Loading