by jeffail 2525 days ago
"What’s more, we want to ensure the information about which events we’ve seen is written durably so we can recover from a crash, and that we never produce duplicate messages in our output."

Your processor is described as writing from Kafka to Kafka and using a persisted RocksDB instance to check message identifiers. How then do you ensure messages aren't dropped if your processor crashes or gets killed after checking against RocksDB but before the message is flushed to the Kafka broker?

Also, isn't your producer writing to Kafka at-least-once? If so, then even if it removes all duplicates in its processing stage, the feed written to your output topic could still contain duplicates.

By contrast, deduplicating on consumption avoids that problem entirely by making the consumer idempotent, which gives you exactly-once processing. In this case, though, they have identified edge cases of duplicates they're comfortable with.
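
To make "idempotent consumer" concrete, here is a minimal sketch of the idea. The class name, the in-memory set, and the list of applied payloads are all hypothetical stand-ins; a real consumer would need a durable store and would have to commit the side effect and the ID record atomically.

```python
class IdempotentConsumer:
    """Applies each message at most once, keyed by message ID (illustrative only)."""

    def __init__(self):
        self.seen_ids = set()  # assumption: stand-in for a durable ID store
        self.applied = []      # assumption: stand-in for the real side effect

    def handle(self, msg_id, payload):
        if msg_id in self.seen_ids:
            return False       # redelivery from an at-least-once feed: skip it
        # In a real system these two steps must be committed atomically.
        self.applied.append(payload)
        self.seen_ids.add(msg_id)
        return True


consumer = IdempotentConsumer()
# An at-least-once feed in which message "a" is delivered twice.
feed = [("a", 1), ("b", 2), ("a", 1)]
results = [consumer.handle(msg_id, payload) for msg_id, payload in feed]
```

The second delivery of "a" is ignored, so the upstream feed only has to be at-least-once.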


The processor will read the last message from the output topic on startup. That catches the case where the processor crashes in between writing to Kafka and recording the write in RocksDB.
Yeah, but the first time it was read the key was stored in RocksDB, so the second time it gets consumed after the crash:

"If the message already exists in RocksDB, the worker simply will not publish it to the output topic and update the offset of the input partition, acknowledging that it has processed the message."

It gets dropped as if it were a duplicate, oops!

Edit: I reread the relevant section...

"If a message was found in the output topic, but not RocksDB (or vice-versa) the dedupe worker will make the necessary repairs to keep the database and RocksDB in-sync. In essence, we’re using the output topic as both our write-ahead-log, and our end source of truth, with RocksDB checkpointing and verifying it."

Not entirely sure what they mean by "(or vice-versa)". If the message exists in RocksDB but not in the output topic, how do you distinguish between a real duplicate and a crash artifact?

If the last message before a crash happens to be a real duplicate and this recovery mechanism reintroduces it into the pipeline, then you have a duplicate.

Either way, it's not an exactly-once feed. At best (assuming message loss isn't possible) it's an at-least-once feed that usually appears to be exactly-once.

The order of operations is that it checks RocksDB first, writes to Kafka, and then writes to RocksDB.

If the write to Kafka fails, it re-positions itself in the input topic stream based on the offset annotation in the output topic's last message. The write never went to RocksDB, so it won't be considered a duplicate.

Recovering from a failed RocksDB write is more complicated. The output topic's last message will have an offset that is effectively beyond the accumulated state in RocksDB. The last input topic offset for each committed message is written to RocksDB transactionally alongside it. The recovery process uses this offset as a starting point when consuming the input topic. During this process, messages aren't published to the output topic until the offset read from the output topic is reached.
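
The ordering and recovery described above can be sketched with an in-memory model. This is only an illustration of the description in this comment, not the actual worker: Python lists stand in for the Kafka topics, a dict stands in for RocksDB, and every name here (`make_store`, `process`, `recover`, `crash_before_store_at`) is hypothetical.

```python
def make_store():
    # Assumption: stand-in for RocksDB, holding seen IDs plus the input
    # offset of the last committed message, updated together.
    return {"seen": set(), "last_offset": -1}


def process(input_topic, output_topic, store, start, crash_before_store_at=None):
    """Check the store, publish to the output topic, then record in the store."""
    for offset in range(start, len(input_topic)):
        msg_id = input_topic[offset]
        if msg_id in store["seen"]:
            continue  # real duplicate: do not publish
        # The output message is annotated with its input offset.
        output_topic.append({"id": msg_id, "input_offset": offset})
        if offset == crash_before_store_at:
            return False  # simulated crash after the Kafka write, before the store write
        store["seen"].add(msg_id)
        store["last_offset"] = offset
    return True


def recover(input_topic, output_topic, store):
    """Replay the input without publishing until the store catches up with the output."""
    target = output_topic[-1]["input_offset"] if output_topic else -1
    for offset in range(store["last_offset"] + 1, target + 1):
        msg_id = input_topic[offset]
        if msg_id not in store["seen"]:
            store["seen"].add(msg_id)
            store["last_offset"] = offset
    return target + 1  # input offset at which normal processing resumes


input_topic = ["a", "b", "a", "c", "d"]
output_topic = []
store = make_store()

# Crash right after "c" (offset 3) is published but before it reaches the store.
finished = process(input_topic, output_topic, store, 0, crash_before_store_at=3)
resume = recover(input_topic, output_topic, store)
process(input_topic, output_topic, store, resume)
published = [m["id"] for m in output_topic]
```

In this model "c" is published once: recovery adds it to the store without republishing, and processing resumes at the next input offset. If the Kafka write had failed instead, the replay range would be empty and the worker would simply resume after the output topic's last annotated offset.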

That makes more sense, thanks for clarifying.

Still, assuming there are no other edge cases there, it doesn't address the other problem, where a hypothetical consumer of the output topic is reading an at-least-once feed of your exactly-once topic. For that not to be the case, the consumer must also be idempotent, in which case what value was gained from the original deduplication?

Not sure if this is what Segment is doing, but if it’s a worker based on the Kafka Streams library, the k/v state store can be backed by a changelog topic, and if you have a transaction between that topic and the topic that commits offsets, the scenario you described won’t occur. The worker will reprocess the event, and RocksDB won’t have it because it’ll only have the state that was persisted before the crash.
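
A toy model of that idea, for illustration only: state, consumed offset, and output are committed as one atomic snapshot, so a crash before the commit discards all three together. This does not use the Kafka Streams API; the class and its fields are hypothetical, and folding the output into the same transaction is an assumption of the sketch.

```python
class TransactionalWorker:
    """Commits dedupe state, input offset, and output as one atomic snapshot."""

    def __init__(self):
        # Assumption: one dict stands in for the changelog topic, the
        # offsets topic, and the output topic, all committed together.
        self.committed = {"seen": frozenset(), "offset": 0, "output": ()}

    def run(self, input_topic, crash_at=None):
        # Always restart from the last committed snapshot.
        seen = set(self.committed["seen"])
        output = list(self.committed["output"])
        for offset in range(self.committed["offset"], len(input_topic)):
            msg_id = input_topic[offset]
            if msg_id not in seen:
                seen.add(msg_id)
                output.append(msg_id)
            if offset == crash_at:
                return False  # crash before commit: uncommitted changes are lost
            self.committed = {
                "seen": frozenset(seen),
                "offset": offset + 1,
                "output": tuple(output),
            }
        return True


worker = TransactionalWorker()
topic = ["a", "b", "a", "c"]

crashed_run = worker.run(topic, crash_at=1)  # crash while handling "b"
after_crash = dict(worker.committed)
worker.run(topic)                            # restart and finish
final_output = list(worker.committed["output"])
```

After the crash the committed state has never seen "b", so on restart the event is reprocessed and emitted once rather than dropped as a duplicate.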