Hands-On Reactive Programming in Spring 5
上QQ阅读APP看书,第一时间看更新

Project Reactor version 1.x

When working on the Reactive Streams specification, developers from the Spring Framework team needed a high-throughput data processing framework, especially for the Spring XD project, the goal of which was to simplify the development of big data applications. To fulfill that need, the Spring team initiated a new project. From the beginning, it was designed with support for asynchronous, non-blocking processing. The team called it Project Reactor. Essentially, Reactor version 1.x incorporated best practices for message processing, such as the Reactor Pattern, and functional and reactive programming styles.

The Reactor Pattern is a behavioral pattern that helps with asynchronous event handling and synchronous processing. This means that all events are enqueued and the actual processing of an event happens later by a separate thread. An event is dispatched to all interested parties (event handlers) and processed synchronously. To learn more about the Reactor Pattern, please visit the following link: http://www.dre.vanderbilt.edu/~schmidt/PDF/reactor-siemens.pdf.

By embracing these techniques, Project Reactor version 1.x gives us the ability to write concise code such as the following:

Environment env = new Environment();                               // (1)
Reactor reactor = Reactors.reactor()                               // (2)
                          .env(env)                                //
                          .dispatcher(Environment.RING_BUFFER)     // (2.1)
                          .get();                                  //

reactor.on($("channel"),                                           // (3)
event -> System.out.println(event.getData())); //
Executors.newSingleThreadScheduledExecutor() // (4) .scheduleAtFixedRate( // () -> reactor.notify("channel", Event.wrap("test")), // 0, 100, TimeUnit.MILLISECONDS // ); //

In the preceding code, there are a couple of conceptual points:

  1. Here, we create an Environment instance. The Environment instance is an execution context, which is responsible for creating a particular Dispatcher. This may potentially provide different kinds of dispatchers, ranging from an interprocess dispatcher to distributed ones.
  2. An instance of Reactor is created, which is a direct implementation of the Reactor Pattern. In the preceding example code, we use the Reactors class, which is a fluent builder for concrete Reactor instances. At point (2.1)we use the predefined implementation of Dispatcher based on the RingBuffer structure. To learn more about the internals and overall design of RingBuffer-based Dispatcher, please visit the following link: https://martinfowler.com/articles/lmax.html.
  3. Here, the declaration of a channel Selector and an Event consumer occurs. At this point, we register an event handler (in this case, a lambda that prints all received events to System.out). The filtering of events happens using the string selector, which indicates the name of the event channel. Selectors.$ provides a broader selection of criteria, so a final expression for event selection may be more complicated.
  4. Here, we configure the producer of the Event in the form of a scheduled task. At that point, we use the possibilities of Java's ScheduledExecutorService to schedule periodic tasks that send Event to a specific channel in the previously instantiated Reactor instance.

Under the hood, events are processed by Dispatcher and then sent to destination points. Depending on the Dispatcher implementation, an event may be processed synchronously or asynchronously. This provides a functional decomposition and generally works in a similar way to the Spring Framework event processing approach. Furthermore, Reactor 1.x provides a bunch of helpful wrappers that allow us to compose events' processing with a clear flow:

...                                                                // (1)
Stream<String> stream = Streams.on(reactor, $("channel"));         // (2)
stream.map(s -> "Hello world " + s)                                // (3)
      .distinct()                                                  //
      .filter((Predicate<String>) s -> s.length() > 2)             //
      .consume(System.out::println);                               // (3.1)

Deferred<String, Stream<String>> input = Streams.defer(env);       // (4)

Stream<String> compose = input.compose()                           // (5)
compose.map(m -> m + " Hello World")                               // (6)
.filter(m -> m.contains("1")) // .map(Event::wrap) //
.consume(reactor.prepare("channel")); // (6.1) for (int i = 0; i < 1000; i++) { // (7) input.accept(UUID.randomUUID().toString()); // } //

Let's break down the preceding code:

  1. At this point we have an Environment and a Reactor creation, as in the previous example.
  2. Here we have Stream creation. Stream allows the building of functional transformation chains. By applying the Streams.on method to Reactor with a specified Selector, we receive a Stream object attached to the specified channel in the given Reactor instance.
  3. Here, the processing flow is created. We apply a few intermediate operations, such as map, filter, and consume. The last of these is a terminal operator (3.1).
  4. Here, a Deferred Stream is created. The Deferred class is a special wrapper that makes it possible to provide manual events to the Stream. In our case, the Stream.defer method creates an additional instance of the Reactor class.
  5. At this point we have a Stream instance creation. Here, we retrieve Stream from the Deferred instance by using the compose method on it.
  1. At this point we have a reactive processing flow creation. This part of the pipeline composition is similar to what we have at point (3). At point (6.1), we use the Reactor API shortcut for the code, as follows—e -> reactor.notify("channel", e)
  2. Here, we supply a random element to the Deferred instance.

In the preceding example, we subscribe to the channel and then process all incoming events step by step. In contrast, in that example, we use the reactive programming technique to build a declarative processing flow. Here, we provide two separate processing stages. Furthermore, the code looks like the well-known RxJava API, making it more familiar to RxJava users. At some point, Reactor 1.x had good integration with the Spring Framework. Along with the message processing library, Reactor 1.x provides a bunch of add-ons, such as the add-on for Netty.

To summarize, at that time, Reactor 1.x was good enough at processing events at high speed. With excellent integration with the Spring Framework and composition with Netty, it made it possible to develop high-performance systems that provide asynchronous and non-blocking message processing.

However, Reactor 1.x also has its disadvantages. First of all, the library has no backpressure control. Unfortunately, the event-driven implementation of Reactor 1.x did not offer a way to control backpressure other than blocking the producer thread or skipping events. Furthermore, error handling was quite complicated. Reactor 1.x provides several ways of handling errors and failures. Even though Reactor 1.x was rough around the edges, it was used by the popular Grails web framework. Of course, this significantly influenced the next iteration of the reactive library.