Hacker News
by thamer 2053 days ago
I'm using Apache Arrow to store CSV-like data and it works amazingly well.

The datasets I work with contain a few billion records with 5-10 fields each, almost all 64-bit longs. I originally started with CSV and soon switched to a binary format with fixed-size records (mmap'd), which gave great performance improvements. But Arrow won me over with its flexibility, the space savings from columnar compression, and much better performance for queries that span a single column or a small number of them.
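To make the layout difference concrete, here's a stdlib-only Java sketch (not Arrow's API) assuming 5 long fields per record. A heap ByteBuffer stands in for the mmap'd file: FileChannel.map() returns a MappedByteBuffer, which is a ByteBuffer, so the row-layout code would read the same way, although a single mapping is capped at Integer.MAX_VALUE bytes, so billions of records would need several mappings. Class and method names are made up for illustration.

```java
import java.nio.ByteBuffer;

public class RecordLayouts {
    static final int FIELDS = 5;                          // assumption: 5 long fields per record
    static final int RECORD_BYTES = FIELDS * Long.BYTES;  // 40 bytes per fixed-size record

    // Row layout: field f of record i sits at byte offset i * RECORD_BYTES + f * 8,
    // so summing one field strides over every record and all of its other fields.
    static long sumFieldRowLayout(ByteBuffer records, int field) {
        long sum = 0;
        int n = records.limit() / RECORD_BYTES;
        for (int i = 0; i < n; i++) {
            sum += records.getLong(i * RECORD_BYTES + field * Long.BYTES);
        }
        return sum;
    }

    // Columnar layout: each field is one contiguous array, so a single-column
    // query only reads that column's values.
    static long sumFieldColumnar(long[][] columns, int field) {
        long sum = 0;
        for (long v : columns[field]) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        int n = 1_000;
        ByteBuffer rows = ByteBuffer.allocate(n * RECORD_BYTES);  // stand-in for a MappedByteBuffer
        long[][] columns = new long[FIELDS][n];
        for (int i = 0; i < n; i++) {
            for (int f = 0; f < FIELDS; f++) {
                long value = (long) i * FIELDS + f;  // arbitrary test data
                rows.putLong(i * RECORD_BYTES + f * Long.BYTES, value);
                columns[f][i] = value;
            }
        }
        System.out.println(sumFieldRowLayout(rows, 2) + " " + sumFieldColumnar(columns, 2));
    }
}
```

Both methods compute the same sum; the row version roughly drags each record's other 32 bytes through memory to use 8 of them, which is where single-column queries lose out.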

For anyone who has to process even a few million records locally, I would highly recommend it.


Arrow + Parquet is brilliant!

Right now I'm writing tools in Python (Python!) to analyse several 100TB datasets in S3. Each dataset is made up of 1000+ 6GB parquet files (tables UNLOADed from AWS Redshift db). Parquet's columnar compression gives a 15x reduction in on-disk size. Parquet also stores chunk metadata at the end of each file, allowing reads to skip over most data that isn't relevant.
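The "metadata at the end of each file" part is what makes skipping cheap. Per the Parquet format spec, a file ends with the Thrift-encoded file metadata, then a 4-byte little-endian length of that metadata, then the magic bytes PAR1. A reader fetches the tail first (on S3 that can be a ranged GET), then the metadata, and only then the column chunks it actually needs. A minimal stdlib Java sketch of locating the footer; decoding the Thrift metadata is out of scope, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ParquetFooter {
    static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

    // Parquet files end with: <file metadata> <4-byte little-endian metadata length> "PAR1".
    // Given the file's last 8 bytes and its total length, return where the metadata starts.
    static long metadataOffset(byte[] last8, long fileLength) {
        if (last8.length != 8) {
            throw new IllegalArgumentException("expected exactly 8 trailing bytes");
        }
        if (!Arrays.equals(Arrays.copyOfRange(last8, 4, 8), MAGIC)) {
            throw new IllegalArgumentException("missing trailing PAR1 magic");
        }
        int metadataLength = ByteBuffer.wrap(last8, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
        return fileLength - 8 - metadataLength;
    }

    public static void main(String[] args) {
        // Synthetic tail: metadata length 100 (little-endian), then the magic bytes.
        byte[] tail = {100, 0, 0, 0, 'P', 'A', 'R', '1'};
        System.out.println(metadataOffset(tail, 1_000L));  // prints 892 (1000 - 8 - 100)
    }
}
```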

And once in memory, the Arrow format gives zero-copy compatibility with Numpy and Pandas.

If you try this with Python, make sure you use the latest version of PyArrow (2.0.0) [1]. Two other interesting libraries for manipulating PyArrow Tables and ChunkedArrays are fletcher [2] and graphique [3].

[1] I use: conda install -c conda-forge pyarrow python-snappy

[2] https://github.com/xhochy/fletcher

[3] https://github.com/coady/graphique

Yeah, Parquet is awesome. One of the things we really want to do here is push DataFusion (the Rust-based SQL execution engine) to work directly on Parquet files: push down predicates and other operations, and operate on the data while it's still compressed.

You pay such a high overhead marshalling that data into an Arrow RecordBatch. The best thing ever is to work with the Parquet file directly and not even decompress the chunks you don't need. Of course, this assumes that you're writing summary statistics as part of the metadata, which we plan to do.
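A toy Java model of the two ideas in this comment: skipping chunks using min/max statistics, and evaluating a predicate on encoded data without expanding it. This is not DataFusion's or the Parquet crate's design; the Run/Chunk/Result types are hypothetical, and plain run-length encoding stands in for Parquet's actual encodings. Records need Java 16+.

```java
import java.util.List;

public class ChunkPruning {
    // Hypothetical, simplified stand-ins: a column chunk holds run-length encoded values
    // plus min/max statistics, like the summary statistics a Parquet footer can carry.
    record Run(long value, int count) {}
    record Chunk(long min, long max, List<Run> runs) {}
    record Result(long matches, int chunksRead) {}

    // Count rows with lo <= value <= hi without expanding runs, and skip any chunk
    // whose [min, max] range cannot overlap [lo, hi].
    static Result countInRange(List<Chunk> chunks, long lo, long hi) {
        long matches = 0;
        int chunksRead = 0;
        for (Chunk c : chunks) {
            if (c.max() < lo || c.min() > hi) {
                continue;  // statistics alone rule this chunk out; its runs are never touched
            }
            chunksRead++;
            for (Run r : c.runs()) {
                // Predicate evaluated once per run on the encoded data, not once per row.
                if (r.value() >= lo && r.value() <= hi) {
                    matches += r.count();
                }
            }
        }
        return new Result(matches, chunksRead);
    }

    public static void main(String[] args) {
        List<Chunk> chunks = List.of(
            new Chunk(1, 3, List.of(new Run(1, 5), new Run(3, 2))),
            new Chunk(10, 20, List.of(new Run(10, 4), new Run(20, 1))),
            new Chunk(2, 12, List.of(new Run(2, 3), new Run(12, 7))));
        System.out.println(countInRange(chunks, 15, 25));  // prints Result[matches=1, chunksRead=1]
    }
}
```

The pruning only works if the writer computed min/max per chunk, which is the write-time cost discussed in the replies.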

There's rudimentary statistics support, but I've found that it accounts for a lot of the write time (I wrote some tests last weekend and can ping your team when I put them on GitHub).

Improving our stats writing could yield a lot of benefits. I'll open JIRAs for this in the next few days.

Parquet + Arrow reminds me of Dremio, a fast SQL engine for data lakes.

https://www.dremio.com/webinars/apache-arrow-calcite-parquet...

They also have an OSS version on GitHub.

They're heavily into Arrow. A few years ago they contributed Gandiva, an LLVM-based expression compiler for super fast processing: https://arrow.apache.org/blog/2018/12/05/gandiva-donation/

It's one of the reasons I like being all in on Arrow. Why do everything ourselves when a ton of other smart people are working on this too?

Speaking of Gandiva, here's something that's open for anyone to take: https://issues.apache.org/jira/browse/ARROW-5315 (creating Gandiva bindings for Rust).

I think DataFusion is mature enough that we could plug Gandiva into it.

Disclaimer: I work on the Arrow Rust implementation

Ah yes, of course, hi Nevi :). Thank you again for all your work on the Rust implementation. We're obviously big fans.

Gandiva bindings are definitely something we should look into, but I'm guessing there's much lower-hanging fruit within DataFusion in terms of optimization, particularly for our use case.

Thanks Paul :)

I think with compute functions/kernels, we're sitting under a grapevine, so we'll be able to add a lot to Arrow without yet needing Gandiva bindings. The Rust stdsimd [0] work will also enable us to better use SIMD in ~ a year from now (I hope)

[0] https://github.com/rust-lang/stdsimd

I am confused about Parquet with respect to the rest of your stack. Is it just that Parquet happens to be the Redshift export format, or are you actually using Arrow and Parquet together in some way?

Parquet is the persistent storage format for when the data is written to disk or object storage. Arrow is the in-memory format and a set of tools built around working with it.

So InfluxDB IOx will use both: Arrow in memory for fast access, and Parquet in storage for persistence.

Ok, I didn't realize that Arrow was in-memory and not also on-disk. Serializing between the two didn't make sense to me. I would have thought that Arrow would also be an on-disk format (mmap'd), so that there would be little to no conversion overhead.

Why convert to an on-disk format (Parquet) and not save the in-memory representation to storage directly?

Parquet is awesome, but it's sad that their Rust parquet crate only works on nightly rustc, which is a no-no in many cases. Not sure why they chose to go that way (well, because of trait specialization, technically speaking, which hasn't landed and isn't going to land anytime soon).

There are contributors and committers in the Arrow project working to resolve this. We recently removed specialization from the core Arrow crate and we plan on doing the same for the Parquet crate.

Yes, thanks a million for doing that! I've read the arrow-rust 'despecialization' PR, which actually seemed to end up being quite simple (but it's a breaking change, so a 3.0 candidate?)

I hope something similar can be done in parquet so that arrow-parquet-df can all compile on stable in 3.0. That would unlock it for many potential users.

Thanks for sharing graphique; it's very neat.

Did you have to implement the conversion/abstraction layer from your format to Arrow manually, or was it already available? Could you give some pointers on how to roll your own binary-format-to-Arrow query engine? And why didn't you use Parquet for storage?
Sorry, I didn't check back on this comment after posting it, I hope you'll see this.

It's manual. What you get with Arrow is an efficient way to store structured data so that values for the same column (same dimension) are stored together on disk, rather than each record being stored with all its fields together. So if you're storing, say, a dataset of users with a 64-bit user ID, an IP address, a timestamp, and a country code, you'd define a Schema object with these 4 columns and the size of each one (here 64/32/32/16 bits, for example), and then you'd start writing your records block by block. A block is just a set of records, and Arrow marks the start and end of each block. It's up to you to decide when to start and end a block; I use 100k entries per block but haven't experimented much with other values.
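A stdlib-only Java sketch of what "writing records block by block" means for that users schema. This is not the Arrow Java API: the class and method names are hypothetical and plain primitive arrays stand in for Arrow vectors. In Arrow Java the rough equivalent would be filling a VectorSchemaRoot and calling ArrowFileWriter.writeBatch() once per block.

```java
import java.util.ArrayList;
import java.util.List;

public class BlockedUsers {
    // One block: one primitive array per column, all holding rowCount valid entries.
    // Widths follow the example: 64-bit user ID, 32-bit IP, 32-bit timestamp, 16-bit country.
    static final class Block {
        final long[] userId;
        final int[] ip;
        final int[] timestamp;
        final short[] country;
        int rowCount = 0;

        Block(int capacity) {
            userId = new long[capacity];
            ip = new int[capacity];
            timestamp = new int[capacity];
            country = new short[capacity];
        }
    }

    private final int blockSize;
    private final List<Block> blocks = new ArrayList<>();
    private Block current;

    BlockedUsers(int blockSize) {
        this.blockSize = blockSize;
    }

    // Append one record, starting a new block once the current one holds blockSize rows.
    void append(long userId, int ip, int timestamp, short country) {
        if (current == null || current.rowCount == blockSize) {
            current = new Block(blockSize);
            blocks.add(current);
        }
        int i = current.rowCount++;
        current.userId[i] = userId;
        current.ip[i] = ip;
        current.timestamp[i] = timestamp;
        current.country[i] = country;
    }

    List<Block> blocks() {
        return blocks;
    }

    public static void main(String[] args) {
        BlockedUsers users = new BlockedUsers(100_000);  // 100k entries per block, as above
        for (int i = 0; i < 250_000; i++) {
            // Hypothetical values: IP 10.0.0.1, increasing timestamps, country code 33.
            users.append(i, 0x0A000001, 1_600_000_000 + i, (short) 33);
        }
        System.out.println(users.blocks().size());  // prints 3 (100k + 100k + 50k rows)
    }
}
```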

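In Java it'd be something like this when reading just the user IDs (this assumes the arrow-vector and arrow-memory artifacts are on the classpath; "users.arrow" and processUserId are placeholders):

    import java.io.FileInputStream;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.BigIntVector;
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.ipc.ArrowFileReader;
    import org.apache.arrow.vector.ipc.message.ArrowBlock;
    try (RootAllocator allocator = new RootAllocator();
         FileInputStream in = new FileInputStream("users.arrow");
         ArrowFileReader arrowReader = new ArrowFileReader(in.getChannel(), allocator)) {
        VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
        BigIntVector userIdVector = (BigIntVector) root.getVector("user_id"); // gets this 64-bit dimension from the schema
        // ... more vectors defined, one for each dimension
        for (ArrowBlock block : arrowReader.getRecordBlocks()) { // go over all blocks
            arrowReader.loadRecordBatch(block); // *actually* reads the block into root
            for (int i = 0; i < root.getRowCount(); i++) { // row count of the loaded block
                long userId = userIdVector.get(i); // index is within the current block
                processUserId(userId);
            }
        }
    }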
The code in this example will only go over the user IDs, and will read them very quickly. So yes, you have to implement any sort of querying capabilities yourself. In my case it was a simple set of queries like "get distribution of dimension X" where X can be a parameter, or "filter records where X < minX || X > maxX", also with parameters, etc. Just a handful in all.
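The two query types mentioned here can be sketched over a single column. Assumption: each long[] holds one column's values for one loaded block (with Arrow Java you'd pull them from the BigIntVector after each loadRecordBatch call); the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ColumnQueries {
    // "Distribution of dimension X": how many times each value occurs in the column.
    static Map<Long, Long> distribution(List<long[]> columnBlocks) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long[] block : columnBlocks) {
            for (long v : block) {
                counts.merge(v, 1L, Long::sum);
            }
        }
        return counts;
    }

    // "X < minX || X > maxX": global row indices whose value falls outside [minX, maxX].
    static List<Long> outsideRange(List<long[]> columnBlocks, long minX, long maxX) {
        List<Long> rows = new ArrayList<>();
        long base = 0;  // global row index of the first value in the current block
        for (long[] block : columnBlocks) {
            for (int i = 0; i < block.length; i++) {
                if (block[i] < minX || block[i] > maxX) {
                    rows.add(base + i);
                }
            }
            base += block.length;
        }
        return rows;
    }

    public static void main(String[] args) {
        List<long[]> blocks = List.of(new long[] {5, 1, 5}, new long[] {9, 1});
        System.out.println(distribution(blocks));        // prints {1=2, 5=2, 9=1}
        System.out.println(outsideRange(blocks, 2, 8));  // prints [1, 3, 4]
    }
}
```

Both queries only ever touch the one column they're parameterized on, which is what makes the columnar layout pay off for this kind of workload.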

For a limited set of queries and not something like full SQL, this was perfect. I found this article very useful to get started: https://github.com/animeshtrivedi/blog/blob/master/post/2017...

InfluxDB IOx will use Parquet files for persistent storage