Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
91 commits
Select commit Hold shift + click to select a range
b970668
piped "./receive.sh | cargo run -- | jq"
olirice Apr 29, 2022
e49e457
single command
olirice Apr 29, 2022
15fd585
rm shell script
olirice Apr 29, 2022
0804b64
minimal readme
olirice Apr 29, 2022
761068b
add requirements to readme
olirice Apr 29, 2022
0aa5962
realtime transport
olirice May 17, 2022
e94ea54
improve stdio buffer; handle unwraps explicitly
olirice May 18, 2022
1c80b2c
handle non-json stdin
olirice May 18, 2022
8e1069e
enable logger
olirice May 18, 2022
001a37a
idempotent migrations
olirice May 18, 2022
2d63ec5
sync from master
olirice May 18, 2022
e836d93
handle postgres restarts/shutdowns gracefully
olirice May 18, 2022
5415a21
walrus uses logging
olirice May 18, 2022
cd0e06c
log walrus startup
olirice May 18, 2022
128c0cf
generic headers
olirice May 19, 2022
d0af269
generic header in readme
olirice May 19, 2022
36fea60
topic tweaks
olirice May 25, 2022
8f2cb85
remove topic from heartbeat
olirice May 25, 2022
9dd2403
handle protocol errors from realtime
olirice Jun 1, 2022
9a48ae9
structured wal2json parser
olirice Jun 1, 2022
a004790
towards cached rust impl
olirice Jun 1, 2022
c180a33
partial move. all except rls and filters
olirice Jun 1, 2022
317e83b
load subscriptions
olirice Jun 2, 2022
cb39f2a
manage subscriptions using the wal stream
olirice Jun 2, 2022
a8bbbf8
hooks for replacement of pg_recvlogical
olirice Jun 14, 2022
6531cab
readme
olirice Jun 14, 2022
d2620bf
Merge branch 'master' into worker
olirice Jun 17, 2022
6e26949
port regtype failover
olirice Jun 17, 2022
3cee068
Merge branch 'worker' into worker_rs_impl
olirice Jun 17, 2022
835cd51
rls delegate works. filter delegate sql untested
olirice Jun 20, 2022
f27c56b
reasonably complete local filtering
olirice Jun 21, 2022
921096c
bugfix invert filters
olirice Jun 21, 2022
f117706
parse subscriptions from wal stream
olirice Jun 21, 2022
f1ec1a4
subscription manager
olirice Jun 21, 2022
c13d483
batch filters delegated to sql
olirice Jun 22, 2022
6feb652
chrono for timestamps
olirice Jun 22, 2022
8691e89
reduce heartbeat freq
olirice Jun 22, 2022
ccfd3d3
rm wal2json output;
olirice Jun 22, 2022
8650160
sub name and realtime_fmt alias
olirice Jun 22, 2022
c520d89
bugfix timestamp format
olirice Jun 22, 2022
1cbb7f7
skip roundtrip when no sql filter delegates
olirice Jun 22, 2022
cf16611
diesel table defs
olirice Jun 24, 2022
6421682
rm deprecated sql functions
olirice Jun 24, 2022
1f85e23
publication is argument
olirice Jun 24, 2022
0a2b8d1
reduce workload on early exit
olirice Jun 24, 2022
d181e23
missing semicolon
olirice Jun 24, 2022
294e49f
subscription query bugfix
olirice Jun 24, 2022
9aa7048
filter subs, not ids
olirice Jun 27, 2022
0476873
extract topic
olirice Jun 28, 2022
720c532
show id on failed sub loc
olirice Jun 28, 2022
9e1b2c0
beginning of test port
olirice Jul 5, 2022
b9c9d6e
value in tests
olirice Jul 6, 2022
8312c54
references throughout structs
olirice Jul 6, 2022
3791c8c
reduce allocs
olirice Jul 6, 2022
8d6ccdd
relocate sql migrations and schema
olirice Jul 6, 2022
587d6d7
relocate models
olirice Jul 6, 2022
ec25871
towards filter relocation
olirice Jul 6, 2022
f693bb6
reloc pkey helper functions
olirice Jul 6, 2022
359a4a9
action casting
olirice Jul 6, 2022
ea26d3a
remove redundant filters
olirice Jul 6, 2022
6caaaf9
end-to-end test
olirice Jul 6, 2022
8d9bf4c
refactor nested loop to compute record and old_record
olirice Jul 7, 2022
7b93647
selectable columns refactor. simple_* tests
olirice Jul 7, 2022
d12e8ce
test unauthorized
olirice Jul 7, 2022
0926e1f
test quoted tables, schemas, types
olirice Jul 7, 2022
b3c1b09
subscription filter speedup
olirice Jul 8, 2022
d8b8f1c
sql functions to table_oid where practical
olirice Jul 8, 2022
9a5b2f3
explicit error enum
olirice Jul 8, 2022
fced27d
core rust-side filter logic tests
olirice Jul 8, 2022
d4651c1
rls on quoted test
olirice Jul 8, 2022
733cccc
filter in quoted user defined type test
olirice Jul 8, 2022
9e22f4f
reconnecting ws
olirice Jul 11, 2022
29dfadb
reconnecting ws
olirice Jul 11, 2022
c6e7c8a
reconnects function
olirice Jul 11, 2022
01ef7ab
e2e reconnect handle
olirice Jul 11, 2022
554dc8f
display messages in stream logical
olirice Jul 11, 2022
b993104
rm filter printlines
olirice Jul 11, 2022
5b15a1f
remove (incomplete) stream_logical from branch
olirice Jul 13, 2022
ec55a96
insert, update, delete, truncate integration test w/ filters and rls
olirice Jul 13, 2022
5ef01ec
Merge branch 'master' into worker
olirice Jul 13, 2022
fd40c90
walrus worker ci
olirice Jul 13, 2022
c6b996b
docker image typo
olirice Jul 13, 2022
4158b03
check out the code in ci...
olirice Jul 13, 2022
c65c7e3
test subscription manager
olirice Jul 13, 2022
fa7ea98
reduce heartbeat to every 30 seconds
olirice Jul 13, 2022
b3ba019
drop unused functions
olirice Jul 13, 2022
c71134c
document publication usage in readmes
olirice Jul 21, 2022
962db20
use modern module declarations
olirice Jul 21, 2022
5cf69be
document running walrus tests
olirice Jul 21, 2022
c1dc68a
Move remaining modules to mod.rs
sweatybridge Jul 22, 2022
d8ed480
Update README.md
olirice Aug 17, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions .github/workflows/worker_test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: WALRUS worker tests

on:
pull_request:
branches: [master]
push:
branches: [master]

jobs:
build:
name: worker tests
runs-on: ubuntu-latest
timeout-minutes: 15

services:
postgres:
image: supabase/postgres:latest
env:
POSTGRES_DB: postgres
POSTGRES_HOST: localhost
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
ports:
- 5501:5432
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v3

- name: run tests
run: |
cd worker
cargo test --bin walrus -- --test-threads=1
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ regression.*
__pycache__/
*.egg-info/
*.swp
target/
Cargo.lock
.DS_Store
5 changes: 5 additions & 0 deletions worker/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[workspace]
members = [
"walrus",
"realtime",
]
66 changes: 66 additions & 0 deletions worker/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# WALRUS Worker + Realtime Transport

Example showing how to stream WAL from postgres, apply row level security, and push changes to supabase realtime.

- `walrus/` is responsible for receiving WAL, formatting messages, and applying row level security
- `realtime/` is the websocket trasport layer for supabase realtime.

More info about each component can be found in their directories' README.md.

## Example:

Requires:
- rust/cargo
- docker-compose
- postgres installed locally (for `pg_recvlogical`)

Clone and Navigate
```sh
git clone https://github.com/supabase/walrus.git
cd walrus
git checkout worker
cd worker
```

Start the DB
```sh
docker-compose up
```

Run the `walrus` worker, piping its output to `realtime` transport
```sh
cargo run --bin walrus -- \
--connection=postgresql://postgres:password@localhost:5501/postgres --publication=walrus_pub |
cargo run --bin realtime -- \
--url=wss://sendwal.fly.dev/socket \
--header=apikey=<apikey>
```

Connect to the database at `postgresql://postgres:password@localhost:5501/postgres`

and execute the following SQL to create a subscription and a WAL record.

```sql
-- Create a table we can subscribe to
create table book(
id int primary key,
title text
);

create publication walrus_pub for all tables;

-- Create a dummy subscription to our new table
insert into realtime.subscription(subscription_id, entity, claims)
select
gen_random_uuid(),
'public.book',
jsonb_build_object(
'role', 'postgres',
'email', 'o@r.com',
'sub', gen_random_uuid()
);

-- Create a record
insert into book(id, title)
values (1, 'Foo');
```
23 changes: 23 additions & 0 deletions worker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
version: '3'
services:
db:
container_name: walrus_streaming
build:
context: .
dockerfile: ./dockerfiles/Dockerfile
ports:
- "5501:5432"
command:
- postgres
- -c
- wal_level=logical
- -c
- fsync=off
healthcheck:
test: ["CMD-SHELL", "PGUSER=postgres", "pg_isready"]
interval: 1s
timeout: 10s
retries: 5
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
7 changes: 7 additions & 0 deletions worker/dockerfiles/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FROM supabase/postgres:latest
RUN apt-get update
RUN apt-get install build-essential postgresql-server-dev-14 -y

RUN git clone https://github.com/eulerto/wal2json.git
RUN cd wal2json && git checkout 53b548a29ebd6119323b6eb2f6013d7c5fe807ec && make && make install

13 changes: 13 additions & 0 deletions worker/realtime/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
.DS_Store
.idea/
/target
*.iml
**/*.rs.bk
Cargo.lock
*.swp
*.rs.swp
**/*.swp
*.diff
/results
regression.diffs
regression.out
22 changes: 22 additions & 0 deletions worker/realtime/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "realtime"
version = "0.1.0"
edition = "2021"

[dependencies]
clap = { version = "3.1.12", features = ["derive"] }
dotenv = "0.15.0"
serde_json = "1.0"
serde = { version = "1.0", features=["derive"] }
uuid = { version = "1.0", features = ["serde"] }
tokio = { version = "1", features = ["full"] }
tokio-util = { version="0.7.2", features=["codec"] }
tokio-tungstenite = { verison = "0.17.1", features=["native-tls"] }
tungstenite = "0.17.1"
futures-util = "0.3.21"
url = "2.2.2"
futures-channel = "0.3.21"
futures = "0.3.21"
log = "0.4.17"
env_logger = "0.9.0"
stream-reconnect = { version = "0.3", default-features = true }
23 changes: 23 additions & 0 deletions worker/realtime/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Realtime Transport

Realtime Transport reads JSON form stdin and forwards it to supabase realtime

See parent directory for example usage

## CLI

```
realtime 0.1.0
reads JSON from stdin and forwards it to supabase realtime

USAGE:
realtime [OPTIONS]

OPTIONS:
-h, --help Print help information
--header <HEADER>=<VALUE>
--topic <TOPIC> [default: room:test]
--url <URL> [default: wss://sendwal.fly.dev/socket]
-V, --version Print version information

```
Loading