by andrewmutz 1353 days ago
Kafka Connect can do all this for you if you configure it properly. You would use a Postgres "source" connector called Debezium that tracks all changes via Postgres replication. All row changes then flow in real time to Kafka topics. Keeping the data updated in real time in Elasticsearch is handled by another off-the-shelf Kafka connector (a "sink" connector).

What you are describing is having the data in Elasticsearch in the same format as the data in PostgreSQL, which is easy.

You could also, for instance, write a script that uses a PostgreSQL logical replication connection (just like Debezium) to stream the changes into Elasticsearch, without a full Kafka Connect setup and all the training and maintenance that comes with it.

What I am describing is computing the data into a materialized format before storing it in Elasticsearch, so that it is more efficient for Elasticsearch to work with and no longer needs any joins.

So instead of recomputing the materialized data in its entirety every time, you want to be smarter about it. The root table of the materialized data depends on maybe 5 other tables. So if data in those 5 other tables changes, you need to know whether the changed rows have a relationship with a row in the "root table", and then re-materialize only those rows.

Materialize does this by having its own SQL language in which you define your materialized view. That definition compiles to clever algorithms, and an execution engine runs them to get to this result.

What I am doing is just having a lookup graph plus queries to see which tables and row ids are invalidated. I then re-materialize using normal SQL, with some extra processing in a Node.js script to make the result more optimal for Elasticsearch.
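A minimal sketch of what such a lookup graph could look like, assuming a hypothetical schema where `orders` is the root table and `customers` and `order_items` feed into it. The table names, the `LOOKUP` mapping, and both helper functions are made up for illustration, and SQLite stands in for PostgreSQL so the example runs on its own. It is written in Python rather than Node.js purely for brevity.

```python
import sqlite3

# Hypothetical schema: "orders" is the root table of the materialized document.
SCHEMA = """
CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER);
CREATE TABLE order_items (id INTEGER PRIMARY KEY, order_id INTEGER, sku TEXT);
"""

# Lookup graph: for each source table, a query that maps a changed row id
# to the ids of the root rows ("orders") that must be re-materialized.
LOOKUP = {
    "orders": "SELECT id FROM orders WHERE id = ?",
    "customers": "SELECT id FROM orders WHERE customer_id = ?",
    "order_items": "SELECT order_id FROM order_items WHERE id = ?",
}


def invalidated_root_ids(conn, changes):
    """Return the set of root-table ids touched by (table, row_id) changes."""
    root_ids = set()
    for table, row_id in changes:
        for (root_id,) in conn.execute(LOOKUP[table], (row_id,)):
            root_ids.add(root_id)
    return root_ids


def rematerialize(conn, root_id):
    """Build one denormalized document for a root row using plain SQL."""
    order_id, customer = conn.execute(
        "SELECT o.id, c.name FROM orders o "
        "JOIN customers c ON c.id = o.customer_id WHERE o.id = ?",
        (root_id,),
    ).fetchone()
    skus = [
        sku
        for (sku,) in conn.execute(
            "SELECT sku FROM order_items WHERE order_id = ? ORDER BY id",
            (root_id,),
        )
    ]
    return {"id": order_id, "customer": customer, "skus": skus}


def demo():
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executemany("INSERT INTO customers VALUES (?, ?)", [(1, "Ann"), (2, "Bob")])
    conn.executemany("INSERT INTO orders VALUES (?, ?)", [(10, 1), (11, 1), (12, 2)])
    conn.executemany(
        "INSERT INTO order_items VALUES (?, ?, ?)", [(100, 10, "A"), (101, 12, "B")]
    )
    # Customer 1 and order item 100 changed; order 12 is not affected.
    ids = invalidated_root_ids(conn, [("customers", 1), ("order_items", 100)])
    return ids, [rematerialize(conn, i) for i in sorted(ids)]
```

Only orders 10 and 11 get rebuilt here; order 12 is left alone. Deleted rows are not handled in this sketch, since a row that no longer exists cannot be looked up after the fact.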

It's not as fancy. But it works and does the job.

The biggest problem we've encountered with existing tools in the Kafka ecosystem (and the homegrown solutions that we've seen) is that nearly all of them sacrifice consistency. Debezium and most other Kafka Connect plugins will produce duplicate records upon restart, for example, that are very difficult to correctly deduplicate downstream. Things look right when you first turn on the plugin, but a week later when your Kafka Connect cluster restarts, a bit of incorrectness seeps in.

Materialize, by contrast, has been explicitly designed to preserve the consistency present in your upstream system. Our PostgreSQL source, for example, ensures that transactions committed to PostgreSQL appear atomically in Materialize, even when those transactions span multiple tables. See our "consistency guarantees" docs for some more information [0]. We have some additional features coming down the pipe, too, like allowing you to guarantee that your queries against Materialize reflect the latest data in your upstream sources [1].

[0]: https://materialize.com/docs/unstable/overview/isolation-lev...

[1]: https://github.com/MaterializeInc/materialize/issues/11531

You can achieve consistency using a transactional outbox and "homegrown" solutions in the following way.

Make sure PostgreSQL is configured with `synchronous_commit = remote_apply`, then:

* Create a postgresql logical replication slot which creates a postgresql snapshot in time.

* Start a repeatable read transaction with the snapshot id

* Store all relevant data from the snapshot in sqlite / kv store

* Start listening for WAL changes ( json or protobufs )

* Receive a WAL change and report the "write" position of the slot to PostgreSQL

* Process the data and query all relevant data for materialization from sqlite/kv

* Send data to elasticsearch

* Report the "flush" and "apply" positions of the slot to PostgreSQL

This way you achieve consistency with a "homegrown" solution, and possibly with Kafka Connect too.
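The ordering in the last steps (send to Elasticsearch first, confirm the flush position afterwards) can be illustrated with a small in-memory simulation. Nothing here talks to a real PostgreSQL or Elasticsearch: `FakeSlot`, `FlakySink`, and `run_once` are hypothetical stand-ins, and the integer LSNs are simplified. It is only a sketch of the acknowledgement order, not of a full replication client.

```python
class FakeSlot:
    """Stand-in for a logical replication slot (hypothetical, in-memory)."""

    def __init__(self, changes):
        self.changes = changes    # list of (lsn, doc_id, payload)
        self.flushed_lsn = 0      # last position confirmed back to "postgres"

    def pending(self):
        # Like a real slot, re-send everything after the confirmed flush position.
        return [c for c in self.changes if c[0] > self.flushed_lsn]

    def confirm_flush(self, lsn):
        self.flushed_lsn = max(self.flushed_lsn, lsn)


class FlakySink:
    """Stand-in for Elasticsearch that fails on chosen attempt numbers."""

    def __init__(self, fail_on_attempts):
        self.fail_on_attempts = set(fail_on_attempts)
        self.attempts = 0
        self.docs = {}

    def index(self, doc_id, payload):
        self.attempts += 1
        if self.attempts in self.fail_on_attempts:
            raise ConnectionError("sink unavailable")
        self.docs[doc_id] = payload   # keyed by id, so a replay just overwrites


def run_once(slot, sink):
    """Process pending changes in order; stop at the first sink failure."""
    for lsn, doc_id, payload in slot.pending():
        try:
            sink.index(doc_id, payload)
        except ConnectionError:
            return False              # flush position is NOT advanced
        slot.confirm_flush(lsn)       # only after the sink accepted the write
    return True


def demo():
    slot = FakeSlot([(1, "a", "v1"), (2, "b", "v1"), (3, "a", "v2")])
    sink = FlakySink(fail_on_attempts=[2])
    first = run_once(slot, sink)      # fails on the second change
    lsn_after_failure = slot.flushed_lsn
    second = run_once(slot, sink)     # the unconfirmed changes are re-sent
    return first, lsn_after_failure, second, slot.flushed_lsn, sink.docs
```

Because the flush position only moves after a successful write, a crash or sink failure leads to redelivery rather than loss. That gives at-least-once delivery, so the writes to the sink need to be idempotent (here, keyed by document id).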

Failures while communicating to the external systems (the kv store and elastic in your example) are usually where this falls down. It's easy to build a system that's consistent ~90% of the time, but if you want to build a system where things like failures during snapshot write or failures during export to elastic are handled properly it starts getting complex (you will need to find ways to recover and retract data, or build smarts into the consumer to query around aborts, or find a way to do a 2PC-esque dance with the external system a la Kafka's transaction support, etc.). Getting to full consistency isn't easy.

This has been my experience too. Instead of going the logical replication route, I tend to leverage the transactional outbox to achieve consistency in the application layer.

So when I transact data into tables I immediately fetch the latest outbox id.

And then when I query Elasticsearch, I first fetch the last outbox id of the data it has processed.

This way I know whether the transaction was already processed into Elasticsearch or not. I repeat this until the outbox id in Elasticsearch is equal to or higher than the outbox id of the mutation.

This way I don't need logical replication or a k/v store, and I can just use a script that fetches and processes the latest outbox changes in a loop.
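Roughly, the outbox approach described above could look like the sketch below. It assumes a hypothetical `products` table with an `outbox` table next to it, uses SQLite in place of PostgreSQL, and uses a `FakeIndex` class in place of Elasticsearch. All names are illustrative, not taken from any real setup.

```python
import sqlite3


def setup():
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT);
        CREATE TABLE outbox (id INTEGER PRIMARY KEY AUTOINCREMENT, product_id INTEGER);
        """
    )
    return conn


def write_product(conn, product_id, name):
    """Write the row and its outbox entry in one transaction; return the outbox id."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR REPLACE INTO products VALUES (?, ?)", (product_id, name)
        )
        cur = conn.execute(
            "INSERT INTO outbox (product_id) VALUES (?)", (product_id,)
        )
        return cur.lastrowid


class FakeIndex:
    """Stand-in for Elasticsearch: documents plus the last processed outbox id."""

    def __init__(self):
        self.docs = {}
        self.last_outbox_id = 0


def process_outbox(conn, index, batch_size=100):
    """One pass of the polling loop: apply outbox entries newer than the marker."""
    rows = conn.execute(
        "SELECT o.id, p.id, p.name FROM outbox o "
        "JOIN products p ON p.id = o.product_id "
        "WHERE o.id > ? ORDER BY o.id LIMIT ?",
        (index.last_outbox_id, batch_size),
    ).fetchall()
    for outbox_id, product_id, name in rows:
        index.docs[product_id] = {"name": name}
        index.last_outbox_id = outbox_id
    return len(rows)


def is_visible(index, outbox_id):
    """True once the index has caught up to the given mutation."""
    return index.last_outbox_id >= outbox_id


def demo():
    conn = setup()
    index = FakeIndex()
    first = write_product(conn, 1, "lamp")
    second = write_product(conn, 1, "desk lamp")
    before = is_visible(index, second)
    processed = process_outbox(conn, index)
    return first, second, before, processed, is_visible(index, second), index.docs
```

A reader that holds the outbox id of its own mutation can call `is_visible` in a retry loop before trusting search results. One caveat for a real PostgreSQL setup: under concurrent transactions, sequence-generated ids can become visible out of order, so a plain "id greater than marker" check needs extra care there.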

I looked in the source of Materialize and it looks like this is exactly what they are doing.

They are using Debezium + Kafka to receive the WAL changes, and they send the processed WAL offsets back to Debezium + Postgres via a Kafka topic. This way they can achieve consistency.

It's very hard for Kafka Connect plugins to maintain consistency in all scenarios, both because of the semantics of some upstream databases and because of the guarantees the Connect API itself offers. Hopefully KIP-618 will eliminate more of the edge cases though.