Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
- `make build` — `make test` + `go build ./...`. CI runs this (`.github/workflows/build.yml`).
- `make test.db` — runs the `storage/mysql` package tests against a real MySQL (30s timeout, with `-race`). These hit a live DB and are excluded by `-short`.
- `make test.db.local` — `docker compose -f doc/docker-compose.yml up --wait`, then `make test.db`, then `docker compose down`. Use when no MySQL is already running.
- `make test.mq` — runs the `dispatcher/rabbitmq` package tests against a real RabbitMQ (30s timeout, with `-race`). These hit a live broker and are excluded by `-short`.
- `make test.mq.local` — `docker compose -f doc/docker-compose.yml up --wait`, then `make test.mq`, then `docker compose down`. Use when no RabbitMQ is already running.
- Single test: `go test -run '^TestName$' ./internal/pipeline` (add `-race -count=1` to match `make test`).
- Single subtest in this codebase's `gunit` style: `go test -run '^TestFixture$/^MethodName$' ./path`.

Expand Down Expand Up @@ -54,6 +56,14 @@ All database work flows through a single seam: `Options.Storage(...)` takes a `s
- `internal/adapters` holds the thin `Writer`, `Dispatcher`, and `Recovery` types the pipeline wires from `config.Storage`. Each builds a table-agnostic `storage.*` operation and hands it to `Storage.Exec`. They are single-goroutine (one reusable op buffer each). `Dispatcher` rejects `ID == 0` before publishing (an unassigned id could never be marked and would republish forever). `Recovery` is the stateful keyset cursor (snapshot `MIN/MAX(id)` of undispatched rows on the first call, page within that frozen window, advance only after a clean page).
- `storage/mysql.Mapper` implements `Storage` against the `Snapshots` and `Messages` tables in `doc/mysql/schema.sql`. It is **safe for concurrent use** (pooled statement buffers). Insert emits one multi-row INSERT per batch and derives IDs from `LAST_INSERT_ID() + i*stride` (relies on `innodb_autoinc_lock_mode = 2` "simple insert" semantics and `stride` matching the server's `auto_increment_increment`). `quoteTableName` validates table names against `^[A-Za-z0-9_]+$` and back-tick quotes them. `WithLegacyWrite(...)` is a deprecated transitional hook run inside the INSERT transaction.

### Dispatcher seam (`contracts.Dispatcher` + bundled `dispatcher/rabbitmq`)

`Options.Dispatcher(...)` takes a `contracts.Dispatcher` (`Dispatch(ctx, ...*Message) error`). Unlike the storage seam this one is **public** — callers may supply their own — but harness now ships a batteries-included default so the common case is one line, mirroring the `storage/mysql` precedent.

- `dispatcher/rabbitmq.NewDispatcher(address string, options ...Option)` is the bundled default. It is the dispatcher analog of `storage/mysql`: a thin, direct implementation over the `github.com/rabbitmq/amqp091-go` driver (the version messaging/v3 pins) rather than a messaging framework. Configure it via the `Options` singleton — `Options.TLS(*tls.Config)` and `Options.Dialer(ContextDialer)` — the same `var Options singleton` shape used elsewhere in the module. `NewDispatcher` promotes `?username=&password=` query credentials into the URL userinfo (where amqp091 reads them, falling back to guest/guest otherwise) so a single messaging/v3-style `message_host` works for both the consumer and the dispatcher; userinfo wins on conflict, unparseable addresses pass through (`credentials.go`).
- It owns a lazily established, long-lived transaction-mode channel: `Dispatch` ensures the channel (dial + open + `Tx()`), publishes each message as a **persistent** delivery to the exchange named by its `Type` (routing key `""`, fanout — exchanges are assumed to exist, never declared, matching messaging/v3), then `TxCommit()`s for a synchronous broker ack. It returns `nil` only after a clean commit. On **any** publish or commit failure it calls `reset()` (tears down channel + connection) and returns the error; the broadcast stage retries forever, so the next `Dispatch` reconnects. `Close()` (io.Closer, for dominoes shutdown) releases the cached channel and is a safe no-op when nothing is cached.
- Driven by a single goroutine (the broadcast stage via `adapters.Dispatcher`, which still owns ID-rejection and mark-dispatched), so the cached channel and reused `[]amqp.Publishing` buffer need no locking — same one-goroutine discipline as the internal adapters. An internal `transport`/`channel` seam (real impl in `amqp.go`) lets the unit suite run broker-free against a fake; `live_test.go` (skipped under `-short`) round-trips against a real broker via `make test.mq`.

### Snapshots & replay (`snapshots/`, `internal/domainscan/`)

`snapshots.Save` gzip+JSON-encodes a snapshot row; `snapshots.Load` loads a snapshot (latest or by id), applies it to the domain, and — only when `RegisteredEvents(...)` is supplied — replays events since the snapshot's high watermark. `internal/domainscan` centralizes the reflective `Execute<Foo>`/`Apply<Foo>` method-shape detection shared by the pipeline router and the replay machinery. Note: `LoadResult.NewHighWatermark` is zero when no events were replayed (`EventsAppliedCount == 0`).
Expand Down
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ test.db: test
test.db.local:
(docker compose -f doc/docker-compose.yml up --wait && $(MAKE) test.db --no-print-directory); docker compose -f doc/docker-compose.yml down

test.mq: test
go test -timeout=30s -race -covermode=atomic github.com/smarty/harness/v2/dispatcher/rabbitmq

test.mq.local:
(docker compose -f doc/docker-compose.yml up --wait && $(MAKE) test.mq --no-print-directory); docker compose -f doc/docker-compose.yml down

fmt:
go mod tidy && go fmt ./...

Expand All @@ -18,4 +24,4 @@ compile:

build: test compile

.PHONY: test test.db test.db.local fmt compile build
.PHONY: test test.db test.db.local test.mq test.mq.local fmt compile build
90 changes: 90 additions & 0 deletions dispatcher/rabbitmq/amqp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package rabbitmq

import (
"context"
"crypto/tls"
"errors"
"net"
"time"

amqp "github.com/rabbitmq/amqp091-go"
)

// handshakeTimeout bounds the AMQP protocol handshake when the caller's context
// carries no deadline. It mirrors amqp091's DefaultDial default, which our custom
// Dial closure bypasses; the deadline is lifted by amqp091 once connected.
const handshakeTimeout = 30 * time.Second

// ContextDialer establishes the underlying TCP connection for the AMQP dial.
// *net.Dialer satisfies it, and it is the default; callers override it through
// Options.Dialer (e.g. to honor a cancellable pipeline context during dialing).
type ContextDialer interface {
DialContext(ctx context.Context, network, address string) (net.Conn, error)
}

// amqpTransport is the default transport: it dials a RabbitMQ broker over
// amqp091-go and hands back a transaction-mode channel. PLAIN SASL credentials
// and the vhost are taken from the address URL by amqp.DialConfig.
type amqpTransport struct {
address string
tls *tls.Config
dialer ContextDialer
}

func (this *amqpTransport) connect(ctx context.Context) (channel, error) {
config := amqp.Config{
TLSClientConfig: this.tls,
Dial: func(network, address string) (net.Conn, error) {
connection, err := this.dialer.DialContext(ctx, network, address)
if err != nil {
return nil, err
}
// amqp.Open is not context-aware: once the socket is up, ctx
// cancellation can't interrupt the handshake read. Arm a read
// deadline (honoring the caller's deadline, else handshakeTimeout)
// so the single broadcast goroutine stays interruptible. amqp091
// clears it in openComplete once connected.
deadline, ok := ctx.Deadline()
if !ok {
deadline = time.Now().Add(handshakeTimeout)
}
if err := connection.SetReadDeadline(deadline); err != nil {
_ = connection.Close()
return nil, err
}
return connection, nil
},
}
connection, err := amqp.DialConfig(this.address, config)
if err != nil {
return nil, err
}
transacted, err := connection.Channel()
if err != nil {
_ = connection.Close()
return nil, err
}
if err := transacted.Tx(); err != nil {
_ = transacted.Close()
_ = connection.Close()
return nil, err
}
return &amqpChannel{connection: connection, channel: transacted}, nil
}

// amqpChannel is a transaction-mode channel and the connection it rode in on,
// kept together so close() tears down both.
type amqpChannel struct {
connection *amqp.Connection
channel *amqp.Channel
}

func (this *amqpChannel) publish(ctx context.Context, exchange string, msg amqp.Publishing) error {
return this.channel.PublishWithContext(ctx, exchange, "", false, false, msg)
}
func (this *amqpChannel) commit() error {
return this.channel.TxCommit()
}
func (this *amqpChannel) close() error {
return errors.Join(this.channel.Close(), this.connection.Close())
}
93 changes: 93 additions & 0 deletions dispatcher/rabbitmq/amqp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package rabbitmq

import (
"context"
"io"
"net"
"testing"
"time"

"github.com/smarty/gunit/v2"
"github.com/smarty/gunit/v2/assert/should"
)

func TestTransportFixture(t *testing.T) {
gunit.Run(new(TransportFixture), t)
}

type TransportFixture struct {
*gunit.Fixture
dialer *recordingDialer
transport *amqpTransport
}

func (this *TransportFixture) Setup() {
this.dialer = &recordingDialer{}
this.transport = &amqpTransport{
address: "amqp://guest:guest@localhost:5672/",
dialer: this.dialer,
}
}

func (this *TransportFixture) TestHandshakeDeadline_FromContext() {
deadline := time.Now().Add(2 * time.Minute)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()

// connect dials the fake conn, arms the read deadline, then fails the AMQP
// handshake (the fake conn returns io.EOF on Read) — that error is expected.
_, err := this.transport.connect(ctx)

this.So(err, should.NOT.BeNil)
this.So(this.dialer.conn.deadlineSet, should.BeTrue)
this.So(this.dialer.conn.readDeadline, should.HappenOn, deadline)
}

func (this *TransportFixture) TestHandshakeDeadline_DefaultWhenNoContextDeadline() {
approxExpected := time.Now().Add(handshakeTimeout)

_, err := this.transport.connect(context.Background())

this.So(err, should.NOT.BeNil)
this.So(this.dialer.conn.deadlineSet, should.BeTrue)
this.So(this.dialer.conn.readDeadline, should.HappenWithin, 5*time.Second, approxExpected)
}

// recordingDialer hands back a single recordingConn so a test can inspect the
// read deadline the transport armed before the AMQP handshake.
type recordingDialer struct {
conn *recordingConn
}

func (this *recordingDialer) DialContext(_ context.Context, _, _ string) (net.Conn, error) {
this.conn = &recordingConn{}
return this.conn, nil
}

// recordingConn is a net.Conn that records the read deadline set on it and fails
// the AMQP handshake fast (Read returns io.EOF) so amqp.DialConfig needs no broker.
// SetReadDeadline is only ever called by connect's Dial closure (before amqp.Open
// launches its reader), so the recorded fields are written before any concurrent
// Read and need no locking.
type recordingConn struct {
deadlineSet bool
readDeadline time.Time
}

func (this *recordingConn) Read([]byte) (int, error) { return 0, io.EOF }
func (this *recordingConn) Write(p []byte) (int, error) { return len(p), nil }
func (this *recordingConn) Close() error { return nil }
func (this *recordingConn) LocalAddr() net.Addr { return fakeAddr{} }
func (this *recordingConn) RemoteAddr() net.Addr { return fakeAddr{} }
func (this *recordingConn) SetDeadline(time.Time) error { return nil }
func (this *recordingConn) SetReadDeadline(deadline time.Time) error {
this.deadlineSet = true
this.readDeadline = deadline
return nil
}
func (this *recordingConn) SetWriteDeadline(time.Time) error { return nil }

type fakeAddr struct{}

func (fakeAddr) Network() string { return "tcp" }
func (fakeAddr) String() string { return "fake" }
37 changes: 37 additions & 0 deletions dispatcher/rabbitmq/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package rabbitmq

import (
"crypto/tls"
"net"
)

// NewDispatcher builds a Dispatcher that publishes to the RabbitMQ broker at the
// given address (an amqp:// or amqps:// URL carrying credentials and vhost). It
// mirrors mysql.NewMapper's shape: a required handle/address followed by the
// repo-wide Options-singleton functional options. The package owns the connection
// lifecycle (unlike database/sql), so it takes coordinates rather than a live handle.
func NewDispatcher(address string, options ...option) *Dispatcher {
transport := &amqpTransport{address: promoteCredentials(address)}
for _, option := range append(Options.defaults(), options...) {
option(transport)
}
return &Dispatcher{transport: transport}
}

type option func(*amqpTransport)

var Options singleton

type singleton struct{}

func (singleton) TLS(config *tls.Config) option {
return func(transport *amqpTransport) { transport.tls = config }
}
func (singleton) Dialer(dialer ContextDialer) option {
return func(transport *amqpTransport) { transport.dialer = dialer }
}
func (singleton) defaults() []option {
return []option{
Options.Dialer(new(net.Dialer)),
}
}
55 changes: 55 additions & 0 deletions dispatcher/rabbitmq/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package rabbitmq

import (
"crypto/tls"
"net"
"testing"

"github.com/smarty/gunit/v2"
"github.com/smarty/gunit/v2/assert/should"
)

func TestConfigFixture(t *testing.T) {
gunit.Run(new(ConfigFixture), t)
}

type ConfigFixture struct {
*gunit.Fixture
}

func (this *ConfigFixture) transportOf(dispatcher *Dispatcher) *amqpTransport {
result, ok := dispatcher.transport.(*amqpTransport)
this.So(ok, should.BeTrue)
return result
}

func (this *ConfigFixture) TestDefaults_AddressSetTLSNilDefaultDialer() {
transport := this.transportOf(NewDispatcher("amqp://localhost/vhost"))

this.So(transport.address, should.Equal, "amqp://localhost/vhost")
this.So(transport.tls, should.BeNil)
_, isNetDialer := transport.dialer.(*net.Dialer)
this.So(isNetDialer, should.BeTrue)
}

func (this *ConfigFixture) TestTLSOption_SetsTLSConfig() {
config := &tls.Config{ServerName: "broker"}

transport := this.transportOf(NewDispatcher("amqps://localhost", Options.TLS(config)))

this.So(transport.tls == config, should.BeTrue)
}

func (this *ConfigFixture) TestDialerOption_SetsDialer() {
dialer := &net.Dialer{}

transport := this.transportOf(NewDispatcher("amqp://localhost", Options.Dialer(dialer)))

this.So(transport.dialer == dialer, should.BeTrue)
}

func (this *ConfigFixture) TestAddress_PromotesQueryCredentialsToUserinfo() {
transport := this.transportOf(NewDispatcher("amqp://host/?username=bob&password=secret"))

this.So(transport.address, should.Equal, "amqp://bob:secret@host/")
}
37 changes: 37 additions & 0 deletions dispatcher/rabbitmq/credentials.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package rabbitmq

import (
"cmp"
"net/url"
)

// promoteCredentials returns address with credentials promoted into the URL
// userinfo, where amqp091-go expects them. The smarty messaging/v3 connector
// also honors credentials supplied as ?username=&password= query parameters, but
// amqp091 reads only the userinfo and otherwise falls back to guest/guest —
// yielding a 403. Promoting them here lets a single message_host config work for
// both the consumer and the dispatcher. Userinfo takes precedence over query
// parameters, matching messaging/v3; an unparseable address is returned as-is.
func promoteCredentials(address string) string {
endpoint, err := url.Parse(address)
if err != nil || endpoint == nil {
return address
}
query := endpoint.Query()
username := query.Get("username")
password := query.Get("password")
if username == "" && password == "" {
return address
}
if endpoint.User != nil {
username = cmp.Or(endpoint.User.Username(), username)
if existing, ok := endpoint.User.Password(); ok {
password = cmp.Or(existing, password)
}
}
endpoint.User = url.UserPassword(username, password)
query.Del("username")
query.Del("password")
endpoint.RawQuery = query.Encode()
return endpoint.String()
}
Loading