diff --git a/CLAUDE.md b/CLAUDE.md index 3c92df4..a7fadcd 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -8,6 +8,8 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co - `make build` — `make test` + `go build ./...`. CI runs this (`.github/workflows/build.yml`). - `make test.db` — runs the `storage/mysql` package tests against a real MySQL (30s timeout, with `-race`). These hit a live DB and are excluded by `-short`. - `make test.db.local` — `docker compose -f doc/docker-compose.yml up --wait`, then `make test.db`, then `docker compose down`. Use when no MySQL is already running. +- `make test.mq` — runs the `dispatcher/rabbitmq` package tests against a real RabbitMQ (30s timeout, with `-race`). These hit a live broker and are excluded by `-short`. +- `make test.mq.local` — `docker compose -f doc/docker-compose.yml up --wait`, then `make test.mq`, then `docker compose down`. Use when no RabbitMQ is already running. - Single test: `go test -run '^TestName$' ./internal/pipeline` (add `-race -count=1` to match `make test`). - Single subtest in this codebase's `gunit` style: `go test -run '^TestFixture$/^MethodName$' ./path`. @@ -54,6 +56,14 @@ All database work flows through a single seam: `Options.Storage(...)` takes a `s - `internal/adapters` holds the thin `Writer`, `Dispatcher`, and `Recovery` types the pipeline wires from `config.Storage`. Each builds a table-agnostic `storage.*` operation and hands it to `Storage.Exec`. They are single-goroutine (one reusable op buffer each). `Dispatcher` rejects `ID == 0` before publishing (an unassigned id could never be marked and would republish forever). `Recovery` is the stateful keyset cursor (snapshot `MIN/MAX(id)` of undispatched rows on the first call, page within that frozen window, advance only after a clean page). - `storage/mysql.Mapper` implements `Storage` against the `Snapshots` and `Messages` tables in `doc/mysql/schema.sql`. It is **safe for concurrent use** (pooled statement buffers). Insert emits one multi-row INSERT per batch and derives IDs from `LAST_INSERT_ID() + i*stride` (relies on `innodb_autoinc_lock_mode = 2` "simple insert" semantics and `stride` matching the server's `auto_increment_increment`). `quoteTableName` validates table names against `^[A-Za-z0-9_]+$` and back-tick quotes them. `WithLegacyWrite(...)` is a deprecated transitional hook run inside the INSERT transaction. +### Dispatcher seam (`contracts.Dispatcher` + bundled `dispatcher/rabbitmq`) + +`Options.Dispatcher(...)` takes a `contracts.Dispatcher` (`Dispatch(ctx, ...*Message) error`). Unlike the storage seam this one is **public** — callers may supply their own — but harness now ships a batteries-included default so the common case is one line, mirroring the `storage/mysql` precedent. + +- `dispatcher/rabbitmq.NewDispatcher(address string, options ...Option)` is the bundled default. It is the dispatcher analog of `storage/mysql`: a thin, direct implementation over the `github.com/rabbitmq/amqp091-go` driver (the version messaging/v3 pins) rather than a messaging framework. Configure it via the `Options` singleton — `Options.TLS(*tls.Config)` and `Options.Dialer(ContextDialer)` — the same `var Options singleton` shape used elsewhere in the module. `NewDispatcher` promotes `?username=&password=` query credentials into the URL userinfo (where amqp091 reads them, falling back to guest/guest otherwise) so a single messaging/v3-style `message_host` works for both the consumer and the dispatcher; userinfo wins on conflict, unparseable addresses pass through (`credentials.go`). +- It owns a lazily established, long-lived transaction-mode channel: `Dispatch` ensures the channel (dial + open + `Tx()`), publishes each message as a **persistent** delivery to the exchange named by its `Type` (routing key `""`, fanout — exchanges are assumed to exist, never declared, matching messaging/v3), then `TxCommit()`s for a synchronous broker ack. It returns `nil` only after a clean commit. On **any** publish or commit failure it calls `reset()` (tears down channel + connection) and returns the error; the broadcast stage retries forever, so the next `Dispatch` reconnects. `Close()` (io.Closer, for dominoes shutdown) releases the cached channel and is a safe no-op when nothing is cached. +- Driven by a single goroutine (the broadcast stage via `adapters.Dispatcher`, which still owns ID-rejection and mark-dispatched), so the cached channel and reused `[]amqp.Publishing` buffer need no locking — same one-goroutine discipline as the internal adapters. An internal `transport`/`channel` seam (real impl in `amqp.go`) lets the unit suite run broker-free against a fake; `live_test.go` (skipped under `-short`) round-trips against a real broker via `make test.mq`. + ### Snapshots & replay (`snapshots/`, `internal/domainscan/`) `snapshots.Save` gzip+JSON-encodes a snapshot row; `snapshots.Load` loads a snapshot (latest or by id), applies it to the domain, and — only when `RegisteredEvents(...)` is supplied — replays events since the snapshot's high watermark. `internal/domainscan` centralizes the reflective `Execute`/`Apply` method-shape detection shared by the pipeline router and the replay machinery. Note: `LoadResult.NewHighWatermark` is zero when no events were replayed (`EventsAppliedCount == 0`). diff --git a/Makefile b/Makefile index e66d85b..2cdea47 100755 --- a/Makefile +++ b/Makefile @@ -10,6 +10,12 @@ test.db: test test.db.local: (docker compose -f doc/docker-compose.yml up --wait && $(MAKE) test.db --no-print-directory); docker compose -f doc/docker-compose.yml down +test.mq: test + go test -timeout=30s -race -covermode=atomic github.com/smarty/harness/v2/dispatcher/rabbitmq + +test.mq.local: + (docker compose -f doc/docker-compose.yml up --wait && $(MAKE) test.mq --no-print-directory); docker compose -f doc/docker-compose.yml down + fmt: go mod tidy && go fmt ./... @@ -18,4 +24,4 @@ compile: build: test compile -.PHONY: test test.db test.db.local fmt compile build +.PHONY: test test.db test.db.local test.mq test.mq.local fmt compile build diff --git a/dispatcher/rabbitmq/amqp.go b/dispatcher/rabbitmq/amqp.go new file mode 100644 index 0000000..db1d6aa --- /dev/null +++ b/dispatcher/rabbitmq/amqp.go @@ -0,0 +1,90 @@ +package rabbitmq + +import ( + "context" + "crypto/tls" + "errors" + "net" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +// handshakeTimeout bounds the AMQP protocol handshake when the caller's context +// carries no deadline. It mirrors amqp091's DefaultDial default, which our custom +// Dial closure bypasses; the deadline is lifted by amqp091 once connected. +const handshakeTimeout = 30 * time.Second + +// ContextDialer establishes the underlying TCP connection for the AMQP dial. +// *net.Dialer satisfies it, and it is the default; callers override it through +// Options.Dialer (e.g. to honor a cancellable pipeline context during dialing). +type ContextDialer interface { + DialContext(ctx context.Context, network, address string) (net.Conn, error) +} + +// amqpTransport is the default transport: it dials a RabbitMQ broker over +// amqp091-go and hands back a transaction-mode channel. PLAIN SASL credentials +// and the vhost are taken from the address URL by amqp.DialConfig. +type amqpTransport struct { + address string + tls *tls.Config + dialer ContextDialer +} + +func (this *amqpTransport) connect(ctx context.Context) (channel, error) { + config := amqp.Config{ + TLSClientConfig: this.tls, + Dial: func(network, address string) (net.Conn, error) { + connection, err := this.dialer.DialContext(ctx, network, address) + if err != nil { + return nil, err + } + // amqp.Open is not context-aware: once the socket is up, ctx + // cancellation can't interrupt the handshake read. Arm a read + // deadline (honoring the caller's deadline, else handshakeTimeout) + // so the single broadcast goroutine stays interruptible. amqp091 + // clears it in openComplete once connected. + deadline, ok := ctx.Deadline() + if !ok { + deadline = time.Now().Add(handshakeTimeout) + } + if err := connection.SetReadDeadline(deadline); err != nil { + _ = connection.Close() + return nil, err + } + return connection, nil + }, + } + connection, err := amqp.DialConfig(this.address, config) + if err != nil { + return nil, err + } + transacted, err := connection.Channel() + if err != nil { + _ = connection.Close() + return nil, err + } + if err := transacted.Tx(); err != nil { + _ = transacted.Close() + _ = connection.Close() + return nil, err + } + return &amqpChannel{connection: connection, channel: transacted}, nil +} + +// amqpChannel is a transaction-mode channel and the connection it rode in on, +// kept together so close() tears down both. +type amqpChannel struct { + connection *amqp.Connection + channel *amqp.Channel +} + +func (this *amqpChannel) publish(ctx context.Context, exchange string, msg amqp.Publishing) error { + return this.channel.PublishWithContext(ctx, exchange, "", false, false, msg) +} +func (this *amqpChannel) commit() error { + return this.channel.TxCommit() +} +func (this *amqpChannel) close() error { + return errors.Join(this.channel.Close(), this.connection.Close()) +} diff --git a/dispatcher/rabbitmq/amqp_test.go b/dispatcher/rabbitmq/amqp_test.go new file mode 100644 index 0000000..c21de06 --- /dev/null +++ b/dispatcher/rabbitmq/amqp_test.go @@ -0,0 +1,93 @@ +package rabbitmq + +import ( + "context" + "io" + "net" + "testing" + "time" + + "github.com/smarty/gunit/v2" + "github.com/smarty/gunit/v2/assert/should" +) + +func TestTransportFixture(t *testing.T) { + gunit.Run(new(TransportFixture), t) +} + +type TransportFixture struct { + *gunit.Fixture + dialer *recordingDialer + transport *amqpTransport +} + +func (this *TransportFixture) Setup() { + this.dialer = &recordingDialer{} + this.transport = &amqpTransport{ + address: "amqp://guest:guest@localhost:5672/", + dialer: this.dialer, + } +} + +func (this *TransportFixture) TestHandshakeDeadline_FromContext() { + deadline := time.Now().Add(2 * time.Minute) + ctx, cancel := context.WithDeadline(context.Background(), deadline) + defer cancel() + + // connect dials the fake conn, arms the read deadline, then fails the AMQP + // handshake (the fake conn returns io.EOF on Read) — that error is expected. + _, err := this.transport.connect(ctx) + + this.So(err, should.NOT.BeNil) + this.So(this.dialer.conn.deadlineSet, should.BeTrue) + this.So(this.dialer.conn.readDeadline, should.HappenOn, deadline) +} + +func (this *TransportFixture) TestHandshakeDeadline_DefaultWhenNoContextDeadline() { + approxExpected := time.Now().Add(handshakeTimeout) + + _, err := this.transport.connect(context.Background()) + + this.So(err, should.NOT.BeNil) + this.So(this.dialer.conn.deadlineSet, should.BeTrue) + this.So(this.dialer.conn.readDeadline, should.HappenWithin, 5*time.Second, approxExpected) +} + +// recordingDialer hands back a single recordingConn so a test can inspect the +// read deadline the transport armed before the AMQP handshake. +type recordingDialer struct { + conn *recordingConn +} + +func (this *recordingDialer) DialContext(_ context.Context, _, _ string) (net.Conn, error) { + this.conn = &recordingConn{} + return this.conn, nil +} + +// recordingConn is a net.Conn that records the read deadline set on it and fails +// the AMQP handshake fast (Read returns io.EOF) so amqp.DialConfig needs no broker. +// SetReadDeadline is only ever called by connect's Dial closure (before amqp.Open +// launches its reader), so the recorded fields are written before any concurrent +// Read and need no locking. +type recordingConn struct { + deadlineSet bool + readDeadline time.Time +} + +func (this *recordingConn) Read([]byte) (int, error) { return 0, io.EOF } +func (this *recordingConn) Write(p []byte) (int, error) { return len(p), nil } +func (this *recordingConn) Close() error { return nil } +func (this *recordingConn) LocalAddr() net.Addr { return fakeAddr{} } +func (this *recordingConn) RemoteAddr() net.Addr { return fakeAddr{} } +func (this *recordingConn) SetDeadline(time.Time) error { return nil } +func (this *recordingConn) SetReadDeadline(deadline time.Time) error { + this.deadlineSet = true + this.readDeadline = deadline + return nil +} +func (this *recordingConn) SetWriteDeadline(time.Time) error { return nil } + +type fakeAddr struct{} + +func (fakeAddr) Network() string { return "tcp" } +func (fakeAddr) String() string { return "fake" } diff --git a/dispatcher/rabbitmq/config.go b/dispatcher/rabbitmq/config.go new file mode 100644 index 0000000..c1f23ef --- /dev/null +++ b/dispatcher/rabbitmq/config.go @@ -0,0 +1,37 @@ +package rabbitmq + +import ( + "crypto/tls" + "net" +) + +// NewDispatcher builds a Dispatcher that publishes to the RabbitMQ broker at the +// given address (an amqp:// or amqps:// URL carrying credentials and vhost). It +// mirrors mysql.NewMapper's shape: a required handle/address followed by the +// repo-wide Options-singleton functional options. The package owns the connection +// lifecycle (unlike database/sql), so it takes coordinates rather than a live handle. +func NewDispatcher(address string, options ...option) *Dispatcher { + transport := &amqpTransport{address: promoteCredentials(address)} + for _, option := range append(Options.defaults(), options...) { + option(transport) + } + return &Dispatcher{transport: transport} +} + +type option func(*amqpTransport) + +var Options singleton + +type singleton struct{} + +func (singleton) TLS(config *tls.Config) option { + return func(transport *amqpTransport) { transport.tls = config } +} +func (singleton) Dialer(dialer ContextDialer) option { + return func(transport *amqpTransport) { transport.dialer = dialer } +} +func (singleton) defaults() []option { + return []option{ + Options.Dialer(new(net.Dialer)), + } +} diff --git a/dispatcher/rabbitmq/config_test.go b/dispatcher/rabbitmq/config_test.go new file mode 100644 index 0000000..6ea54a7 --- /dev/null +++ b/dispatcher/rabbitmq/config_test.go @@ -0,0 +1,55 @@ +package rabbitmq + +import ( + "crypto/tls" + "net" + "testing" + + "github.com/smarty/gunit/v2" + "github.com/smarty/gunit/v2/assert/should" +) + +func TestConfigFixture(t *testing.T) { + gunit.Run(new(ConfigFixture), t) +} + +type ConfigFixture struct { + *gunit.Fixture +} + +func (this *ConfigFixture) transportOf(dispatcher *Dispatcher) *amqpTransport { + result, ok := dispatcher.transport.(*amqpTransport) + this.So(ok, should.BeTrue) + return result +} + +func (this *ConfigFixture) TestDefaults_AddressSetTLSNilDefaultDialer() { + transport := this.transportOf(NewDispatcher("amqp://localhost/vhost")) + + this.So(transport.address, should.Equal, "amqp://localhost/vhost") + this.So(transport.tls, should.BeNil) + _, isNetDialer := transport.dialer.(*net.Dialer) + this.So(isNetDialer, should.BeTrue) +} + +func (this *ConfigFixture) TestTLSOption_SetsTLSConfig() { + config := &tls.Config{ServerName: "broker"} + + transport := this.transportOf(NewDispatcher("amqps://localhost", Options.TLS(config))) + + this.So(transport.tls == config, should.BeTrue) +} + +func (this *ConfigFixture) TestDialerOption_SetsDialer() { + dialer := &net.Dialer{} + + transport := this.transportOf(NewDispatcher("amqp://localhost", Options.Dialer(dialer))) + + this.So(transport.dialer == dialer, should.BeTrue) +} + +func (this *ConfigFixture) TestAddress_PromotesQueryCredentialsToUserinfo() { + transport := this.transportOf(NewDispatcher("amqp://host/?username=bob&password=secret")) + + this.So(transport.address, should.Equal, "amqp://bob:secret@host/") +} diff --git a/dispatcher/rabbitmq/credentials.go b/dispatcher/rabbitmq/credentials.go new file mode 100644 index 0000000..e2ab110 --- /dev/null +++ b/dispatcher/rabbitmq/credentials.go @@ -0,0 +1,37 @@ +package rabbitmq + +import ( + "cmp" + "net/url" +) + +// promoteCredentials returns address with credentials promoted into the URL +// userinfo, where amqp091-go expects them. The smarty messaging/v3 connector +// also honors credentials supplied as ?username=&password= query parameters, but +// amqp091 reads only the userinfo and otherwise falls back to guest/guest — +// yielding a 403. Promoting them here lets a single message_host config work for +// both the consumer and the dispatcher. Userinfo takes precedence over query +// parameters, matching messaging/v3; an unparseable address is returned as-is. +func promoteCredentials(address string) string { + endpoint, err := url.Parse(address) + if err != nil || endpoint == nil { + return address + } + query := endpoint.Query() + username := query.Get("username") + password := query.Get("password") + if username == "" && password == "" { + return address + } + if endpoint.User != nil { + username = cmp.Or(endpoint.User.Username(), username) + if existing, ok := endpoint.User.Password(); ok { + password = cmp.Or(existing, password) + } + } + endpoint.User = url.UserPassword(username, password) + query.Del("username") + query.Del("password") + endpoint.RawQuery = query.Encode() + return endpoint.String() +} diff --git a/dispatcher/rabbitmq/credentials_test.go b/dispatcher/rabbitmq/credentials_test.go new file mode 100644 index 0000000..958f9a9 --- /dev/null +++ b/dispatcher/rabbitmq/credentials_test.go @@ -0,0 +1,66 @@ +package rabbitmq + +import ( + "testing" + + "github.com/smarty/gunit/v2" + "github.com/smarty/gunit/v2/assert/should" +) + +func TestCredentialsFixture(t *testing.T) { + gunit.Run(new(CredentialsFixture), t) +} + +type CredentialsFixture struct { + *gunit.Fixture +} + +func (this *CredentialsFixture) assertPromoted(input, expected string) { + this.So(promoteCredentials(input), should.Equal, expected) +} + +func (this *CredentialsFixture) TestPromotesQueryCredentialsToUserinfo() { + this.assertPromoted( + "amqp://rabbit.service.consul/?username=bob&password=secret", + "amqp://bob:secret@rabbit.service.consul/", + ) +} +func (this *CredentialsFixture) TestPromotesQueryCredentialsKeepingOtherParams() { + this.assertPromoted( + "amqp://rabbit.service.consul/?username=bob&password=secret&server-name=rabbit", + "amqp://bob:secret@rabbit.service.consul/?server-name=rabbit", + ) +} +func (this *CredentialsFixture) TestPercentEncodesCredentials() { + this.assertPromoted( + "amqp://rabbit.service.consul/?username=bob&password=p@ss/word", + "amqp://bob:p%40ss%2Fword@rabbit.service.consul/", + ) +} +func (this *CredentialsFixture) TestUserinfoTakesPrecedenceOverQuery() { + this.assertPromoted( + "amqp://alice:realpass@rabbit.service.consul/?username=bob&password=secret", + "amqp://alice:realpass@rabbit.service.consul/", + ) +} +func (this *CredentialsFixture) TestFillsMissingFieldFromQuery() { + this.assertPromoted( + "amqp://alice@rabbit.service.consul/?username=bob&password=secret", + "amqp://alice:secret@rabbit.service.consul/", + ) +} +func (this *CredentialsFixture) TestUnchangedWhenNoQueryCredentials() { + this.assertPromoted( + "amqp://alice:realpass@rabbit.service.consul/", + "amqp://alice:realpass@rabbit.service.consul/", + ) +} +func (this *CredentialsFixture) TestUnchangedWhenNoCredentialsAtAll() { + this.assertPromoted( + "amqp://rabbit.service.consul/", + "amqp://rabbit.service.consul/", + ) +} +func (this *CredentialsFixture) TestUnchangedWhenUnparseable() { + this.assertPromoted("://nonsense", "://nonsense") +} diff --git a/dispatcher/rabbitmq/dispatcher.go b/dispatcher/rabbitmq/dispatcher.go new file mode 100644 index 0000000..6516eed --- /dev/null +++ b/dispatcher/rabbitmq/dispatcher.go @@ -0,0 +1,105 @@ +// Package rabbitmq provides the bundled default contracts.Dispatcher for +// harness, publishing messages to RabbitMQ over AMQP. It is the dispatcher +// analog of storage/mysql: a thin, direct implementation over the +// github.com/rabbitmq/amqp091-go driver rather than a messaging framework. +// +// NewDispatcher promotes ?username=&password= query credentials into the URL +// userinfo (see credentials.go), where amqp091 reads them, so a single +// messaging/v3-style address works for both the consumer and the dispatcher. +package rabbitmq + +import ( + "context" + + amqp "github.com/rabbitmq/amqp091-go" + + "github.com/smarty/harness/v2/contracts" +) + +// Dispatcher publishes batches of messages to RabbitMQ. It owns a lazily +// established, long-lived transaction-mode channel and reconnects on the next +// Dispatch after any failure. The broadcast stage drives it from a single +// goroutine, so the cached channel and reused buffer need no locking. +type Dispatcher struct { + transport transport + channel channel + buffer []amqp.Publishing +} + +// Dispatch publishes every message as a persistent delivery to the exchange +// named by its Type, then commits the transaction. It returns nil only after a +// clean commit (a durable broker ack); on any publish or commit failure it +// resets the connection and returns the error, so the next call reconnects. +func (this *Dispatcher) Dispatch(ctx context.Context, messages ...*contracts.Message) error { + if len(messages) == 0 { + return nil + } + current, err := this.ensureChannel(ctx) + if err != nil { + return err + } + clear(this.buffer) + this.buffer = this.buffer[:0] + for _, message := range messages { + this.buffer = append(this.buffer, toPublishing(message)) + } + for i, publishing := range this.buffer { + if err := current.publish(ctx, messages[i].Type, publishing); err != nil { + this.reset() + return err + } + } + if err := current.commit(); err != nil { + this.reset() + return err + } + return nil +} + +// Close releases the cached channel/connection. It is an io.Closer for +// dominoes-managed shutdown and is a safe no-op when nothing is cached. +func (this *Dispatcher) Close() error { + if this.channel == nil { + return nil + } + err := this.channel.close() + this.channel = nil + return err +} + +func (this *Dispatcher) ensureChannel(ctx context.Context) (channel, error) { + if this.channel != nil { + return this.channel, nil + } + current, err := this.transport.connect(ctx) + if err != nil { + return nil, err + } + this.channel = current + return current, nil +} + +func (this *Dispatcher) reset() { + if this.channel == nil { + return + } + _ = this.channel.close() + this.channel = nil +} + +// transport is the seam the Dispatcher depends on for connectivity. The default +// implementation (amqp.go) wraps amqp091-go; tests substitute a fake so the unit +// suite needs no live broker. +type transport interface { + // connect dials the broker, opens a connection and channel, and puts the + // channel into transaction mode (Tx), returning it ready to publish. + connect(ctx context.Context) (channel, error) +} + +// channel is a transaction-mode AMQP channel the Dispatcher publishes a batch +// through and then commits. +type channel interface { + publish(ctx context.Context, exchange string, msg amqp.Publishing) error + commit() error + close() error +} diff --git a/dispatcher/rabbitmq/dispatcher_test.go b/dispatcher/rabbitmq/dispatcher_test.go new file mode 100644 index 0000000..f3e40e9 --- /dev/null +++ b/dispatcher/rabbitmq/dispatcher_test.go @@ -0,0 +1,181 @@ +package rabbitmq + +import ( + "bytes" + "context" + "errors" + "testing" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/smarty/gunit/v2" + "github.com/smarty/gunit/v2/assert/should" + "github.com/smarty/harness/v2/contracts" +) + +func TestDispatcherFixture(t *testing.T) { + gunit.Run(new(DispatcherFixture), t) +} + +type DispatcherFixture struct { + *gunit.Fixture + ctx context.Context + transport *fakeTransport + subject *Dispatcher +} + +func (this *DispatcherFixture) Setup() { + this.ctx = context.Background() + this.transport = &fakeTransport{} + this.subject = &Dispatcher{transport: this.transport} +} + +func message(typeName, body string) *contracts.Message { + return &contracts.Message{ + ID: 1, + Type: typeName, + ContentType: "application/json", + Content: bytes.NewBufferString(body), + } +} + +func (this *DispatcherFixture) TestWritesThenCommits() { + err := this.subject.Dispatch(this.ctx, + message("order-received", "a"), + message("order-approved", "b"), + ) + + this.So(err, should.BeNil) + this.So(this.transport.connects, should.Equal, 1) + channel := this.transport.opened[0] + this.So(channel.calls, should.Equal, []string{"publish", "publish", "commit"}) + this.So(channel.published[0].exchange, should.Equal, "order-received") + this.So(channel.published[0].msg.Body, should.Equal, []byte("a")) + this.So(channel.published[0].msg.DeliveryMode, should.Equal, uint8(amqp.Persistent)) + this.So(channel.published[1].exchange, should.Equal, "order-approved") + this.So(channel.published[1].msg.Body, should.Equal, []byte("b")) +} + +func (this *DispatcherFixture) TestEmptyBatch_NoConnectNoCommit() { + err := this.subject.Dispatch(this.ctx) + + this.So(err, should.BeNil) + this.So(this.transport.connects, should.Equal, 0) + this.So(len(this.transport.opened), should.Equal, 0) +} + +func (this *DispatcherFixture) TestReusesChannelAcrossDispatches() { + this.So(this.subject.Dispatch(this.ctx, message("a", "1")), should.BeNil) + this.So(this.subject.Dispatch(this.ctx, message("b", "2")), should.BeNil) + + this.So(this.transport.connects, should.Equal, 1) + this.So(this.transport.opened[0].calls, should.Equal, []string{"publish", "commit", "publish", "commit"}) +} + +func (this *DispatcherFixture) TestPublishError_ResetsAndReconnectsNextDispatch() { + boom := errors.New("publish boom") + this.transport.channels = []*fakeChannel{{publishErr: boom}} + + err := this.subject.Dispatch(this.ctx, message("a", "1")) + + this.So(err, should.WrapError, boom) + this.So(this.transport.opened[0].closes, should.Equal, 1) + + this.So(this.subject.Dispatch(this.ctx, message("a", "2")), should.BeNil) + this.So(this.transport.connects, should.Equal, 2) + this.So(this.transport.opened[1].calls, should.Equal, []string{"publish", "commit"}) +} + +func (this *DispatcherFixture) TestCommitError_ResetsAndReconnectsNextDispatch() { + boom := errors.New("commit boom") + this.transport.channels = []*fakeChannel{{commitErr: boom}} + + err := this.subject.Dispatch(this.ctx, message("a", "1")) + + this.So(err, should.WrapError, boom) + this.So(this.transport.opened[0].closes, should.Equal, 1) + + this.So(this.subject.Dispatch(this.ctx, message("a", "2")), should.BeNil) + this.So(this.transport.connects, should.Equal, 2) +} + +func (this *DispatcherFixture) TestConnectError_ReturnedAndNothingCached() { + boom := errors.New("connect boom") + this.transport.connectErr = boom + + err := this.subject.Dispatch(this.ctx, message("a", "1")) + + this.So(err, should.WrapError, boom) + this.So(this.transport.connects, should.Equal, 1) + + this.transport.connectErr = nil + this.So(this.subject.Dispatch(this.ctx, message("a", "2")), should.BeNil) + this.So(this.transport.connects, should.Equal, 2) +} + +func (this *DispatcherFixture) TestClose_ReleasesCachedChannel() { + this.So(this.subject.Dispatch(this.ctx, message("a", "1")), should.BeNil) + + this.So(this.subject.Close(), should.BeNil) + this.So(this.transport.opened[0].closes, should.Equal, 1) +} + +func (this *DispatcherFixture) TestClose_WithoutPriorDispatch_IsSafeNoOp() { + this.So(this.subject.Close(), should.BeNil) + this.So(this.transport.connects, should.Equal, 0) +} + +// fakeTransport stands in for amqpTransport so the unit suite needs no broker. +// It hands out one channel per connect() call: the channels enqueued in +// `channels` are returned first (to inject failing channels), then healthy ones. +// Every channel handed out is retained in `opened` so a test can assert it was +// later closed by reset()/Close(). +type fakeTransport struct { + channels []*fakeChannel + opened []*fakeChannel + connectErr error + connects int +} + +func (this *fakeTransport) connect(ctx context.Context) (channel, error) { + this.connects++ + if this.connectErr != nil { + return nil, this.connectErr + } + var next *fakeChannel + if len(this.channels) > 0 { + next, this.channels = this.channels[0], this.channels[1:] + } else { + next = &fakeChannel{} + } + this.opened = append(this.opened, next) + return next, nil +} + +// fakeChannel records the order of publish/commit calls, captures every +// published message, exposes injectable publish/commit errors, and counts closes. +type fakeChannel struct { + calls []string + published []publishedMessage + publishErr error + commitErr error + closes int +} + +type publishedMessage struct { + exchange string + msg amqp.Publishing +} + +func (this *fakeChannel) publish(ctx context.Context, exchange string, msg amqp.Publishing) error { + this.calls = append(this.calls, "publish") + this.published = append(this.published, publishedMessage{exchange: exchange, msg: msg}) + return this.publishErr +} +func (this *fakeChannel) commit() error { + this.calls = append(this.calls, "commit") + return this.commitErr +} +func (this *fakeChannel) close() error { + this.closes++ + return nil +} diff --git a/dispatcher/rabbitmq/live_test.go b/dispatcher/rabbitmq/live_test.go new file mode 100644 index 0000000..93f1184 --- /dev/null +++ b/dispatcher/rabbitmq/live_test.go @@ -0,0 +1,100 @@ +package rabbitmq + +import ( + "bytes" + "context" + "testing" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/smarty/gunit/v2" + "github.com/smarty/gunit/v2/assert/should" + "github.com/smarty/harness/v2/contracts" +) + +// Integration tests in this package require a local RabbitMQ broker +// (see doc/docker-compose.yml). They are excluded by `-short`. +const liveAddress = "amqp://guest:guest@127.0.0.1:5672/" + +func TestLiveDispatcherFixture(t *testing.T) { + if testing.Short() { + t.Skip("Skipping long-running broker integration tests.") + } + ensureBrokerReadiness(t) + gunit.Run(new(LiveDispatcherFixture), t, gunit.Options.IntegrationTests()) +} + +func ensureBrokerReadiness(t *testing.T) { + connection, err := amqp.Dial(liveAddress) + if err != nil { + t.Fatal("Broker not available (is rabbitmq running?):", err) + } + _ = connection.Close() +} + +type LiveDispatcherFixture struct { + *gunit.Fixture + ctx context.Context + control *amqp.Connection + channel *amqp.Channel + exchange string + queue string + subject *Dispatcher +} + +func (this *LiveDispatcherFixture) Setup() { + this.ctx = context.Background() + this.exchange = "harness-live-test-exchange" + + connection, err := amqp.Dial(liveAddress) + this.So(err, should.BeNil) + this.control = connection + channel, err := connection.Channel() + this.So(err, should.BeNil) + this.channel = channel + + // Declare the fanout exchange the dispatcher will publish to, plus an + // exclusive server-named queue bound to it to capture deliveries. + err = channel.ExchangeDeclare(this.exchange, "fanout", true, false, false, false, nil) + this.So(err, should.BeNil) + queue, err := channel.QueueDeclare("", true, false, true, false, nil) + this.So(err, should.BeNil) + this.queue = queue.Name + err = channel.QueueBind(this.queue, "", this.exchange, false, nil) + this.So(err, should.BeNil) + + this.subject = NewDispatcher(liveAddress) +} + +func (this *LiveDispatcherFixture) Teardown() { + _ = this.subject.Close() + _, _ = this.channel.QueueDelete(this.queue, false, false, false) + _ = this.channel.ExchangeDelete(this.exchange, false, false) + _ = this.channel.Close() + _ = this.control.Close() +} + +func (this *LiveDispatcherFixture) TestPublishRoundTrip() { + body := `{"event":"renewed"}` + + err := this.subject.Dispatch(this.ctx, &contracts.Message{ + ID: 1, + Type: this.exchange, + ContentType: "application/json", + Content: bytes.NewBufferString(body), + }) + + this.So(err, should.BeNil) + + deliveries, err := this.channel.Consume(this.queue, "", true, false, false, false, nil) + this.So(err, should.BeNil) + select { + case delivery := <-deliveries: + this.So(string(delivery.Body), should.Equal, body) + this.So(delivery.Type, should.Equal, this.exchange) + this.So(delivery.ContentType, should.Equal, "application/json") + this.So(delivery.DeliveryMode, should.Equal, uint8(amqp.Persistent)) + case <-time.After(3 * time.Second): + this.So(false, should.BeTrue) // timed out waiting for the published delivery + } +} diff --git a/dispatcher/rabbitmq/publishing.go b/dispatcher/rabbitmq/publishing.go new file mode 100644 index 0000000..6f742a8 --- /dev/null +++ b/dispatcher/rabbitmq/publishing.go @@ -0,0 +1,30 @@ +package rabbitmq + +import ( + amqp "github.com/rabbitmq/amqp091-go" + + "github.com/smarty/harness/v2/contracts" +) + +// toPublishing maps a *contracts.Message to a persistent amqp.Publishing, +// ported from messaging/v3's publish mapping and narrowed to the fields harness +// populates. The message is published to the exchange named by its Type (see +// Dispatcher.Dispatch); the Type field is carried on the publishing as well. +// amqp091 copies Body into the wire frame during PublishWithContext, so the +// pooled Content buffer is never retained past the publish call. +// +// Content is normally populated by the serialization stage before broadcast; a +// nil Content (only reachable by direct misuse of the public type) maps to a nil +// Body rather than panicking, which amqp091 treats as a valid empty payload. +func toPublishing(message *contracts.Message) amqp.Publishing { + var body []byte + if message.Content != nil { + body = message.Content.Bytes() + } + return amqp.Publishing{ + Type: message.Type, + ContentType: message.ContentType, + Body: body, + DeliveryMode: amqp.Persistent, + } +} diff --git a/dispatcher/rabbitmq/publishing_test.go b/dispatcher/rabbitmq/publishing_test.go new file mode 100644 index 0000000..400fd5f --- /dev/null +++ b/dispatcher/rabbitmq/publishing_test.go @@ -0,0 +1,49 @@ +package rabbitmq + +import ( + "bytes" + "testing" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/smarty/gunit/v2" + "github.com/smarty/gunit/v2/assert/should" + "github.com/smarty/harness/v2/contracts" +) + +func TestPublishingFixture(t *testing.T) { + gunit.Run(new(PublishingFixture), t) +} + +type PublishingFixture struct { + *gunit.Fixture +} + +func (this *PublishingFixture) TestMessageMapsToPersistentPublishing() { + message := &contracts.Message{ + Type: "subscription:renewed-v2", + ContentType: "application/json; charset=utf-8", + Content: bytes.NewBufferString(`{"hello":"world"}`), + } + + result := toPublishing(message) + + this.So(result.Type, should.Equal, message.Type) + this.So(result.ContentType, should.Equal, message.ContentType) + this.So(result.Body, should.Equal, message.Content.Bytes()) + this.So(result.DeliveryMode, should.Equal, uint8(amqp.Persistent)) +} + +func (this *PublishingFixture) TestNilContentMapsToNilBody() { + message := &contracts.Message{ + Type: "subscription:renewed-v2", + ContentType: "application/json", + Content: nil, + } + + result := toPublishing(message) + + this.So(result.Body, should.BeNil) + this.So(result.Type, should.Equal, message.Type) + this.So(result.ContentType, should.Equal, message.ContentType) + this.So(result.DeliveryMode, should.Equal, uint8(amqp.Persistent)) +} diff --git a/doc/docker-compose.yml b/doc/docker-compose.yml index 3c6e52f..e4feb87 100644 --- a/doc/docker-compose.yml +++ b/doc/docker-compose.yml @@ -11,3 +11,13 @@ services: interval: 2s timeout: 20s retries: 10 + rabbitmq: + image: rabbitmq:3-management + ports: + - 5672:5672 + - 15672:15672 + healthcheck: + test: "rabbitmq-diagnostics -q check_running" + interval: 2s + timeout: 20s + retries: 10 diff --git a/go.mod b/go.mod index 5a03d16..d0d4aaf 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.26 require ( github.com/go-sql-driver/mysql v1.10.0 + github.com/rabbitmq/amqp091-go v1.10.0 github.com/smarty/gunit/v2 v2.1.1 ) diff --git a/go.sum b/go.sum index 268e770..e3fc85e 100644 --- a/go.sum +++ b/go.sum @@ -2,5 +2,9 @@ filippo.io/edwards25519 v1.2.0 h1:crnVqOiS4jqYleHd9vaKZ+HKtHfllngJIiOpNpoJsjo= filippo.io/edwards25519 v1.2.0/go.mod h1:xzAOLCNug/yB62zG1bQ8uziwrIqIuxhctzJT18Q77mc= github.com/go-sql-driver/mysql v1.10.0 h1:Q+1LV8DkHJvSYAdR83XzuhDaTykuDx0l6fkXxoWCWfw= github.com/go-sql-driver/mysql v1.10.0/go.mod h1:M+cqaI7+xxXGG9swrdeUIoPG3Y3KCkF0pZej+SK+nWk= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/smarty/gunit/v2 v2.1.1 h1:LY8XaFV2JcFfHnYZcdkt6ug9dJg4NrxwvkvuuBgV+RM= github.com/smarty/gunit/v2 v2.1.1/go.mod h1:8zgLMlQPLDFO5SVz7TrnUS1ADFiHHMChB8zPIQ4R4Ys= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=