Hands-On Reactive Programming in Spring 5
上QQ阅读APP看书,第一时间看更新

RxJava transfiguration

In this way, RxJava provides an additional module that allows us to easily convert one reactive type into another. Let's look at how to convert Observable<T> to Publisher<T> and adopt rx.Subscriber<T> to org.reactivestreams.Subscriber<T>.

Suppose we have an application that uses RxJava 1.x and Observable as the central communication type between components, as shown in the following example:

interface LogService {
Observable<String> stream();
}

However, with the publication of the Reactive Streams specification, we decide to follow the standard and abstract our interfaces from the following particular dependency:

interface LogService {
Publisher<String> stream();
}

As might be noticed, we easily replaced the Observable with the Publisher. However, the refactoring of the implementation may take much more time than just replacing the return type. Fortunately, we may always easily adapt an existing Observable to a Publisher, as shown in the following example:

class RxLogService implements LogService {                         // (1)
final HttpClient<...> rxClient = HttpClient.newClient(...); // (1.1)

@Override
public Publisher<String> stream() {
Observable<String> rxStream = rxClient.createGet("/logs") // (2)
.flatMap(...) //
.map(Utils::toString); //

return RxReactiveStreams.toPublisher(rxStream); // (3)
}
}

The key is as follows:

  1. This is the RxLogService class declaration. That class represents the old Rx-based implementation. At point (1.1) we use the RxNetty HttpClient, which allows interaction with external services in asynchronous, non-blocking fashion using a Netty Client wrapped into an RxJava based API.
  2. This is the external request execution. Here, using the created instance of HttpClient, we request the stream of logs from the external service, transforming the incoming elements into String instances.
  3. This is the rxStream adaption to the Publisher using the RxReactiveStreams library.

As might be noticed, the developers of RxJava took care of us and provided an additional RxReactiveStreams class, making it possible to convert  Observable into a Reactive Streams' Publisher. Moreover, with the appearance of the Reactive Streams specification, RxJava developers have also provided non-standardized support for backpressure, which allows a converted Observable to be compliant with the Reactive Streams specification. 

Along with the conversion of Observable to Publisher, we may also convert rx.Subscriber to org.reactivestreams.Subscriber. For example, streams of logs were previously stored in the file. For that purpose, we had the custom Subscriber, which was responsible for I/O interaction. In turn, the transfiguration of the code to migrate to Reactive Streams specification looks like the following: 

class RxFileService implements FileService {                       // (1)

@Override // (2)
public void writeTo( //
String file, //
Publisher<String> content //
) { //

AsyncFileSubscriber rxSubscriber = // (3)
new AsyncFileSubscriber(file); //

content // (4)
.subscribe(RxReactiveStreams.toSubscriber(rxSubscriber)); //
}
}

The key is as follows:

  1. This is the RxFileService class declaration.
  2. This is the writeTo method implementation which accepts the Publisher as the central type for interaction between components.
  3. This is the RxJava-based AsyncFileSubscriber instance declaration.
  4. This is the content subscription. To reuse the RxJava based Subscriber, we adapt it using the same RxReactiveStreams utility class.

As we can see from the preceding example, the RxReactiveStreams provide a broad list of converters, making it possible to convert a RxJava API to the Reactive Streams API. 

In the same way, any Publisher<T> may be converted back to RxJava Observable:

Publisher<String> publisher = ... 

RxReactiveStreams.toObservable(publisher)
.subscribe();

In general, RxJava started following the Reactive Streams specification in some way. Unfortunately, because of backward compatibility, implementing the specification was not possible, and there are no plans to extend the  Reactive Streams specification  for RxJava 1.x in the future. Moreover, starting from the 31st of March 2018, there are no plans to support RxJava 1.x anymore.

Fortunately, the second iteration of RxJava brings new hope. Dávid Karnok, the father of the second version of the library, significantly improved the overall library's design and introduced an additional type that is compliant with the Reactive Streams specification. Along with Observable, which is left unchanged because of backward compatibility, RxJava 2 offers the new reactive type called Flowable.

The Flowable reactive type gives the identical API as Observable but extends org.reactivestreams.Publisher from the beginning. As shown in the next example, in which it is incorporated with the fluent API, Flowable may be converted to any common RxJava types and back to a Reactive Streams-compatible type:

Flowable.just(1, 2, 3)
.map(String::valueOf)
.toObservable()
.toFlowable(BackpressureStrategy.ERROR)
.subscribe();

As we may notice, the conversion of the Flowable to the Observable is an uncomplicated application of one operator. However, to convert the Observable back to the Flowable, it is necessary to provide some of the available backpressure strategies. In RxJava 2, Observable was designed as the push-only stream. Consequently, it is crucial to keep the converted Observable compliant with the Reactive Streams specification.

BackpressureStrategy refers to the strategies that take place when the producer does not respect the consumer's demand. In other words, a BackpressureStrategy defines the behavior of the stream when we have a Fast producer and a slow consumer. As we might remember, at the beginning of the chapter, we covered identical cases and considered three central strategies. These strategies included unbounded buffering of elements, dropping elements on the overflow, or blocking the producer based on a lack of demand from the consumer. In general, BackapressureStrategy reflects all the described strategies in some way, except the strategy of blocking producers. It also provides strategies such as BackapressureStrategy.ERROR , which—upon a lack of demand— sends an error to the consumer and automatically  disconnects it . We will not go into detail on each strategy in this chapter but will cover them in Chapter 4,  Project Reactor - the Foundation for Reactive Apps.