The introduction of the Processor notion
We have learned about the three primary interfaces that constitute the Reactive Streams specification. We have also seen how the proposed mechanism may improve the news service that sends news digests via email. However, at the beginning of this section, it was mentioned that there are four core interfaces in the specification. The last one is a combination of the Publisher and the Subscriber and is called the Processor. Let's take a look at its declaration:
package org.reactivestreams;

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
In contrast to the Publisher and Subscriber, which are the start and end points by definition, the Processor is designed to add processing stages between the Publisher and the Subscriber. Since the Processor may represent a transformation stage, it makes streaming pipeline behaviors and business logic flows easier to understand. A shining example of the Processor's usage is any business logic that may be described as a custom operator, or the provision of additional caching of streaming data, and so on. To get a better understanding of the conceptual application of the Processor, let's consider how the NewsServicePublisher may be improved with the Processor interface.
The most uncomplicated logic that may be hidden behind the NewsServicePublisher is database access, followed by newsletter preparation and subsequent multicasting to all subscribers. In this design, the NewsServicePublisher is split into four additional components:
- This is the DBPublisher stage. This Publisher is responsible for providing access to the database and returning the newest posts.
- This is the NewsPreparationOperator stage. This stage is an intermediate transformation that is responsible for aggregating all messages and then, when the completion signal is emitted by the main source, combining all of the news into a digest. Note that this operator always produces exactly one element because of its aggregating nature. Aggregation assumes the presence of storage, which might be a queue or any other collection for the received elements. A simplified sketch of such an aggregating operator follows this list.
- This is the ScheduledPublisher stage. This stage is responsible for scheduling a periodic task. In the case mentioned previously, the scheduled task queries the database (DBPublisher), processes the result, and merges the received data into the downstream. Note that the ScheduledPublisher is effectively an infinite stream, and the completion of the merged Publisher is ignored. If there is a lack of demand from the downstream, this Publisher signals an error to the actual Subscriber through the Subscriber#onError method.
- This is the SmartMulticastProcessor stage. This Processor plays a vital role in the flow. First of all, it caches the latest digest. In addition, this stage supports multicasting, which means that there is no need to create the same flow for each Subscriber separately. Also, the SmartMulticastProcessor includes, as mentioned earlier, a smart mailing tracking mechanism, and only sends newsletters to those subscribers who have read the previous digest.
- These are the actual subscribers, which are effectively instances of NewsServiceSubscriber.
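To make the aggregation stage more tangible, the following is a minimal, hypothetical sketch of an operator in the spirit of the NewsPreparationOperator. The class name AggregatingOperator and its digestFactory parameter are invented for illustration, and backpressure is deliberately simplified by requesting an unbounded number of elements from the upstream, so this is a sketch rather than a specification-compliant operator:

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Hypothetical aggregating operator: buffers all upstream elements and emits
// a single digest when the upstream completes.
public class AggregatingOperator<T, D> implements Publisher<D> {

    private final Publisher<T> upstream;
    private final Function<List<T>, D> digestFactory;

    public AggregatingOperator(Publisher<T> upstream,
                               Function<List<T>, D> digestFactory) {
        this.upstream = upstream;
        this.digestFactory = digestFactory;
    }

    @Override
    public void subscribe(Subscriber<? super D> actual) {
        upstream.subscribe(new Subscriber<T>() {
            private final List<T> buffer = new ArrayList<>(); // storage for the received elements

            @Override
            public void onSubscribe(Subscription s) {
                // The single digest is emitted on completion, so downstream demand
                // is not tracked in this simplified sketch.
                actual.onSubscribe(new Subscription() {
                    public void request(long n) { /* ignored in this sketch */ }
                    public void cancel()        { s.cancel(); }
                });
                s.request(Long.MAX_VALUE); // request everything from the upstream
            }

            @Override
            public void onNext(T element) {
                buffer.add(element); // aggregate until the upstream completes
            }

            @Override
            public void onError(Throwable t) {
                actual.onError(t);
            }

            @Override
            public void onComplete() {
                actual.onNext(digestFactory.apply(buffer)); // combine everything into one digest
                actual.onComplete();
            }
        });
    }
}

Note that the operator accepts its upstream as a constructor parameter, which is exactly the composition style discussed a little later in this section.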
In general, the preceding breakdown shows what might be hidden behind the plain NewsServicePublisher. In turn, that example exposes the real application of the Processor. As may be noticed, we have three transformation stages, but only one of them is required to be a Processor.
First of all, in cases in which we need just a plain transformation from A to B, we do not need an interface that exposes the Publisher and the Subscriber at the same time. The presence of the Subscriber interface means that, once the Processor has subscribed to the upstream, elements may start arriving at the Subscriber#onNext method and may potentially be lost because of the absence of a downstream Subscriber. Consequently, with such a technique, we have to bear in mind that the Processor should be subscribed to by its downstream Subscriber before it subscribes to the main Publisher.
Nevertheless, this over-complicates the business flow and does not allow us to create a reusable operator that fits any case with ease. Moreover, constructing a Processor implementation requires additional effort to manage the Subscriber independently of the main Publisher and to implement backpressure properly (for example, by employing a queue if needed). Subsequently, that may degrade performance or merely decrease the whole stream's throughput due to an unreasonably complicated implementation of the Processor as a plain operator.
Since we know that we only want to transform A to B, we simply want to start the flow when the actual Subscriber calls Publisher#subscribe, and we do not want to over-complicate the internal implementation. Consequently, the composition of multiple Publisher instances, each of which accepts the upstream as a constructor parameter and simply provides the adapter logic, fits the requirements very well.
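As an illustration, consider a hypothetical usage of the AggregatingOperator sketched earlier. Here, newsSource and actualSubscriber are assumed to exist, and String stands in for a news item type; the important point is that nothing flows until the actual Subscriber subscribes:

Publisher<String> newsSource = ... // assumed upstream of news items
Publisher<String> digestSource =
    new AggregatingOperator<>(newsSource, items -> String.join("\n", items));

digestSource.subscribe(actualSubscriber); // the whole chain activates only here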
In turn, the Processor shines when we need to multicast elements, regardless of whether there are subscribers or not. Since it also implements the Subscriber interface, it can observe and store incoming elements, which effectively enables behaviors such as caching.
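The following is a deliberately simplified, hypothetical sketch of such a caching and multicasting Processor, in the spirit of the SmartMulticastProcessor. The class name CachingMulticastProcessor is invented, and backpressure, the mailing-tracking mechanism, and concurrency corner cases are omitted, so it is an illustration of the idea rather than a specification-compliant implementation:

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Hypothetical Processor: the Subscriber half caches the latest element, while
// the Publisher half replays the cached element and multicasts new ones.
public class CachingMulticastProcessor<T> implements Processor<T, T> {

    private final List<Subscriber<? super T>> subscribers = new CopyOnWriteArrayList<>();
    private volatile T latest; // the cached latest digest

    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        subscribers.add(subscriber);
        subscriber.onSubscribe(new Subscription() {
            public void request(long n) { /* demand is ignored in this sketch */ }
            public void cancel()        { subscribers.remove(subscriber); }
        });
        T cached = latest;
        if (cached != null) {
            subscriber.onNext(cached); // replay the cached digest to a late subscriber
        }
    }

    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE); // unbounded demand from the upstream
    }

    @Override
    public void onNext(T element) {
        latest = element; // cache first, then multicast to every current subscriber
        subscribers.forEach(s -> s.onNext(element));
    }

    @Override
    public void onError(Throwable t) {
        subscribers.forEach(s -> s.onError(t));
    }

    @Override
    public void onComplete() {
        subscribers.forEach(s -> s.onComplete());
    }
}

This combination is precisely what a plain Publisher or Subscriber alone cannot express: the same stage consumes the upstream, mutates its internal state (the cache), and serves any number of downstream subscribers.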
Given that we have already seen the implementation of the TakeFilterOperator and the NewsServiceSubscriber, we may be sure that the internals of most Publisher, Subscriber, and Processor instances are similar to those examples. Consequently, we will not go into the details of each class's internals, and will consider only the final composition of all of the components:
Publisher<Subscriber<NewsLetter>> newsLetterSubscribersStream = ...          // (1)
ScheduledPublisher<NewsLetter> scheduler =
    new ScheduledPublisher<>(
        () -> new NewsPreparationOperator(new DBPublisher(...), ...),        // (1.1)
        1, TimeUnit.DAYS
    );
SmartMulticastProcessor processor = new SmartMulticastProcessor();

scheduler.subscribe(processor);                                              // (2)

newsLetterSubscribersStream.subscribe(new Subscriber<>() {                   // (3)
    ...
    public void onNext(Subscriber<NewsLetter> s) {
        processor.subscribe(s);                                              // (3.1)
    }
    ...
});
The key is as follows:
- These are the publisher, operator, and processor declarations. Here, newsLetterSubscribersStream represents the infinite stream of users who subscribe to the mailing list. In turn, at point (1.1), we declare a Supplier<? extends Publisher<NewsLetter>>, which supplies the DBPublisher wrapped in the NewsPreparationOperator.
- This is the subscription of the SmartMulticastProcessor to the ScheduledPublisher<NewsLetter>. That action immediately starts the scheduler, which in turn subscribes to the inner Publisher.
- This is the newsLetterSubscribersStream subscription. Here, we declare an anonymous class that implements the Subscriber interface. In turn, at point (3.1), we subscribe each new incoming Subscriber to the processor, which multicasts the digest among all subscribers.
In this example, we have combined all of the components into one chain, sequentially wrapping them into each other or making them subscribe to each other.
To summarize, we have covered the basics of the Reactive Streams standard. We have seen how the idea of reactive programming, expressed in libraries such as RxJava, was transformed into a standard set of interfaces. Along with that, we have seen that these interfaces allow us to easily define an asynchronous and non-blocking interaction model between components within the system. Finally, by embracing the Reactive Streams specification, we are capable of building a reactive system not just at the high architectural level but at the level of smaller components as well.