Hands-On Reactive Programming in Spring 5

Flux

Let's describe how data flows through the Flux class with the following marble diagram:

Diagram 4.2 An example of the Flux stream transformed into another Flux stream

Flux defines an ordinary reactive stream that can produce zero, one, or many elements, potentially even an infinite number of them. Its signals follow this formula:

onNext x 0..N [onError | onComplete]
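
To make the formula concrete, the following minimal sketch records the signals that a few small streams deliver to a subscriber. It assumes reactor-core is on the classpath; the FluxSignals class and its record helper are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical helper class, not part of Reactor: it records every signal a Flux emits.
final class FluxSignals {

    // Subscribes to the given Flux and returns the signals in arrival order.
    // The sources used below emit synchronously, so the list is complete on return.
    static List<String> record(Flux<Integer> flux) {
        List<String> signals = new ArrayList<>();
        flux.subscribe(
            value -> signals.add("onNext(" + value + ")"),
            error -> signals.add("onError(" + error.getMessage() + ")"),
            () -> signals.add("onComplete"));
        return signals;
    }

    public static void main(String[] args) {
        // Many elements, then the terminal onComplete signal.
        System.out.println(record(Flux.just(1, 2, 3)));
        // Zero elements: only the terminal signal is delivered.
        System.out.println(record(Flux.<Integer>empty()));
        // Some elements, then onError instead of onComplete.
        System.out.println(record(
            Flux.just(1, 2).concatWith(Flux.<Integer>error(new IllegalStateException("boom")))));
    }
}
```

In each case the stream delivers any number of onNext signals followed by at most one terminal signal, either onError or onComplete, never both.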

It is not very common to work with infinite data containers in the imperative world, but it is pretty common in functional programming. The following code produces a simple endless reactive stream:

Flux.range(1, 5).repeat()

This stream repeatedly produces the numbers from 1 to 5 (the sequence looks like 1, 2, 3, 4, 5, 1, 2, ...). This is not a problem and does not exhaust memory, because each element can be transformed and consumed without first materializing the whole stream. Furthermore, the subscriber can cancel the subscription at any time, which effectively turns an endless stream into a finite one.
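
Cancellation by the subscriber can be sketched as follows. This is one possible approach rather than the only one: it uses Reactor's BaseSubscriber to request elements one at a time and to cancel once enough have arrived. The CancelEndlessFlux class and the firstElements method are hypothetical names, and the sketch assumes that limit is at least 1.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

// Hypothetical example class: consumes part of an endless stream, then cancels.
final class CancelEndlessFlux {

    // Returns the first `limit` elements of the endless 1..5 sequence (limit >= 1 assumed).
    static List<Integer> firstElements(int limit) {
        List<Integer> received = new ArrayList<>();
        Flux.range(1, 5)
            .repeat()
            .subscribe(new BaseSubscriber<Integer>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(1); // ask for the first element only
                }

                @Override
                protected void hookOnNext(Integer value) {
                    received.add(value);
                    if (received.size() >= limit) {
                        cancel();   // the endless stream stops producing here
                    } else {
                        request(1); // ask for the next element
                    }
                }
            });
        // range and repeat emit on the calling thread, so the list is filled on return.
        return received;
    }

    public static void main(String[] args) {
        System.out.println(firstElements(7)); // [1, 2, 3, 4, 5, 1, 2]
    }
}
```

Because elements are requested one by one, at most one element is in flight at any moment, no matter how long the source could keep producing.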

Beware: an attempt to collect all elements emitted by an endless stream may cause an OutOfMemoryError. Doing so is not recommended in production applications, but the simplest way to reproduce this behavior is with the following code:

Flux.range(1, 100)   // (1)
    .repeat()        // (2)
    .collectList()   // (3)
    .block();        // (4)

In the preceding code, we do the following:

  1. The range operator creates a sequence of integers from 1 to 100 (inclusive).
  2. The repeat operator subscribes to the source reactive stream again and again after the source stream finishes. So, the repeat operator subscribes to the result of the range operator, receives elements 1 to 100 and the onComplete signal, and then subscribes again, receives elements 1 to 100, and so on, without stopping.
  3. With the collectList operator, we try to gather all produced elements into a single list. Because the repeat operator generates an endless stream, elements keep arriving and the list keeps growing until it consumes all available memory and the application fails with the following error: java.lang.OutOfMemoryError: Java heap space. Our application has just run out of free heap memory.
  4. The block operator triggers the actual subscription and blocks the running thread until the final result arrives, which, in this case, can never happen because the reactive stream is endless.
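
For contrast, here is a minimal sketch of a bounded variant of the same pipeline. It assumes that limiting the stream with take before collecting is acceptable for the use case; the BoundedCollect class, the collectFirst method, and the five-second timeout are illustrative choices, not recommendations from the text above.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical example class: bounds an endless stream before collecting it.
final class BoundedCollect {

    // Collects only the first `limit` elements of the endless 1..100 sequence.
    static List<Integer> collectFirst(long limit) {
        return Flux.range(1, 100)
            .repeat()                       // endless on its own
            .take(limit)                    // completes after `limit` elements and cancels upstream
            .collectList()                  // now receives onComplete, so the list is finite
            .block(Duration.ofSeconds(5));  // bounded wait instead of blocking indefinitely
    }

    public static void main(String[] args) {
        List<Integer> items = collectFirst(250);
        System.out.println(items.size());   // 250
        System.out.println(items.get(249)); // 50 (two full passes of 1..100, then 1..50)
    }
}
```

With take in place, collectList sees a terminal signal, so both the memory use and the blocking time stay bounded.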