A Postgres server and logical replication plugin for the "transicator" software.
Transicator

Transicator is a transaction replicator.

It builds on the "logical decoding" capability of Postgres to allow a large
number of occasionally connected clients to consistently consume
changes made to a Postgres database.

For example, at Apigee, we are using Transicator to push configuration data
to a large number of usually-but-not-always-connected infrastructure servers.
Transicator allows us to "program" this network of servers using standard
RDBMS technology on the "top," and then have the changes be consumed by a
large number of clients using an HTTP-based API.

In order to consume changes, all a client needs is to be able to make an HTTPS
request to the Transicator servers. This makes it possible to create clients
for Transicator that run in a variety of constrained environments.

Status

This is not an official Google product. It is an open-source project and
we are happy to receive feedback, fixes, and improvements. Please see
CONTRIBUTING for more.

Overview

There are three components to Transicator:

Postgres Logical Replication

The "logical replication" feature of Postgres (version 9.4 and higher) does the
heavy lifting of ordering changes from every committed transaction into
commit order and publishing them to a logical replication slot.

Transicator adds a plugin to Postgres that formats these changes in JSON so
that we can easily consume them.

Change Server

The goal of Transicator is to distribute logical changes to thousands of
occasionally connected clients. Postgres is not designed to handle replication
consumers at this scale, and its replication protocol is complex.

The "changeserver" is a server that consumes the stream of logical changes
directly from Postgres using the streaming replication protocol, and makes them
available via an HTTP API. Changes in changeserver are ordered by the order
in which they appeared in the logical replication stream, and are further
segregated by "selector" (formerly called "scope").

changeserver is designed to handle thousands of clients waiting for the next
change via "long polling." This way, we can scale up the number of consumers
for the set of changes by adding changeservers.

Snapshot Server

The changeserver would be impractical if it had to store every change for
every selector since the beginning of time. So, the snapshot server lets
clients get an initial snapshot of the data by requesting the contents of
the database for a particular set of selectors at a particular point in time.

The snapshot server and change server operate on the principle of a "selector"
(formerly called "scope"). A selector is simply a unique identifier for a
particular set of records in the database.

For Transicator to work, we need to identify records by selector. We do
this by adding a "text" column named "_change_selector" to every table that we
wish to replicate using Transicator.

The valid characters for a selector are: 0-9a-z_-
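
A client may want to check selectors before sending them. The following is a minimal sketch in Go that encodes the character set listed above as a regular expression. The function name "validSelector" is hypothetical, and rejecting the empty string is an assumption of this sketch rather than a documented rule.

```go
package main

import (
	"fmt"
	"regexp"
)

// selectorPattern encodes the character set quoted above: 0-9, a-z, "_" and "-".
// Requiring at least one character is an assumption of this sketch.
var selectorPattern = regexp.MustCompile(`^[0-9a-z_-]+$`)

// validSelector reports whether s uses only the permitted characters.
func validSelector(s string) bool {
	return selectorPattern.MatchString(s)
}

func main() {
	for _, s := range []string{"foo", "tenant_42-a", "Foo", "a b"} {
		fmt.Printf("%q valid: %v\n", s, validSelector(s))
	}
}
```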

API Specifications:

Quick Start with Docker

First, start a Postgres server that has the Transicator plugin installed:

docker run -p 5432:5432 --name pg -d \
-e POSTGRES_PASSWORD=changeme apigeelabs/transicator-postgres

Now you have a Postgres server running on port 5432 of whatever machine you
have Docker on. For instance, assuming that Docker runs on localhost, verify
that you can reach it. (BTW, you set the password above to be "changeme".)

psql -W -h localhost postgres postgres
Type "help" for help.

postgres=# select * from now();
              now
-------------------------------
 2016-11-01 22:20:11.092593+00
(1 row)

Looks great -- control-D to quit, BTW.

Now, start a snapshot server running on port 9001:

docker run -d -p 9001:9001 apigeelabs/transicator-snapshot \
-u postgres://postgres:changeme@localhost/postgres -p 9001

You can test quickly -- this API call should return "303 See Other":

curl -v "http://localhost:9001/snapshots?selector=foo"

Finally, start a change server running on port 9000:

docker run -d -p 9000:9000 apigeelabs/transicator-changeserver \
-p 9000 -u postgres://postgres:changeme@localhost/postgres -s testslot

This API call will tell you that the database is empty:

curl http://localhost:9000/changes

Complete API Example

Now that the servers are running, you can test the API.

To get started, imagine that we have a database that contains some tables
that have an _change_selector column, and that there are some rows in those tables
that have the value "foo" for _change_selector. For instance, let's create a table
and insert some values:

psql -W -h localhost postgres postgres
postgres=# create table test
    (id varchar primary key,
    val integer,
    _change_selector varchar);
CREATE TABLE
postgres=# alter table test replica identity full;
ALTER TABLE
postgres=# insert into test values ('one', 1, 'foo');
INSERT 0 1
postgres=# insert into test values('two', 2, 'bar');
INSERT 0 1
postgres=# insert into test values('three', '3', 'foo');
INSERT 0 1

Getting a snapshot

We can now get a snapshot in JSON format using the snapshot server:

$ curl -H "Accept: application/json" -L "http://localhost:9001/snapshots?selector=foo"

{"snapshotInfo":"229444:229444:","timestamp":"2016-10-13T17:42:12.171306-07:00",
"tables":[{"name":"public.snapshot_test","rows":[]},{"name":"scope","rows":[
{"_change_selector":{"value":"foo","type":1043},"val":{"value":"bar","type":1043}}]},
{"name":"public.developer","rows":[]},{"name":"app","rows":[]},
{"name":"public.test","rows":[{"_change_selector":{"value":"foo","type":1043},
"id":{"value":"one","type":1043},"val":{"value":"1","type":23}},
{"_change_selector":{"value":"foo","type":1043},"id":{"value":"three","type":1043},
"val":{"value":"3","type":23}}]}]}

(In the example above, we use the "Accept" header because the snapshot server
can return different formats. We also use the "-L" argument to curl because the
API above actually redirects to a different URI.)

The returned snapshot contains data that is consistent at a certain point
in time from a Postgres perspective. We can now use the change server to see
what has changed since that time.
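
To make the JSON layout concrete, here is a minimal Go sketch that decodes a trimmed copy of the snapshot shown above. The struct and function names are hypothetical and were inferred from the sample output, not taken from the project's "common" package. The "type" numbers are Postgres type OIDs (23 is int4, 1043 is varchar), and values arrive as strings, so the client converts them itself.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"strconv"
)

// oidInt4 is the Postgres type OID for a 4-byte integer.
const oidInt4 = 23

// These types mirror the JSON shown above; the names are hypothetical.
type columnValue struct {
	Value string `json:"value"`
	Type  int    `json:"type"`
}

type table struct {
	Name string                   `json:"name"`
	Rows []map[string]columnValue `json:"rows"`
}

type snapshot struct {
	SnapshotInfo string  `json:"snapshotInfo"`
	Timestamp    string  `json:"timestamp"`
	Tables       []table `json:"tables"`
}

// sampleSnapshot is a trimmed copy of the response shown above.
const sampleSnapshot = `{
  "snapshotInfo": "229444:229444:",
  "timestamp": "2016-10-13T17:42:12.171306-07:00",
  "tables": [
    {"name": "public.test", "rows": [
      {"_change_selector": {"value": "foo", "type": 1043},
       "id": {"value": "one", "type": 1043},
       "val": {"value": "1", "type": 23}},
      {"_change_selector": {"value": "foo", "type": 1043},
       "id": {"value": "three", "type": 1043},
       "val": {"value": "3", "type": 23}}
    ]}
  ]
}`

// parseSnapshot decodes a JSON snapshot into the types above.
func parseSnapshot(data []byte) (*snapshot, error) {
	var s snapshot
	if err := json.Unmarshal(data, &s); err != nil {
		return nil, err
	}
	return &s, nil
}

func main() {
	snap, err := parseSnapshot([]byte(sampleSnapshot))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("snapshotInfo:", snap.SnapshotInfo)
	for _, t := range snap.Tables {
		for _, r := range t.Rows {
			val := r["val"]
			if val.Type != oidInt4 {
				continue
			}
			// Values are strings in JSON, so integers are parsed by the client.
			n, err := strconv.Atoi(val.Value)
			if err != nil {
				log.Fatal(err)
			}
			fmt.Printf("%s id=%s val=%d\n", t.Name, r["id"].Value, n)
		}
	}
}
```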

Getting the first changes

The first time the changeserver is used, we do not know where to begin.
However, the snapshot contains the "snapshotInfo" field, which tells us which
Postgres transactions were visible when the snapshot was created.

At this point, using the changeserver, a client may issue the following
API call:

curl -H "Accept: application/json" "http://localhost:9000/changes?snapshot=229444:229444:&selector=foo&block=10"

This call asks for all the changes for the selector "foo", starting from the first
change that was not visible in the snapshot. If no changes appear for 10 seconds,
it will return with something like this:

{
  "lastSequence": "0.1390d838.0",
  "firstSequence": "",
  "changes": null
}
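
The "snapshotInfo" value contains colons, so a client that builds this URL in code should escape it. Below is a small Go sketch using the standard "net/url" package. The function name "firstChangesURL" is hypothetical, and the query parameters are the ones used in the curl call above.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// firstChangesURL builds the URL for the first /changes request after a
// snapshot. The parameter names come from the curl example above.
func firstChangesURL(base, snapshotInfo, selector string, blockSeconds int) string {
	q := url.Values{}
	q.Set("snapshot", snapshotInfo)
	q.Set("selector", selector)
	q.Set("block", strconv.Itoa(blockSeconds))
	// Encode sorts the keys and percent-encodes the colons in snapshotInfo.
	return base + "/changes?" + q.Encode()
}

func main() {
	fmt.Println(firstChangesURL("http://localhost:9000", "229444:229444:", "foo", 10))
}
```

The escaped form ("%3A" in place of ":") is equivalent to the literal colons in the curl example once the server decodes the query string.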

Getting the next changes

Once the first set of changes has been retrieved, the "lastSequence" parameter
tells us where in the change stream we left off. We can now use this
in another API call to wait for more changes:

curl -H "Accept: application/json" "http://localhost:9000/changes?since=0.1390d838.0&selector=foo&block=10"

This call won't return any changes either, but it will return a different
sequence if anything changed in the database.

Let's try again (with a longer timeout this time):

curl -H "Accept: application/json" "http://localhost:9000/changes?since=0.1390d838.0&selector=foo&block=60"

and while we're waiting, let's use another window to insert something into the database:

postgres=# insert into test values('four', 4, 'foo');
INSERT 0 1

As soon as the change is committed to the database, the API call
should return it:

{
  "lastSequence": "0.13923950.0",
  "firstSequence": "",
  "changes": [
    {
      "operation": 1,
      "table": "public.test",
      "sequence": "0.13923950.0",
      "commitSequence": 328350032,
      "changeSequence": 328349576,
      "commitIndex": 0,
      "txid": 229444,
      "newRow": {
        "_change_selector": {
          "value": "foo",
          "type": 1043
        },
        "id": {
          "value": "four",
          "type": 1043
        },
        "val": {
          "value": "4",
          "type": 23
        }
      }
    }
  ]
}
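
A client written in Go could decode this response along the following lines. This is a sketch only: the struct and function names are hypothetical and were derived from the JSON above, not from the project's "common" package. Note that an idle response carries "changes": null, which decodes to an empty slice.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// These types mirror the JSON shown above; the names are hypothetical.
type columnValue struct {
	Value string `json:"value"`
	Type  int    `json:"type"`
}

type change struct {
	Operation      int                    `json:"operation"`
	Table          string                 `json:"table"`
	Sequence       string                 `json:"sequence"`
	CommitSequence int64                  `json:"commitSequence"`
	ChangeSequence int64                  `json:"changeSequence"`
	CommitIndex    int                    `json:"commitIndex"`
	TxID           int64                  `json:"txid"`
	NewRow         map[string]columnValue `json:"newRow"`
	OldRow         map[string]columnValue `json:"oldRow"`
}

type changeList struct {
	LastSequence  string   `json:"lastSequence"`
	FirstSequence string   `json:"firstSequence"`
	Changes       []change `json:"changes"`
}

// sampleChanges is a compact copy of the response shown above.
const sampleChanges = `{
  "lastSequence": "0.13923950.0",
  "firstSequence": "",
  "changes": [
    {
      "operation": 1,
      "table": "public.test",
      "sequence": "0.13923950.0",
      "commitSequence": 328350032,
      "changeSequence": 328349576,
      "commitIndex": 0,
      "txid": 229444,
      "newRow": {
        "_change_selector": {"value": "foo", "type": 1043},
        "id": {"value": "four", "type": 1043},
        "val": {"value": "4", "type": 23}
      }
    }
  ]
}`

// parseChangeList decodes a JSON change list into the types above.
func parseChangeList(data []byte) (*changeList, error) {
	var l changeList
	if err := json.Unmarshal(data, &l); err != nil {
		return nil, err
	}
	return &l, nil
}

func main() {
	list, err := parseChangeList([]byte(sampleChanges))
	if err != nil {
		log.Fatal(err)
	}
	for _, c := range list.Changes {
		fmt.Printf("op=%d table=%s id=%s\n", c.Operation, c.Table, c.NewRow["id"].Value)
	}
	fmt.Println("next request should use since =", list.LastSequence)
}
```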

From here, a client should pass the "lastSequence" from each response as the
"since" parameter of its next call. To see more, keep making changes: experiment
with "update" and "delete", and with transactions.
The changes in a transaction appear in the change log all at once,
when the transaction commits. Rolled-back transactions never appear.

For instance:

curl -H "Accept: application/json" "http://localhost:9000/changes?selector=foo&block=60&since=0.1392ed18.0"

and then...

postgres=# begin;
BEGIN
postgres=# update test set val = 999 where id= 'one';
UPDATE 1
postgres=# delete from test where id = 'three';
DELETE 1
postgres=# commit;
COMMIT

Should result in:

{
  "lastSequence": "0.1392efb8.0",
  "firstSequence": "",
  "changes": [
    {
      "operation": 2,
      "table": "public.test",
      "sequence": "0.1392efb8.0",
      "commitSequence": 328396728,
      "changeSequence": 328396496,
      "commitIndex": 0,
      "txid": 229449,
      "newRow": {
        "_change_selector": {
          "value": "foo",
          "type": 1043
        },
        "id": {
          "value": "one",
          "type": 1043
        },
        "val": {
          "value": "999",
          "type": 23
        }
      },
      "oldRow": {
        "_change_selector": {
          "value": "foo",
          "type": 1043
        },
        "id": {
          "value": "one",
          "type": 1043
        },
        "val": {
          "value": "1",
          "type": 23
        }
      }
    }
  ]
}

and then another call should immediately give us:

{
  "lastSequence": "0.1392efb8.1",
  "firstSequence": "",
  "changes": [
    {
      "operation": 3,
      "table": "public.test",
      "sequence": "0.1392efb8.1",
      "commitSequence": 328396728,
      "changeSequence": 328396648,
      "commitIndex": 1,
      "txid": 229449,
      "oldRow": {
        "_change_selector": {
          "value": "foo",
          "type": 1043
        },
        "id": {
          "value": "three",
          "type": 1043
        },
        "val": {
          "value": "3",
          "type": 23
        }
      }
    }
  ]
}

(What happened here? The first change was propagated to us as soon as it appeared
in the database. The second change was already available, though, and came through
right away.)
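
In the examples above, "operation" is 1 for the insert, 2 for the update, and 3 for the delete. Assuming that mapping holds in general (it is inferred from these examples, not from a specification), a client could apply changes to a local copy of the table roughly as follows. The types are simplified and hypothetical; rows are keyed by their "id" column.

```go
package main

import (
	"fmt"
	"log"
)

// Operation codes as observed in the example responses above.
const (
	opInsert = 1
	opUpdate = 2
	opDelete = 3
)

// row is a simplified record: column name to string value.
type row map[string]string

// change is a simplified, hypothetical form of one entry in "changes".
type change struct {
	Operation int
	NewRow    row
	OldRow    row
}

// applyChange updates the local store, keyed by the "id" column.
func applyChange(store map[string]row, c change) error {
	switch c.Operation {
	case opInsert:
		store[c.NewRow["id"]] = c.NewRow
	case opUpdate:
		// If the key itself changed, drop the entry stored under the old key.
		if oldID := c.OldRow["id"]; oldID != "" && oldID != c.NewRow["id"] {
			delete(store, oldID)
		}
		store[c.NewRow["id"]] = c.NewRow
	case opDelete:
		delete(store, c.OldRow["id"])
	default:
		return fmt.Errorf("unknown operation %d", c.Operation)
	}
	return nil
}

func main() {
	store := map[string]row{
		"one":   {"id": "one", "val": "1"},
		"three": {"id": "three", "val": "3"},
	}
	changes := []change{
		{Operation: opUpdate, NewRow: row{"id": "one", "val": "999"}, OldRow: row{"id": "one", "val": "1"}},
		{Operation: opDelete, OldRow: row{"id": "three", "val": "3"}},
	}
	for _, c := range changes {
		if err := applyChange(store, c); err != nil {
			log.Fatal(err)
		}
	}
	fmt.Println(len(store), "row(s) left; one.val =", store["one"]["val"])
}
```

The delete case relies on "oldRow" being present, which is why the replica identity setting described in the next section matters.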

Replication Settings

If, in the example above, no information was delivered for the "delete"
or "update" operations, the cause may be the replication settings in Postgres.
By default, Postgres only delivers the primary key of a deleted or updated
row, which means that the change server does not see the "_change_selector"
column.

The way to fix this is to change the table settings in Postgres so that
each row is delivered to the logical replication plugin. The command
looks like this:

alter table NAME replica identity full;

Client best practices

So to sum it up, clients of transicator should do the following:

1) Download a snapshot for the selectors in which they are interested

2) Store the snapshot somewhere locally

3) Use the "snapshotInfo" field of the snapshot to request changes since the
beginning of time that were not visible at the time of the snapshot.

4) Use the "lastSequence" field on that API response, and all others,
and NOT the "snapshot" field, to get changes since the last set of
changes that the client saw.

5) Use the "block" parameter so that changes will immediately be delivered
to the client, and to avoid a huge number of API calls.
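
Steps 3 to 5 can be sketched as a loop in Go. To keep the example self-contained, the HTTP call is replaced by a hypothetical "fetchFunc" that takes the query parameter to use ("snapshot" or "since") and its value. The type and function names here are assumptions, and a real client would loop indefinitely rather than for a fixed number of rounds.

```go
package main

import "fmt"

// change and changeList are simplified, hypothetical response types.
type change struct {
	Sequence string
	Table    string
}

type changeList struct {
	LastSequence string
	Changes      []change
}

// fetchFunc stands in for one long-polling GET to /changes.
type fetchFunc func(param, value string) (changeList, error)

// pollChanges makes the first request with "snapshot" and every later
// request with "since", as recommended in the steps above. It returns the
// last sequence seen so that a client can persist it and resume later.
func pollChanges(fetch fetchFunc, snapshotInfo string, rounds int, handle func(change)) (string, error) {
	param, value := "snapshot", snapshotInfo
	lastSequence := ""
	for i := 0; i < rounds; i++ {
		list, err := fetch(param, value)
		if err != nil {
			return lastSequence, err
		}
		for _, c := range list.Changes {
			handle(c)
		}
		if list.LastSequence != "" {
			lastSequence = list.LastSequence
			param, value = "since", lastSequence
		}
	}
	return lastSequence, nil
}

func main() {
	// Canned responses standing in for a real changeserver.
	responses := []changeList{
		{LastSequence: "0.1390d838.0"},
		{LastSequence: "0.13923950.0", Changes: []change{{Sequence: "0.13923950.0", Table: "public.test"}}},
	}
	call := 0
	fetch := func(param, value string) (changeList, error) {
		fmt.Printf("GET /changes?%s=%s&selector=foo&block=60\n", param, value)
		r := responses[call]
		call++
		return r, nil
	}
	last, err := pollChanges(fetch, "229444:229444:", len(responses), func(c change) {
		fmt.Println("change on", c.Table, "at", c.Sequence)
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("resume from", last)
}
```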

Alternate Encodings

The JSON encoding of changes and snapshots is fine, but it has a limitation
in that all data values are converted into strings. This is especially a
problem if the database contains binary data ("bytea" fields in Postgres).

As an alternate encoding, the snapshot server and change server can
return data encoded in protocol buffers if the Accept header is
set as follows:

Accept: application/transicator+protobuf

The resulting protocol buffers can be decoded using the "common" package
in this project. To decode a change list, use:

common.UnmarshalChangeListProto(buf []byte)

and to decode a snapshot, use:

common.UnmarshalSnapshotProto(r io.Reader)

This leaves one last problem: the snapshot may be very
large, and "UnmarshalSnapshotProto" has to bring it all into memory at
once in order to create the "Snapshot" object.

For this problem, create a SnapshotReader using:

common.CreateSnapshotReader(r io.Reader)

A SnapshotReader allows you to read the snapshot one table and row at a time.
The best practice is to loop through the results of the reader and insert
data into the database as you read it from the snapshot reader. That way
snapshots may be of any size and there will be no memory issues.

(Furthermore, snapshots produced in protobuf format are also produced in a
streaming way on the snapshot server, whereas JSON snapshots are not.
So, there are advantages to using protobuf-format snapshots whenever
possible.)
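
For reference, this is roughly how a Go client could ask for the protobuf encoding. The sketch only builds the request with the standard "net/http" package and sets the Accept header quoted above. The function name "newSnapshotRequest" is hypothetical, and decoding the response body with the "common" package is left out.

```go
package main

import (
	"fmt"
	"log"
	"net/http"
)

// protobufMediaType is the Accept value documented above.
const protobufMediaType = "application/transicator+protobuf"

// newSnapshotRequest builds a GET request for a protobuf-encoded snapshot.
func newSnapshotRequest(baseURL, selector string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, baseURL+"/snapshots", nil)
	if err != nil {
		return nil, err
	}
	q := req.URL.Query()
	q.Set("selector", selector)
	req.URL.RawQuery = q.Encode()
	req.Header.Set("Accept", protobufMediaType)
	return req, nil
}

func main() {
	req, err := newSnapshotRequest("http://localhost:9001", "foo")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(req.Method, req.URL.String())
	fmt.Println("Accept:", req.Header.Get("Accept"))
}
```

Passing the request to an "http.Client" follows the redirect that the snapshot API issues, since Go's default client follows redirects for GET requests.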

Command-line Options

Snapshot server

  • -p (required): The port to listen on using HTTP.
  • -mp (optional): The port to listen on for health checks. If not set, will use
    the standard port.
  • -u (required): The Postgres URL. See below for URL formats.
  • -D (optional): Turn on debug logging.

For example, a standard snapshot server startup might look like this:

snapshotserver -p 9000 -u postgres://user:pass@host/database

Change server

  • -p (required): The port to listen on using HTTP.
  • -mp (optional): The port to listen on for health checks. If not set, will use
    the standard port.
  • -u (required): The Postgres URL. See below for URL formats.
  • -s (required): The name of the Postgres logical decoding slot. If multiple change
    servers connect to the same database, then it is important that this name
    be unique.
  • -m (optional): The lifetime for records in the database before they
    are automatically purged to save space. This parameter uses the format
    accepted by Go's "time.ParseDuration" function, so values like "24h" and
    "60m" are valid.
  • -D (optional): Turn on debug logging.

For example, a standard change server startup might look like this:

changeserver -p 9001 -u postgres://user:pass@host/database \
-s slot1 -m 72h

Postgres Usage Notes

URLs

Both of the servers in this project use a "URL" to connect to Postgres.
These URLs are designed to be similar to those used in Postgres JDBC drivers.
They look like the following:

postgres://USER:PASS@HOST:PORT/DATABASE?ssl=SSL

The values are as follows:

  • USER: The postgres user name.
  • PASS: The password for that user. Usually required, although some server
    configurations do not need it.
  • HOST: The hostname or IP address of the database server.
  • PORT: The port of the database. Defaults to 5432.
  • DATABASE: The name of the database to connect to. Note that logical
    replication slots are per-database, so only changes to this database will
    be replicated.
  • SSL: If the database is configured to support SSL, then adding "ssl=true"
    will cause the server to attempt an SSL connection.

Developer Setup

You can build and test transicator on OS X or Linux. You can also build
and test Docker containers.

Since the changeserver uses RocksDB as a native C library, Go's cross-compilation
does not work for this project. To build for Linux, you have to build on Linux
(or inside a Docker container).

Build and Install on OS X

Install postgres

brew install postgresql

Configure postgres


Build and install PG logical replication output plugin

    cd apigee-labs/transicator/pgoutput/
    make install

Build changeserver and snapshotserver (non Docker):

    cd apigee-labs/transicator/
    make

Test on OS X

make tests

This will run the tests in all the modules.
The environment variable TEST_PG_URL must be set in order to point the tests
to the right Postgres setup.

Build and Test on Linux

Instructions are the same. However, you will need a recent build of RocksDB.
Some Linux distributions package RocksDB. If yours does not, you will need to
build it yourself. "Dockerfile.changeserver" has one set of instructions that
could be used to install a local copy of RocksDB.

Test on Docker

make dockerTests

This will build Docker containers for Postgres, changeserver, and snapshotserver,
launch them, and then launch another container to run the tests.
This will work on any host that supports Docker -- there is no need to
install anything else in order to run these tests.

Build Docker Containers for Changeserver, snapshotserver and postgres

    cd apigee-labs/transicator/
    make docker

Optionally, you can also build the Docker containers from inside the
respective modules. For example:

    cd apigee-labs/transicator/changeserver
    make docker

This will build the docker containers in their respective directories,
which then can be run locally.

Run them as docker containers:

The value of POSTGRES_URL is described above.

The SLOT_NAME is a unique name for the replication slot used by the
changeserver. Each changeserver must have a unique slot name -- otherwise
there will be replication errors and data loss. In addition, if a
changeserver is removed, its slot must be removed from the database,
or the database will grow its transaction log forever and not clean it
up, and eventually fail.

    cd apigee-labs/transicator/changeserver
    docker run --rm -it changeserver -u POSTGRES_URL -s SLOT_NAME
    cd apigee-labs/transicator/snapshotserver
    docker run --rm -it snapshotserver -u POSTGRES_URL

To see all replication slots:

select * from pg_replication_slots;

To delete a replication slot:

select * from pg_drop_replication_slot('SLOT_NAME');

To delete all inactive replication slots:

select pg_drop_replication_slot(slot_name) from pg_replication_slots where active is false;

Storage Engines

The change server maintains a local database of changes so that many clients
can poll for changes without putting a load on the database server.
The default storage implementation is built on top of SQLite -- we find that
it gives us good performance for our use case.

There is also an implementation of the change server that uses RocksDB.
This implementation has different performance characteristics, which may
be better for some use cases. The "rocksdb" build tag is used to
build and/or test this version.

The build rule:

make rocksdb

Adds the build tag and produces a binary called "changeserver-rocksdb" that uses
this storage engine.
