Why would you choose an LSM-tree-based storage mechanism for a message queue?
The only reason I can come up with is that it's a ready-to-use library you can just plug in, which gives OK performance and some handy features because you can use the KV store for other things. But it doesn't scale well, and backups with LevelDB are not really easy either (close DB, copy all files).
Message queues when they are ordered (at least on the local node/queue level) usually just need some kind of append-only log file. You don't do random reads or writes into the middle of the queue, you only modify the head and tail.
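To make the append-only idea concrete, here is a minimal Go sketch, assuming one file of length-prefixed records. The names `Log`, `OpenLog`, `Append`, `ReadAt` and `roundTrip` are made up for illustration and are not taken from Siberite or Kafka; there is no fsync, segmenting or crash recovery here.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"io"
	"os"
	"path/filepath"
)

// Log is a hypothetical append-only message log. Each record is a
// 4-byte big-endian length followed by the payload.
type Log struct {
	f    *os.File
	size int64 // end of file, i.e. the offset where the next record goes
}

// OpenLog opens (or creates) the log file at path.
func OpenLog(path string) (*Log, error) {
	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o644)
	if err != nil {
		return nil, err
	}
	st, err := f.Stat()
	if err != nil {
		f.Close()
		return nil, err
	}
	return &Log{f: f, size: st.Size()}, nil
}

// Append writes one record at the tail and returns its byte offset.
func (l *Log) Append(msg []byte) (int64, error) {
	buf := make([]byte, 4+len(msg))
	binary.BigEndian.PutUint32(buf, uint32(len(msg)))
	copy(buf[4:], msg)
	off := l.size
	if _, err := l.f.WriteAt(buf, off); err != nil {
		return 0, err
	}
	l.size += int64(len(buf))
	return off, nil
}

// ReadAt returns the record stored at off and the offset of the next one.
// It returns io.EOF once off reaches the tail.
func (l *Log) ReadAt(off int64) ([]byte, int64, error) {
	if off >= l.size {
		return nil, off, io.EOF
	}
	var hdr [4]byte
	if _, err := l.f.ReadAt(hdr[:], off); err != nil {
		return nil, off, err
	}
	msg := make([]byte, binary.BigEndian.Uint32(hdr[:]))
	if _, err := l.f.ReadAt(msg, off+4); err != nil {
		return nil, off, err
	}
	return msg, off + 4 + int64(len(msg)), nil
}

// roundTrip appends msgs to a fresh temporary log and reads them back in order.
func roundTrip(msgs []string) ([]string, error) {
	dir, err := os.MkdirTemp("", "qlog")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)
	l, err := OpenLog(filepath.Join(dir, "queue.log"))
	if err != nil {
		return nil, err
	}
	defer l.f.Close()
	for _, m := range msgs {
		if _, err := l.Append([]byte(m)); err != nil {
			return nil, err
		}
	}
	var out []string
	for off := int64(0); ; {
		msg, next, err := l.ReadAt(off)
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		out = append(out, string(msg))
		off = next
	}
}

func main() {
	got, err := roundTrip([]string{"first", "second", "third"})
	fmt.Println(got, err)
}
```

The reader's position is just a byte offset, and the writer only ever touches the tail. Reclaiming space would then mean dropping whole files of already-consumed records rather than rewriting anything in place.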
InfluxDB, albeit a time series DB, has similar write patterns to a message queue. They learned this the hard way: they first tried an LSM tree database (LevelDB), then switched to a B+tree (BoltDB/LMDB), but that also doesn't scale once the DB gets big and the tree gets deep.
They kindly did a nice writeup of their journey: https://influxdb.com/docs/v0.9/concepts/storage_engine.html
Why not keep it simple and use append-only files without complex structure and management?
Check out Kafka for a better storage format for message queues of this kind.
PS: every message queue should first clearly explain what guarantees it provides.
An LSM tree is actually a good idea if you think about it.
The R/W patterns for a message queue are simple:
- Messages are key/value
- key is an autoincrementing id
- Writes are at the end, Reads are from the beginning
- Once a message is processed, it's deleted
So in practice this means that the items are written in an append-only fashion, get merged in bigger chunks, and then get progressively deleted. So at higher levels you don't see the huge latencies due to compaction because all records are deleted. Knowing that keys are only incrementing could also lead to a simple optimization: the compaction phase can be a simple concatenation of files.
So you get an append-only system that progressively removes older entries as they are deleted without resorting to mad science hackery [1]. Why didn't it work for InfluxDB? All I can guess is that individual entries for each series are all mixed together (InfluxDB wants to be able to manage many series with many tags) and older entries are not deleted as frantically, so you get the latencies we all know with compaction and unpredictable reads.
Now, this is purely theoretical and of course further experiments are needed to make sure this is correct, but LSM is in my opinion a correct pattern here.
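A toy Go sketch of the "compaction as concatenation" idea, under the assumption that every sorted run covers a key range strictly above the previous one. `Entry`, `Run` and `compact` are hypothetical names; this models the argument above, not what LevelDB actually does internally.

```go
package main

import (
	"errors"
	"fmt"
)

// Entry is one queued message; Key is the autoincrementing id.
type Entry struct {
	Key   uint64
	Value string
}

// Run stands in for one sorted on-disk file (an SSTable in LevelDB terms).
type Run []Entry

// compact joins runs given oldest first and drops every entry whose key is
// below head, i.e. already consumed. Because keys only grow, the runs never
// overlap and no merge step is needed; overlap is reported as an error.
func compact(runs []Run, head uint64) (Run, error) {
	var out Run
	var last uint64
	seen := false
	for _, r := range runs {
		for _, e := range r {
			if seen && e.Key <= last {
				return nil, errors.New("key ranges overlap: a real merge is required")
			}
			last, seen = e.Key, true
			if e.Key >= head {
				out = append(out, e)
			}
		}
	}
	return out, nil
}

func main() {
	runs := []Run{
		{{1, "a"}, {2, "b"}},
		{{3, "c"}, {4, "d"}},
		{{5, "e"}},
	}
	out, err := compact(runs, 3) // ids 1 and 2 were already consumed
	fmt.Println(out, err)        // prints: [{3 c} {4 d} {5 e}] <nil>
}
```

If most entries in the older runs are already below `head`, the output is small and the pass is cheap, which is the whole point of the argument. Whether a real LSM implementation gets anywhere near this behaviour is exactly what would need measuring.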
For batched sequential writes, there is no other DB anywhere near as fast as LMDB
http://symas.com/mdb/microbench/
(Section E, Batched Writes)
But even so - the reason LMDB can do this so quickly is because for batched sequential writes it cheats - it's just performing Appends, there's no complicated tree construction/balancing/splitting of any kind going on.
If you know that your workload will only be producer/consumer, with sequentially generated data that is sequentially consumed, it's a stupid waste of time to mess with any other structure than a pure linear queue. (Or a circular queue, when you know the upper bounds of how much data is outstanding.)
As for your initial statement - no, an LSM tree is not a correct pattern here. If your consumers are actually running as fast as (or faster than) your producer then it should never flush from Level0/memory to Level1/disk. In that case all you've got is an in-memory queue that evaporates on a system crash.
If your consumers are running slower, that means data is accumulating in the DB, which means you will have compaction delays. And the compaction delays will only get slower over time, as more and more levels need to be merged. (Remember that merge operations are O(N). Then remember that there are N of them to do. O(N^2) is a horrible algorithmic complexity.) LSM is never a correct pattern.
May I also mention that N log N is the total cost of compaction for a DB of size N. You don't perform a compaction on every single write. Amortised per write the cost is more like N log(N)/N == log(N).
Also, N log(N) is nowhere near exponential. O(2^N) would be exponential, and that's not what you have here.
Yes, goleveldb was chosen because it's a ready to use library with a decent write and read performance, and no external non-Go dependencies. It can also be used to store multiple consumers offsets in future.
Regarding provided guarantees: with simple 'get work_queue' reads it provides at-most-once delivery. With two-phase reliable reads ('get work_queue/open', 'get work_queue/close') it provides at-least-once delivery. Note that the message is kept in memory on the server during a reliable read and will be lost if you SIGKILL siberite; on SIGTERM and SIGINT siberite will gracefully abort the read and save the message.
Indeed, either Siberite is a queue system whose purpose is to dispatch each message to one and only one consumer for further processing and which requires the consumers to acknowledge fully processed messages;
or Siberite is a journal system (in the spirit of Kafka) whose purpose is to replay the full log to any consumer asking for it and which offers the consumers a watermark mechanism to keep track of their progress.
In the former case, the queue system is responsible for what to do in case of a missing or late acknowledgement (choosing between "at least once" and "at most once" message delivery).
In the latter case, the consumers are responsible for maintaining an atomic view of message consumption and message processing (for instance using a transaction to persist an offset with a state).
Right now it doesn't store any consumer offsets. And you can get either at-most-once or at-least-once guarantees.
But I found the idea of multiple consumer groups per queue very interesting. So basically you would still be able to fetch queue messages as you can do now and it will delete dequeued items, but you would also be able to use something like
'get queue_name:consumer_name'
and it will create a consumer group internally with a stored offset and will serve messages using that offset. In case of a reliable read failure, each consumer group will keep its own queue of failed deliveries, will check that queue, and will serve these failed items first. If the source queue head has changed and become larger than the consumer group offset, then the consumer group offset would just start from the source queue head.
This way you can get Kafka-like multiple consumer groups per queue as an additional feature.
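Roughly what I have in mind, as an in-memory Go sketch. Nothing here exists in Siberite yet: `Journal`, `GetFor` and `Fail` are hypothetical names, and a slice stands in for the on-disk queue.

```go
package main

import "fmt"

// Journal is a hypothetical in-memory stand-in for one queue that also
// serves named consumer groups from stored offsets.
type Journal struct {
	head   uint64              // id of the oldest message still stored
	msgs   []string            // msgs[i] has id head+i
	offset map[string]uint64   // group -> id of the next message to serve
	failed map[string][]uint64 // group -> ids whose reliable read failed
}

func NewJournal() *Journal {
	return &Journal{offset: map[string]uint64{}, failed: map[string][]uint64{}}
}

func (j *Journal) Put(msg string) { j.msgs = append(j.msgs, msg) }

// Get is the plain destructive read: it deletes the head item.
func (j *Journal) Get() (string, bool) {
	if len(j.msgs) == 0 {
		return "", false
	}
	msg := j.msgs[0]
	j.msgs = j.msgs[1:]
	j.head++
	return msg, true
}

// GetFor serves the next message for a group without deleting it.
func (j *Journal) GetFor(group string) (uint64, string, bool) {
	// Failed deliveries are retried first, unless they were deleted meanwhile.
	for len(j.failed[group]) > 0 {
		id := j.failed[group][0]
		j.failed[group] = j.failed[group][1:]
		if id >= j.head {
			return id, j.msgs[id-j.head], true
		}
	}
	off := j.offset[group]
	if off < j.head { // the source queue head moved past the stored offset
		off = j.head
	}
	if off >= j.head+uint64(len(j.msgs)) {
		j.offset[group] = off
		return 0, "", false
	}
	j.offset[group] = off + 1
	return off, j.msgs[off-j.head], true
}

// Fail records that the group did not confirm message id.
func (j *Journal) Fail(group string, id uint64) {
	j.failed[group] = append(j.failed[group], id)
}

func main() {
	j := NewJournal()
	for _, m := range []string{"m0", "m1", "m2"} {
		j.Put(m)
	}
	id, msg, _ := j.GetFor("billing") // serves m0
	j.Fail("billing", id)             // not confirmed, so it would be retried first
	fmt.Println(msg)
	j.Get()                         // a plain read deletes m0 for everyone
	_, msg, _ = j.GetFor("billing") // m0 is gone, so the group moves on to m1
	fmt.Println(msg)
}
```

The open design question is the interaction between plain destructive reads and group reads: in this sketch a plain `Get` silently skips messages for every group that has not reached them yet.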
Why is it the queue's responsibility to store consumer offsets? The consumer is the only side that knows how far along its processing is. Why is the queue storing this data, when all the consumer has to do is tell it: send me events for topic X from point P forward?
If you wanted the consumers to be stateless, assuming they otherwise had a deterministic identity, then you could have the queue operate like a journal internally, but present a unified queue API to consumers.
So the queue keeps track of the high-watermark on a per consumer basis and all the consumer has to do is show up, tell the queue its deterministic name/id (might be driven by imaging, configuration, or SDN), and the queue will serve up the next new item that consumer hasn't seen yet.
This would be handy for really dynamic transient worker topologies because it keeps the mutable state and state tracking concerns entirely outside the transient worker.
That said, I still wouldn't use LevelDB. Unless I was expecting to do multi-attribute range queries or something (now we're well outside queue territory), but even then you're still folding over the data for knowable start/end markers and a linear scan over a binary term file will be faster than the multiple seeks + segment scans that LevelDB requires.
If the consumer is stateless then it needs to acknowledge every received event for it to be reliable. Otherwise the producer may think something is sent when it actually never arrived (tcp connection was closed).
So it's either unreliable or slow.
Also if you have dynamic transient worker topologies, you have to remember those positions. You are saving data for later use, that may never arrive. How long do you keep this data?
TCP would guarantee delivery, but you're right in that you wouldn't know if the consumer actually did anything with the message. It could have crashed on parsing or something.
But moving the concern to the consumer to track the cursor doesn't make the protocol any more stable. To keep a stable cursor, the consumer would need to persist that someplace, which just pushes the acknowledgement to that persistence component instead. If a stable cursor is what you're after, then co-locating it with the durable queue provides a simpler solution with a slightly better consistency guarantee.
The garbage collection problem is a real one, but realistically how many consumers is an infrastructure service like this going to have? Tens? Hundreds? Thousands? Millions? Billions?
No matter which one of those you pick, it's a trivially small secondary index to maintain even if you never reaped it. I mean it's a K/V problem (consumer_id -> queue_offset) and there's a K/V store already sitting there. If you didn't want it to grow forever then you could establish a TTL policy via configuration.
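A minimal Go sketch of that index with a TTL, assuming a plain map in place of the K/V store and unix seconds passed in by the caller. `Offsets`, `Next` and `Reap` are hypothetical names.

```go
package main

import "fmt"

// cursor is the stored position of one consumer.
type cursor struct {
	offset   uint64 // next queue offset to serve
	lastSeen int64  // unix seconds of the consumer's last request
}

// Offsets is a hypothetical consumer_id -> queue_offset index.
type Offsets struct {
	ttl     int64 // seconds of inactivity after which a cursor is reaped
	cursors map[string]cursor
}

func NewOffsets(ttlSeconds int64) *Offsets {
	return &Offsets{ttl: ttlSeconds, cursors: map[string]cursor{}}
}

// Next returns the consumer's current offset and moves its cursor forward by one.
func (o *Offsets) Next(id string, now int64) uint64 {
	off := o.cursors[id].offset
	o.cursors[id] = cursor{offset: off + 1, lastSeen: now}
	return off
}

// Reap drops cursors idle for longer than the TTL and returns how many were removed.
func (o *Offsets) Reap(now int64) int {
	n := 0
	for id, c := range o.cursors {
		if now-c.lastSeen > o.ttl {
			delete(o.cursors, id)
			n++
		}
	}
	return n
}

func main() {
	o := NewOffsets(3600)
	fmt.Println(o.Next("worker-1", 0), o.Next("worker-1", 5)) // offsets 0 then 1
	fmt.Println(o.Reap(10000))                                // worker-1 idle past the TTL
}
```

A reaped consumer that shows up again starts from offset 0 in this sketch; a real service would have to decide whether that means "from the current head" or "from the oldest retained message".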
The problem you would have is consumers that don't have stable or bounded ids. Like a system that assigns a new id every time the consumer makes a request or the consumer is restarted.
In case of a reliable fetch failure, each consumer group will keep its own queue of failed deliveries (persisted on disk), will check that queue, and will serve these failed items first.
This couldn't have come at a better time - I was actually looking for a durable message queue written in Go. Is there any way to read more about the architecture of this system? I find systems like these to be quite fascinating, but going through the code can be very time-consuming. It would be awesome if more projects had a writeup as detailed as cockroachdb[0]!
Aside: There used to be a site some time back which distributed compiled binaries of Go code for all platforms. Is it still up by any chance?
It's really simple. Each queue is a separate leveldb database on disk. Messages are stored as key/value using incremental ids. Head and tail of the queue are kept in memory and get initialized on startup via db scan.
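In Go terms the layout looks roughly like the sketch below. This is not the actual Siberite code: a Go map stands in for the LevelDB database, and `kv`, `key`, `Queue` and `OpenQueue` are names invented for the example.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// kv stands in for one LevelDB database. A Go map is neither ordered nor
// durable, so it only models the key/value layout, not LevelDB itself.
type kv map[string][]byte

// key encodes an id as 8 big-endian bytes so byte order matches numeric order.
func key(id uint64) string {
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], id)
	return string(b[:])
}

// Queue keeps head and tail in memory only.
type Queue struct {
	db   kv
	head uint64 // id of the next message to read
	tail uint64 // id the next written message will get
}

// OpenQueue rebuilds head and tail by scanning every stored key.
func OpenQueue(db kv) *Queue {
	q := &Queue{db: db}
	first := true
	for k := range db {
		id := binary.BigEndian.Uint64([]byte(k))
		if first || id < q.head {
			q.head = id
		}
		if first || id >= q.tail {
			q.tail = id + 1
		}
		first = false
	}
	return q
}

// Enqueue stores msg under the next id.
func (q *Queue) Enqueue(msg []byte) {
	q.db[key(q.tail)] = msg
	q.tail++
}

// Dequeue returns and deletes the oldest message.
func (q *Queue) Dequeue() ([]byte, bool) {
	if q.head == q.tail {
		return nil, false
	}
	k := key(q.head)
	msg := q.db[k]
	delete(q.db, k)
	q.head++
	return msg, true
}

func main() {
	db := kv{}
	q := OpenQueue(db)
	for _, m := range []string{"a", "b", "c"} {
		q.Enqueue([]byte(m))
	}
	q.Dequeue()
	restarted := OpenQueue(db) // simulate a restart: head and tail come from the scan
	fmt.Println(restarted.head, restarted.tail) // prints: 1 3
}
```

With an ordered store, the startup scan only needs the smallest and the largest key rather than a pass over everything, since the keys sort in id order.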
Also, you have to be paying one hell of a compaction penalty if this isn't a grow-only dataset. By ordering your keys you're at least minimizing the overhead of compaction on write by utilizing the happy-path for how LevelDB moves data out of the write buffer and into the SSTs.
But deletes are going to have a big impact still, and (working from my failing memory of LevelDB internals) I think might actually be the pathologically sad case.
Fast start times are a valuable thing for a service component.
Stick about 10GB of small entries in it (should be enough to create all the levels) and then see what happens.
Also, you could reserve the persisted [H|T] for controlled shutdown scenarios. Basically anything that isn't complete system failure if you're properly trapping signals.
You can have large queue sizes (larger than RAM) and siberite would still consume only a small amount of resident memory. You basically don't need a separate server with a decent amount of memory for it. You can also benefit from the two-phase reliable fetch: if your client gets disconnected without confirming a message, the message will be served to another client (very convenient if you use Amazon spot instances for your workers).
Note that this also means that messages can be delivered more than once and/or that the clients need to remember the messages that they processed. In some setups that can be a showstopper.
Reliable fetch is a feature, not a protocol requirement.
You can use the simple 'get work_queue' command to just get a message, or you can use the two-phase fetch, 'get work_queue/open' followed by 'get work_queue/close', if you need a reliable fetch. You can also use the 'get work_queue/close/open' command to acknowledge the previous message and read a new one.
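The semantics of the two read styles can be modelled in a few lines of Go. This is an in-memory illustration only, not the server code: `Queue`, `Open`, `Close` and `Abort` are hypothetical names, and putting an aborted message back at the head of the queue is an assumption of the sketch.

```go
package main

import "fmt"

// Queue is a hypothetical in-memory model of the two read styles.
type Queue struct {
	items []string
	open  map[string]string // client -> message handed out but not yet closed
}

func NewQueue() *Queue { return &Queue{open: map[string]string{}} }

func (q *Queue) Put(msg string) { q.items = append(q.items, msg) }

// Get models 'get work_queue': the message is removed immediately, so a
// client crash after this point loses it (at-most-once).
func (q *Queue) Get() (string, bool) {
	if len(q.items) == 0 {
		return "", false
	}
	msg := q.items[0]
	q.items = q.items[1:]
	return msg, true
}

// Open models 'get work_queue/open': the message is handed out but kept
// until the same client closes the read.
func (q *Queue) Open(client string) (string, bool) {
	if msg, ok := q.open[client]; ok {
		return msg, true // assumption: an unclosed read is simply served again
	}
	msg, ok := q.Get()
	if ok {
		q.open[client] = msg
	}
	return msg, ok
}

// Close models 'get work_queue/close': the client confirms the message.
func (q *Queue) Close(client string) { delete(q.open, client) }

// Abort models a disconnect before close: the message returns to the head
// of the queue and may be delivered again (at-least-once).
func (q *Queue) Abort(client string) {
	if msg, ok := q.open[client]; ok {
		q.items = append([]string{msg}, q.items...)
		delete(q.open, client)
	}
}

func main() {
	q := NewQueue()
	q.Put("job-1")
	q.Put("job-2")
	msg, _ := q.Open("worker-a")
	fmt.Println("worker-a got", msg)
	q.Abort("worker-a") // worker-a disconnects without confirming
	msg, _ = q.Open("worker-b")
	fmt.Println("worker-b got", msg) // job-1 is delivered a second time
	q.Close("worker-b")
}
```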
Ok so you can switch between at-most-once and at-least-once guarantees. While nice to have both options in a message queue, my point still stands.
Each of these has trade-offs, and the way it is architected here, in the at-least-once case you will have to either remember all the processed messages or be prepared to process a message multiple times, whatever that means in your specific use case.
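"Remembering the processed messages" on the consumer side could look like this Go sketch. It assumes every message carries a unique id chosen by the producer, which is an assumption of the example and not something the queue provides; `Consumer` and `Process` are hypothetical names.

```go
package main

import "fmt"

// Consumer remembers the ids it has processed so redelivered messages are skipped.
type Consumer struct {
	done    map[uint64]bool
	Handled []string // side effects, recorded here for the demo
}

func NewConsumer() *Consumer { return &Consumer{done: map[uint64]bool{}} }

// Process applies msg once per id and reports whether it did any work.
func (c *Consumer) Process(id uint64, msg string) bool {
	if c.done[id] {
		return false // duplicate delivery
	}
	c.Handled = append(c.Handled, msg)
	c.done[id] = true
	return true
}

func main() {
	c := NewConsumer()
	fmt.Println(c.Process(42, "charge card")) // first delivery: true
	fmt.Println(c.Process(42, "charge card")) // redelivery after a lost ack: false
}
```

The catch is that the `done` set has to be as durable as the side effect itself, and it grows without bound unless ids can be expired, which is why this can be a showstopper in some setups.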