Hands-On Reactive Programming in Spring 5

Reactive Streams spec in action

As the previous section suggests, the interfaces of the Reactive Streams specification are simple, yet the concept behind them is fairly involved. To make the central idea and the expected behavior of those three interfaces concrete, we will walk through an everyday example.

Let's take a news subscription as an example and see how it can become smarter with the Reactive Streams interfaces. Consider the following code, which creates a Publisher for a news service:

NewsServicePublisher newsService = new NewsServicePublisher();

Now let's create a Subscriber and subscribe it to the newsService:

NewsServiceSubscriber subscriber = new NewsServiceSubscriber(5);
newsService.subscribe(subscriber);
...
subscriber.eventuallyReadDigest();

By calling subscribe() on the newsService instance, we express the desire to receive the latest news. Usually, before sending any digests, a high-quality service sends a welcome letter with information about the subscription and how to cancel it. This corresponds directly to our Subscriber#onSubscribe() method, which informs the Subscriber about a successful subscription and gives it the ability to unsubscribe. Since our service follows the rules of the Reactive Streams specification, it lets the client choose how many news articles it can read at once. Only after the client specifies the size of the first portion of digests by calling Subscription#request does the news service start sending digests through the Subscriber#onNext method, and the subscriber can then read the news.
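The order of signals described above can be made visible with a small sketch. It is not the book's NewsServicePublisher, which is not shown in this section. SketchNewsPublisher, LoggingSubscriber, and HandshakeDemo are hypothetical names, digests are plain strings, and the code uses java.util.concurrent.Flow (the JDK 9+ mirror of the Reactive Streams interfaces) so that it runs without external dependencies. For brevity it ignores several rules of the specification, such as rejecting non-positive demand and bounding recursion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical publisher that emits numbered digests only on demand. */
class SketchNewsPublisher implements Flow.Publisher<String> {
    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        // The "welcome letter": the subscriber receives its Subscription first.
        subscriber.onSubscribe(new Flow.Subscription() {
            private int sent = 0;
            private boolean cancelled = false;

            @Override
            public void request(long n) {
                // Emit at most n digests; nothing is sent without demand.
                for (long i = 0; i < n && !cancelled; i++) {
                    subscriber.onNext("digest-" + (++sent));
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }
}

/** Hypothetical subscriber that only records the signals it receives. */
class LoggingSubscriber implements Flow.Subscriber<String> {
    final List<String> events = new ArrayList<>();
    Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        events.add("onSubscribe");
        subscription = s; // deliberately no request() yet
    }

    @Override
    public void onNext(String digest) {
        events.add("onNext:" + digest);
    }

    @Override
    public void onError(Throwable t) {
        events.add("onError");
    }

    @Override
    public void onComplete() {
        events.add("onComplete");
    }
}

class HandshakeDemo {
    public static void main(String[] args) {
        LoggingSubscriber subscriber = new LoggingSubscriber();
        new SketchNewsPublisher().subscribe(subscriber);
        System.out.println(subscriber.events); // [onSubscribe]

        subscriber.subscription.request(2);
        System.out.println(subscriber.events); // [onSubscribe, onNext:digest-1, onNext:digest-2]
    }
}
```

Until request(2) is called, the subscriber has seen only onSubscribe. The service sends nothing that the client has not asked for.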

In eventuallyReadDigest, the word eventually reflects real life: we may postpone reading the newsletter until the evening or the end of the week, and then check the inbox manually. From the subscriber's perspective, that logic is implemented by NewsServiceSubscriber#eventuallyReadDigest(). Such behavior means that the user's inbox accumulates news digests, and a conventional subscription model can easily overflow it. When a news service thoughtlessly keeps sending messages that the subscriber does not read, the mail service provider usually puts the news service's email address on a blacklist, and the subscriber may then miss an important digest. Even if that does not happen, the subscriber will not be happy with a mailbox overflowing with unread digests.

Thus, to keep the subscriber happy, the news service needs a strategy for delivering the news. Suppose that the service can be notified when a newsletter has been read. Then, once we know that all messages have been read, we can send a new digest only after the previous one has been read. This mechanism is easy to implement within the specification. The next piece of code shows an example of the whole mechanism:

class NewsServiceSubscriber implements Subscriber<NewsLetter> {         // (1)
    final Queue<NewsLetter> mailbox = new ConcurrentLinkedQueue<>();
    final int take;
    final AtomicInteger remaining = new AtomicInteger();
    Subscription subscription;

    public NewsServiceSubscriber(int take) { ... }                      // (2)

    public void onSubscribe(Subscription s) {                           // (3)
        ...
        subscription = s;
        subscription.request(take);                                     // (3.1)
        ...
    }

    public void onNext(NewsLetter newsLetter) {                         // (4)
        mailbox.offer(newsLetter);
    }

    public void onError(Throwable t) { ... }                            // (5)
    public void onComplete() { ... }

    public Optional<NewsLetter> eventuallyReadDigest() {                // (6)
        NewsLetter letter = mailbox.poll();                             // (6.1)
        if (letter != null) {
            if (remaining.decrementAndGet() == 0) {                     // (6.2)
                subscription.request(take);
                remaining.set(take);
            }
            return Optional.of(letter);                                 // (6.3)
        }
        return Optional.empty();                                        // (6.4)
    }
}

The key is as follows:

  1. This is the NewsServiceSubscriber class declaration, which implements Subscriber<NewsLetter>. Along with the plain class definition, we have a few useful fields, such as the mailbox, represented by a Queue, and the subscription field, which represents the current subscription; in other words, the agreement between the client and the news service.
  2. This is the NewsServiceSubscriber constructor declaration. The constructor accepts one parameter called take, which indicates the number of news digests that the user can read at once or in the near future.
  3. This is the Subscriber#onSubscribe method implementation. Here, at point (3.1), along with storing the received Subscription, we send the user's previously chosen news-reading throughput to the server.
  4. This is the Subscriber#onNext method implementation. The logic for handling new digests is straightforward: each message is simply put into the mailbox Queue.
  5. These are the Subscriber#onError and Subscriber#onComplete method declarations. Those methods are called when the subscription terminates.
  6. This is the public eventuallyReadDigest method declaration. To indicate that the mailbox may be empty, we rely on Optional. As the first step, at point (6.1), we try to get the next unread news digest from the mailbox. If there are no unread newsletters in the mailbox, we return Optional.empty() (6.4). If a digest is available, we decrease the counter (6.2), which represents the number of previously requested messages that are still unread. If the counter reaches zero, we request a new portion of digests and reset the counter. In either case, we return the digest wrapped in an Optional (6.3).
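To see the batch-and-refill logic from the key in motion, here is a minimal runnable sketch. It makes several assumptions that are not in the original listing. It uses java.util.concurrent.Flow instead of the org.reactivestreams interfaces. The elided constructor is assumed to store take and to prime remaining with it. NewsLetter is reduced to a hypothetical class with only a title. CountingNewsPublisher is an invented synchronous stand-in for the news service that counts request() calls.

```java
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical minimal NewsLetter; the section does not show this class. */
class NewsLetter {
    final String title;
    NewsLetter(String title) { this.title = title; }
}

class SketchNewsSubscriber implements Flow.Subscriber<NewsLetter> {
    final Queue<NewsLetter> mailbox = new ConcurrentLinkedQueue<>();
    final int take;
    final AtomicInteger remaining = new AtomicInteger();
    Flow.Subscription subscription;

    SketchNewsSubscriber(int take) {
        // Assumption: the elided constructor stores the batch size and primes
        // the counter so that it reaches zero after `take` reads.
        this.take = take;
        this.remaining.set(take);
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        subscription.request(take);          // ask for the first batch
    }
    @Override public void onNext(NewsLetter letter) { mailbox.offer(letter); }
    @Override public void onError(Throwable t) { }
    @Override public void onComplete() { }

    Optional<NewsLetter> eventuallyReadDigest() {
        NewsLetter letter = mailbox.poll();
        if (letter != null) {
            if (remaining.decrementAndGet() == 0) {
                subscription.request(take);  // batch fully read: ask for more
                remaining.set(take);
            }
            return Optional.of(letter);
        }
        return Optional.empty();
    }
}

/** Hypothetical synchronous publisher that counts how often it is asked. */
class CountingNewsPublisher implements Flow.Publisher<NewsLetter> {
    int requests = 0;
    int sent = 0;

    @Override
    public void subscribe(Flow.Subscriber<? super NewsLetter> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) {
                requests++;
                for (long i = 0; i < n; i++) {
                    subscriber.onNext(new NewsLetter("digest-" + (++sent)));
                }
            }
            @Override public void cancel() { }
        });
    }
}

class BatchReadingDemo {
    public static void main(String[] args) {
        CountingNewsPublisher service = new CountingNewsPublisher();
        SketchNewsSubscriber reader = new SketchNewsSubscriber(2);
        service.subscribe(reader);            // first batch of 2 is requested
        reader.eventuallyReadDigest();        // 1 of 2 read, no new request
        System.out.println(service.requests); // 1
        reader.eventuallyReadDigest();        // batch finished, 2 more requested
        System.out.println(service.requests); // 2
    }
}
```

The publisher is asked for more digests only after the reader has consumed the whole previous batch, so the mailbox never holds more than take unread letters.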

According to the specification, the first call invokes onSubscribe(), which stores the Subscription locally and then notifies the Publisher, via the request() method, that the subscriber is ready to receive newsletters. When the first digest arrives, it is stored in the queue for future reading, which is what usually happens in a real mailbox. Once the subscriber has read all the requested digests, the Publisher is notified of that fact and prepares a new portion of the news. If the news service changes its subscription policy, which in some cases means the completion of the current user subscriptions, the subscriber is notified through the onComplete method. The client will then be asked to accept the new policy and automatically resubscribe to the service. An example of when onError may be invoked is an accidentally dropped database that holds the users' preferences. That counts as a failure, and the subscriber then receives an apology letter and is asked to resubscribe to the service with new preferences. Finally, the implementation of eventuallyReadDigest mirrors a real user's actions: opening the mailbox, checking for new messages, reading letters and marking them as read, or just closing the mailbox when there is nothing new to interact with.
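The two terminal signals mentioned above can be sketched as follows. This is an illustration under stated assumptions rather than the book's implementation. FiniteNewsPublisher and TerminalAwareSubscriber are hypothetical names, digests are plain strings, the failAtEnd flag stands in for the "dropped database" scenario, and java.util.concurrent.Flow replaces the org.reactivestreams interfaces so that the code runs on a plain JDK 9+.

```java
import java.util.concurrent.Flow;

/** Hypothetical publisher that sends `total` digests, then terminates. */
class FiniteNewsPublisher implements Flow.Publisher<String> {
    private final int total;
    private final boolean failAtEnd;

    FiniteNewsPublisher(int total, boolean failAtEnd) {
        this.total = total;
        this.failAtEnd = failAtEnd;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int sent = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && sent < total && !done; i++) {
                    subscriber.onNext("digest-" + (++sent));
                }
                if (sent == total && !done) {
                    done = true; // exactly one terminal signal, then silence
                    if (failAtEnd) {
                        subscriber.onError(new IllegalStateException("preferences lost"));
                    } else {
                        subscriber.onComplete();
                    }
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

/** Hypothetical subscriber that remembers how its subscription ended. */
class TerminalAwareSubscriber implements Flow.Subscriber<String> {
    enum State { ACTIVE, COMPLETED, FAILED }

    State state = State.ACTIVE;
    Throwable failure;
    int received = 0;

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(String digest) { received++; }
    @Override public void onError(Throwable t) { state = State.FAILED; failure = t; }
    @Override public void onComplete() { state = State.COMPLETED; }
}

class TerminationDemo {
    public static void main(String[] args) {
        TerminalAwareSubscriber policyChanged = new TerminalAwareSubscriber();
        new FiniteNewsPublisher(3, false).subscribe(policyChanged);
        System.out.println(policyChanged.state); // COMPLETED

        TerminalAwareSubscriber databaseDropped = new TerminalAwareSubscriber();
        new FiniteNewsPublisher(1, true).subscribe(databaseDropped);
        System.out.println(databaseDropped.state); // FAILED
    }
}
```

In both cases the subscription is over after the terminal signal. To receive news again, the client has to call subscribe() again, which matches the resubscription step in the analogy.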

As we can see, Reactive Streams naturally fits and solves problems in business cases that, at first glance, seem unrelated to it. Just by providing such a mechanism, we can keep our subscribers happy and stay off the mailbox provider's blacklist.