Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ MODULES = wal2json
# message test will fail for <= 9.5
REGRESS = cmdline insert1 update1 update2 update3 update4 delete1 delete2 \
delete3 delete4 savepoint specialvalue toast bytea message typmod \
filtertable selecttable
filtertable selecttable xmin nextxid

PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ Parameters
* `write-in-chunks`: write after every change instead of every changeset. Default is _false_.
* `include-lsn`: add _nextlsn_ to each changeset. Default is _false_.
* `include-unchanged-toast`: add TOAST value even if it was not modified. Since TOAST values are usually large, this option could save IO and bandwidth if it is disabled. Default is _true_.
* `include-xmins`: Add slot _catxmin_ (catalog xmin) and _xmin_ to each changeset. Default is _false_.
* `include-next-xids`: Adds _nextxid_ and _epoch_ (indicates xid wraparound) to each changeset. Default is _false_.
* `filter-tables`: exclude rows from the specified tables. Default is empty which means that no table will be filtered. It is a comma separated value. The tables should be schema-qualified. `*.foo` means table foo in all schemas and `bar.*` means all tables in schema bar. Special characters (space, single quote, comma, period, asterisk) must be escaped with backslash. Schema and table are case-sensitive. Table `"public"."Foo bar"` should be specified as `public.Foo\ bar`.
* `add-tables`: include only rows from the specified tables. Default is all tables from all schemas. It has the same rules from `filter-tables`.

Expand Down
31 changes: 31 additions & 0 deletions expected/nextxid.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
\set VERBOSITY terse
-- predictability
SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json');
?column?
----------
init
(1 row)

DROP TABLE IF EXISTS nextxid ;
NOTICE: table "nextxid" does not exist, skipping
CREATE TABLE nextxid (id integer PRIMARY KEY);
INSERT INTO nextxid values (1);
-- convert 32 bit epoch and 32 bit xid into 64 bit from txid_current
SELECT ((max(((data::json) -> 'epoch')::text::int)::bit(32) << 32) | max(((data::json) -> 'nextxid')::text::int)::bit(32))::bigint = txid_current() FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-next-xids', '1');
?column?
----------
t
(1 row)

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL) where ((data::json) -> 'nextxid') IS NOT NULL;
data
------
(0 rows)

SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
?column?
----------
stop
(1 row)

42 changes: 42 additions & 0 deletions expected/xmin.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
\set VERBOSITY terse
-- predictability
SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json');
?column?
----------
init
(1 row)

DROP TABLE IF EXISTS xmin ;
NOTICE: table "xmin" does not exist, skipping
CREATE TABLE xmin (id integer PRIMARY KEY);
INSERT INTO xmin values (1);
-- xmin is often (always?) 0, but this should be forward compat should that change in the future
SELECT max(((data::json) -> 'xmin')::text::int) = (SELECT coalesce(xmin::text::int, 0) FROM pg_replication_slots WHERE slot_name = 'regression_slot') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xmins', '1');
?column?
----------
t
(1 row)

SELECT max(((data::json) -> 'catxmin')::text::int) = (SELECT catalog_xmin::text::int FROM pg_replication_slots WHERE slot_name = 'regression_slot') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xmins', '1');
?column?
----------
t
(1 row)

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL) where ((data::json) -> 'catxmin') IS NOT NULL;
data
------
(0 rows)

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL) where ((data::json) -> 'xmin') IS NOT NULL;
data
------
(0 rows)

SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
?column?
----------
stop
(1 row)

17 changes: 17 additions & 0 deletions sql/nextxid.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
\set VERBOSITY terse

-- predictability
SET synchronous_commit = on;

SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json');

DROP TABLE IF EXISTS nextxid ;

CREATE TABLE nextxid (id integer PRIMARY KEY);
INSERT INTO nextxid values (1);

-- convert 32 bit epoch and 32 bit xid into 64 bit from txid_current
SELECT ((max(((data::json) -> 'epoch')::text::int)::bit(32) << 32) | max(((data::json) -> 'nextxid')::text::int)::bit(32))::bigint = txid_current() FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-next-xids', '1');

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL) where ((data::json) -> 'nextxid') IS NOT NULL;
SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
19 changes: 19 additions & 0 deletions sql/xmin.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
\set VERBOSITY terse

-- predictability
SET synchronous_commit = on;

SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json');

DROP TABLE IF EXISTS xmin ;

CREATE TABLE xmin (id integer PRIMARY KEY);
INSERT INTO xmin values (1);

-- xmin is often (always?) 0, but this should be forward compat should that change in the future
SELECT max(((data::json) -> 'xmin')::text::int) = (SELECT coalesce(xmin::text::int, 0) FROM pg_replication_slots WHERE slot_name = 'regression_slot') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xmins', '1');
SELECT max(((data::json) -> 'catxmin')::text::int) = (SELECT catalog_xmin::text::int FROM pg_replication_slots WHERE slot_name = 'regression_slot') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xmins', '1');

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL) where ((data::json) -> 'catxmin') IS NOT NULL;
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL) where ((data::json) -> 'xmin') IS NOT NULL;
SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
77 changes: 70 additions & 7 deletions wal2json.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*-------------------------------------------------------------------------
*
* wal2json.c
* JSON output plugin for changeset extraction
* JSON output plugin for changeset extraction
*
* Copyright (c) 2013-2018, Euler Taveira de Oliveira
*
Expand All @@ -12,6 +12,8 @@
*/
#include "postgres.h"

#include "access/xlog.h"

#include "catalog/pg_type.h"

#include "replication/logical.h"
Expand Down Expand Up @@ -40,6 +42,8 @@ typedef struct
bool include_typmod; /* include typmod in types */
bool include_not_null; /* include not-null constraints */
bool include_unchanged_toast; /* include unchanged TOAST field values in output */
bool include_xmins; /* include catalog_xmin and xmin field in output */
bool include_next_xids; /* include postgres epoch field in output */

bool pretty_print; /* pretty-print JSON? */
bool write_in_chunks; /* write in chunks? */
Expand Down Expand Up @@ -124,7 +128,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE
#endif
);
);
data->include_xids = false;
data->include_timestamp = false;
data->include_schemas = true;
Expand All @@ -136,6 +140,8 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
data->include_lsn = false;
data->include_not_null = false;
data->include_unchanged_toast = true;
data->include_xmins = false;
data->include_next_xids = false;
data->filter_tables = NIL;

/* add all tables in all schemas by default */
Expand Down Expand Up @@ -300,6 +306,32 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else if (strcmp(elem->defname, "include-xmins") == 0)
{
if (elem->arg == NULL)
{
elog(LOG, "include-xmins is null");
data->include_xmins = true;
}
else if (!parse_bool(strVal(elem->arg), &data->include_xmins))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else if (strcmp(elem->defname, "include-next-xids") == 0)
{
if (elem->arg == NULL)
{
elog(LOG, "include-next-xids is null");
data->include_next_xids = true;
}
else if (!parse_bool(strVal(elem->arg), &data->include_next_xids))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else if (strcmp(elem->defname, "filter-tables") == 0)
{
char *rawstr;
Expand Down Expand Up @@ -416,6 +448,37 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
appendStringInfo(ctx->out, "\"timestamp\":\"%s\",", timestamptz_to_str(txn->commit_time));
}

if (data->include_xmins)
{
if (data->pretty_print)
{
appendStringInfo(ctx->out, "\t\"xmin\": %u,\n", ctx->slot->effective_xmin);
appendStringInfo(ctx->out, "\t\"catxmin\": %u,\n", ctx->slot->effective_catalog_xmin);
}
else
{
appendStringInfo(ctx->out, "\"xmin\":%u,", ctx->slot->effective_xmin);
appendStringInfo(ctx->out, "\"catxmin\":%u,", ctx->slot->effective_catalog_xmin);
}
}

if (data->include_next_xids)
{
TransactionId nextXid;
uint32 epoch;
GetNextXidAndEpoch(&nextXid, &epoch);
if (data->pretty_print)
{
appendStringInfo(ctx->out, "\t\"nextxid\": %u,\n", nextXid);
appendStringInfo(ctx->out, "\t\"epoch\": %u,\n", epoch);
}
else
{
appendStringInfo(ctx->out, "\"nextxid\":%u,", nextXid);
appendStringInfo(ctx->out, "\"epoch\":%u,", epoch);
}
}

if (data->pretty_print)
appendStringInfoString(ctx->out, "\t\"change\": [");
else
Expand Down Expand Up @@ -1320,9 +1383,9 @@ parse_table_identifier(List *qualified_tables, char separator, List **select_tab
static bool
string_to_SelectTable(char *rawstring, char separator, List **select_tables)
{
char *nextp;
char *nextp;
bool done = false;
List *qualified_tables = NIL;
List *qualified_tables = NIL;

nextp = rawstring;

Expand All @@ -1335,9 +1398,9 @@ string_to_SelectTable(char *rawstring, char separator, List **select_tables)
/* At the top of the loop, we are at start of a new identifier. */
do
{
char *curname;
char *endp;
char *qname;
char *curname;
char *endp;
char *qname;

curname = nextp;
while (*nextp && *nextp != separator && !isspace(*nextp))
Expand Down