|
C = A + B in Rx is just a combination of 2 observables. One way of doing that is with "combineLatest": A.combineLatest(B).map((a,b) => a + b)
Of course, you may want to cache the last emitted value for "a+b", or a default value in case no items were emitted yet, something that would be a hot multicast observable, roughly: A.combineLatest(B).map((a,b) => a + b)
.multicast(BehaviorSubject(default))
And the underlying framework can optimize for this use-case, with shortcuts and whatnot. In Scala, nothing would prevent you from having a sexy macro, akin to Scala-Async.All that matters is the underlying abstractions are Observable (the producer, characterized by its subscribe method), the Observer (the consumer, which is really a single function split in 3) and the communication protocol between them. I don't see that as a regression, I see it as the foundation - in the end, no matter what you make of behaviors, it's still a producer/consumer pattern. Actually my problem with the original Rx implementation is very different. I'm also working on an Rx redesigned implementation for Scala, with back-pressure baked in by design in the protocol [1]. This is because when events are passing asynchronous boundaries, you can have streams that are producing more data than consumers can process and (compared with other abstractions for processing streams of data, like Iteratees) the consumer doesn't signal demand back to the producer. And this issue becomes even more relevant when events are sent over the network. [1] https://github.com/monifu/monifu/wiki/Rx-Contract-and-Design... |