by robertnishihara 693 days ago
I'm one of the creators of Ray. A few thoughts :)

1. This is truly impressive work from AWS. Patrick Ames began speaking about this a couple years ago, though at this point the blog post is probably the best reference. https://www.youtube.com/watch?v=h7svj_oAY14

2. This is not a "typical" Ray use case. I'm not aware of any other exabyte scale data processing workloads. Our bread and butter is ML workloads: training, inference, and unstructured data processing.

3. We have a data processing library called Ray Data for ingesting and processing data, often done in conjunction with training and inference. However, I believe in this particular use case, the heavy lifting is largely done with Ray's core APIs (tasks & actors). These are lower level and more flexible, which makes sense for highly custom use cases. Most Ray users use the Ray libraries (train, data, serve), but power users often use the Ray core APIs.

4. People often ask how data processing with Ray compares to Spark. Spark use cases tend to be more geared toward structured data and CPU processing. If you are joining a bunch of tables together or running SQL queries, Spark is going to be way better. If you're working with unstructured data (images, text, video, audio, etc.), need mixed CPU & GPU compute, or are doing deep learning and running inference, then Ray is going to be much better.

I'm just learning about this tool now and had a brief question if you have the time:

The paper mentions support for zero-copy intranode object sharing which links to serialization in the Ray docs - https://docs.ray.io/en/latest/ray-core/objects/serialization...

I'm really curious how this is performant - I recently tried building a pipeline that leveraged substantial multiprocessing in Python, and found that my process was bottlenecked by the serialization/deserialization that occurs during Python multiprocessing. Would love any reading or explanation you can provide as to how this doesn't also bottleneck a process in Ray, since it seems that data transferred between workers and nodes will need to be serialized and deserialized.

Thanks in advance! Really cool tool, hopefully I'll be able to use it sooner rather than later.

You're right that the serialization / deserialization overhead can quickly exceed the compute time. To avoid this you have to get a lot of small things right. And given our focus on ML workloads, this is particularly important when sharing large numerical arrays between processes (especially processes running on the same node).

One of the key things is to make sure the serialized object is stored in a data format where the serialized object does not need to be "transformed" in order to access it. For example, a numpy array can be created in O(1) time from a serialized blob by initializing a Python object with the right shape and dtype and a pointer to the right offset in the serialized blob. We also use projects like Apache Arrow that put a lot of care into this.

Example in more detail:

Imagine the object you are passing from process A to process B is a 1GB numpy array of floats. In the serialization step, process A produces a serialized blob of bytes that is basically just the 1GB numpy array plus a little bit of metadata. Process A writes that serialized blob into shared memory. This step of "writing into shared memory" still involves O(N) work, where N is the size of the array (though you can have multiple threads do the memcpy in parallel and be limited just by memory bandwidth).

In the deserialization step, process B accesses the same shared memory blob (process A and B are on the same machine). It reads a tiny bit of metadata to figure out the type of the serialized object and shape and so on. Then it constructs a numpy array with the correct shape and type and with a pointer to the actual data in shared memory at the right offset. Therefore it doesn't need to touch all of the bytes of data, it just does O(1) work instead of O(N).

That's the basic idea. You can imagine generalizing this beyond numpy arrays, but it's most effective for objects that include large numerical data (e.g., objects that include numpy arrays).
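A minimal sketch of the two steps described above, not Ray's actual code. The header layout (`HEADER`), the fixed `DATA_OFFSET`, and the function names are made up for illustration, only 1-D arrays are handled, and a plain `bytearray` stands in for the shared memory segment. The point is that `serialize` copies all N bytes once, while `deserialize` only reads the header and builds a view.

```python
import struct

import numpy as np

# Hypothetical header: dtype string (8 bytes), element count, data offset.
HEADER = struct.Struct("<8sQQ")
DATA_OFFSET = 64  # data starts at a 64-byte offset inside the blob


def serialize(arr, blob):
    """O(N): copy the array's bytes straight into the blob, after the header."""
    flat = np.ascontiguousarray(arr).ravel()
    HEADER.pack_into(blob, 0, flat.dtype.str.encode(), flat.size, DATA_OFFSET)
    # View the destination region as an array and copy into it directly,
    # so no intermediate serialized copy is created.
    dest = np.frombuffer(blob, dtype=flat.dtype, count=flat.size, offset=DATA_OFFSET)
    dest[:] = flat


def deserialize(blob):
    """O(1): read the header, then wrap the existing bytes without copying."""
    dtype_raw, count, offset = HEADER.unpack_from(blob, 0)
    dtype = np.dtype(dtype_raw.rstrip(b"\0").decode())
    return np.frombuffer(blob, dtype=dtype, count=count, offset=offset)


src = np.arange(1000, dtype=np.float64)
blob = bytearray(DATA_OFFSET + src.nbytes)  # stand-in for shared memory
serialize(src, blob)
view = deserialize(blob)
```

Here `view` points into `blob` rather than owning its own data, so a second call to `deserialize(blob)` returns another array over the same bytes.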

There are a bunch of little details to get right, e.g., serializing directly into shared memory instead of creating a serialized copy in process A and then copying it into shared memory, doing the write into shared memory in parallel with a bunch of threads, and getting the deserialization right. You also have to make sure that the starting addresses of the numpy arrays are 64-byte aligned (if memory serves) so that you don't accidentally trigger a copy later on.
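To make the alignment point concrete, here is a small sketch of checking and obtaining a 64-byte-aligned numpy array. The helper name `aligned_empty` is hypothetical, and the 64-byte figure is taken from the comment above rather than verified. The trick is to over-allocate a byte buffer and slice it so the first element lands on an aligned address.

```python
import numpy as np


def aligned_empty(n, dtype=np.float64, align=64):
    """Return an uninitialized 1-D array whose data address is a multiple of `align`."""
    dtype = np.dtype(dtype)
    nbytes = n * dtype.itemsize
    # Over-allocate by `align` bytes so an aligned start always fits.
    raw = np.empty(nbytes + align, dtype=np.uint8)
    shift = (-raw.ctypes.data) % align
    # Slice from the aligned address and reinterpret the bytes as `dtype`.
    return raw[shift:shift + nbytes].view(dtype)


a = aligned_empty(1000)
is_aligned = a.ctypes.data % 64 == 0
```

The same `arr.ctypes.data % 64` check can be applied to any array that was built on top of a shared buffer.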

EDIT: I edited the above to add more detail.

This is probably a naive question, but how do two processes share address space? mmap?

Yeah, mmap, I think this is the relevant line [1].

Fun fact, very early on, we used to create one mmapped file per serialized object, but that very quickly broke down.

Then we switched to mmapping one large file at the start and storing all of the serialized objects in that file. But then as objects get allocated and deallocated, you need to manage the memory inside of that mmapped file, and we just repurposed a malloc implementation to handle that.

[1] https://github.com/ray-project/ray/blob/21202f6ddc3ceaf74fbc...
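A toy sketch of the "one large mmapped region plus an allocator" design described above. This is not Ray's implementation: the `Arena` class and its first-fit free list are assumptions standing in for the repurposed malloc, and the mapping here is anonymous, whereas a real object store would map a file or shared memory segment that other processes can attach to.

```python
import mmap


class Arena:
    """One large mapping, carved into objects by a simple first-fit allocator."""

    def __init__(self, size):
        self.buf = mmap.mmap(-1, size)  # anonymous mapping of `size` bytes
        self.free = [(0, size)]         # sorted list of (offset, length) free ranges
        self.used = {}                  # offset -> length of live objects

    def alloc(self, nbytes):
        for i, (off, length) in enumerate(self.free):
            if length >= nbytes:
                if length > nbytes:
                    self.free[i] = (off + nbytes, length - nbytes)
                else:
                    del self.free[i]
                self.used[off] = nbytes
                return off
        raise MemoryError("arena full")

    def free_block(self, off):
        self.free.append((off, self.used.pop(off)))
        self.free.sort()
        # Coalesce neighbouring free ranges so large objects can fit again.
        merged = [self.free[0]]
        for o, length in self.free[1:]:
            prev_off, prev_len = merged[-1]
            if prev_off + prev_len == o:
                merged[-1] = (prev_off, prev_len + length)
            else:
                merged.append((o, length))
        self.free = merged

    def put(self, data):
        off = self.alloc(len(data))
        self.buf[off:off + len(data)] = data
        return off

    def get(self, off):
        # Zero-copy view of the stored bytes.
        return memoryview(self.buf)[off:off + self.used[off]]


arena = Arena(1024)
a = arena.put(b"hello")
b = arena.put(b"world!")
```

Freeing `a` and storing something smaller reuses the hole at offset 0, which is exactly the bookkeeping that one-file-per-object avoided and a single large mapping requires.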

Super cool to see you here.

I've also looked at ray for running data pipelines before (at much much smaller scales) for the reasons you suggest (unstructured data, mixed CPU/GPU compute).

One thing I've wanted is an incremental computation framework (i.e., salsa [1]) built on ray so that I can write jobs that transparently reuse intermediate results from an object store if their dependents haven't changed.

Do you know if anyone has thought of building something like this?

[1] https://github.com/salsa-rs/salsa

I asked the same question to one of the core devs at a recent event and he (1) said that some people in finance have done related things and (2) suggested using the Ray slack to connect with developers and power users who might have helpful advice.

I agree this is a very interesting area to consider Ray for. There are lots of projects/products that provide core components that could be used but there’s no widely used library. It feels like one is overdue.

Other folks have built data processing libraries on top of Ray: Modin and Daft come to mind.

But I'm not aware of anything exactly like what you're referring to!

Curious if you know how well Ray works with multithreaded python libraries? For example, when using jax with ray, I have to ensure the import ordering imports ray first, as forking a threaded process leads to deadlocks in Python. Do you know how to ensure that ray handles forking the python interpreter correctly?

Multi-threaded libraries (e.g., numpy and PyTorch on CPUs come to mind) are well supported. In scenarios where many processes are each running heavily multi-threaded computations, it can help to pin specific processes to specific cores (e.g., using tools like psutil) to avoid contention.
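As a rough illustration of pinning, here is a sketch that uses the standard library's `os.sched_setaffinity` in place of psutil (a swap made here to avoid a third-party dependency; psutil offers a comparable per-process affinity setting). The helper name `pin_to_cores` is hypothetical, and the call only exists on Linux, so the function returns `None` elsewhere.

```python
import os


def pin_to_cores(cores):
    """Restrict the current process to `cores`; return the resulting core set.

    Returns None on platforms without os.sched_setaffinity (e.g. macOS, Windows).
    """
    if not hasattr(os, "sched_setaffinity"):
        return None
    allowed = os.sched_getaffinity(0)  # cores this process may currently use
    target = set(cores) & allowed      # never request a core we are not allowed on
    if target:
        os.sched_setaffinity(0, target)  # pid 0 means "the calling process"
    return os.sched_getaffinity(0)
```

Each worker process would call this once at startup with its own disjoint set of cores.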

The scenario where a Ray task forks is probably not very well supported. You can certainly start a subprocess from within a Ray task, but I think forking could easily cause issues.

You can definitely use Ray + Jax, but you probably need to avoid forking a process within a Ray worker.
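For the "start a subprocess instead of forking" suggestion, a minimal sketch using only the standard library. The helper name `run_isolated` is hypothetical. The child is a fresh interpreter, so it does not inherit the parent's threads or any locks they hold, which is what makes a plain fork of a threaded process risky.

```python
import subprocess
import sys


def run_isolated(code):
    """Run `code` in a brand-new Python interpreter and return its stdout."""
    result = subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError if the child exits non-zero
    )
    return result.stdout.strip()


answer = run_isolated("print(6 * 7)")
```

The same function body could be called from inside a worker task, since it relies on nothing but launching a new process.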

> this is not a typical ray use case

Must be good enough if you're willing to dogfood it though?

To clarify, what I mean is that working with "exabytes" is atypical. Most use cases are at a slightly smaller scale :)

Data processing workloads are quite common on Ray, especially with unstructured data.

Also, I work on Ray, which is the underlying framework used here, but all the work in the post was done by the Amazon team.