Pull versus push
Finally, to understand the problem described in the previous section, we have to go back in history and analyze the initial interaction model between a source and its subscribers.
During the early period of the reactive landscape's evolution, all libraries were designed with the idea that data is pushed from the source to the subscriber. That decision was made because a pure pull model is not efficient enough in some cases, for example when communication has to cross a network boundary. Suppose that we filter a huge list of data but take only the first ten elements from it. If we embrace the pull model to solve such a problem, we end up with the following code:
final AsyncDatabaseClient dbClient = ...                               // (1)

public CompletionStage<Queue<Item>> list(int count) {                  // (2)
    BlockingQueue<Item> storage = new ArrayBlockingQueue<>(count);
    CompletableFuture<Queue<Item>> result = new CompletableFuture<>();

    pull("1", storage, result, count);                                 // (2.1)

    return result;
}

void pull(                                                             // (3)
    String elementId,
    Queue<Item> queue,
    CompletableFuture<Queue<Item>> resultFuture,
    int count
) {
    dbClient.getNextAfterId(elementId)
            .thenAccept(item -> {
                if (isValid(item)) {                                   // (3.1)
                    queue.offer(item);

                    if (queue.size() == count) {                       // (3.2)
                        resultFuture.complete(queue);
                        return;
                    }
                }

                pull(item.getId(),                                     // (3.3)
                     queue,
                     resultFuture,
                     count);
            });
}
The annotated code is explained as follows:
- This is the AsyncDatabaseClient field declaration. Using that client, we wire up asynchronous, non-blocking communication with the external database (a sketch of the assumed client contract follows this list).
- This is the list method declaration. Here, we declare an asynchronous contract by returning a CompletionStage as the result of calling the list method. In turn, to aggregate the pulled results and send them to the caller asynchronously, we declare a Queue to store the received values and a CompletableFuture through which the collected Queue is later delivered manually. At point (2.1), we start the first call of the pull method.
- This is the pull method declaration. Inside that method, we call AsyncDatabaseClient#getNextAfterId to execute the query and receive the result asynchronously. When the result is received, we filter it at point (3.1). In the case of a valid item, we aggregate it into the queue. Additionally, at point (3.2), we check whether we have collected enough elements; if so, we send them to the caller and stop pulling. If the required number of elements has not yet been collected, we recursively call the pull method again (3.3).
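For reference, the following is a minimal sketch of the AsyncDatabaseClient contract and the Item type assumed by the snippets in this section. The method names mirror the ones used in the examples, but the exact signatures are an assumption made for illustration, not the API of a real client library:

import java.util.List;
import java.util.concurrent.CompletionStage;

import rx.Observable;                                                  // RxJava 1.x

// Hypothetical domain type used by the examples in this section.
interface Item {
    String getId();
}

// Hypothetical asynchronous client contract assumed by the examples.
interface AsyncDatabaseClient {

    // Pulls the single element that follows the element with the given ID.
    CompletionStage<Item> getNextAfterId(String elementId);

    // Pulls up to count elements that follow the element with the given ID.
    CompletionStage<List<Item>> getNextBatchAfterId(String elementId, int count);

    // Pushes elements as they become available (used later in this section).
    Observable<Item> getStreamOfItems();
}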
As may be noticed from the preceding code, we use an asynchronous, non-blocking interaction between the service and the database. At first glance, there is nothing wrong here. However, if we look at the following diagram, we see the gap:
As might be noticed from the preceding diagram, asking for the next element one by one results in extra time being spent on delivering each request from the service to the database. From the service's perspective, most of the overall processing time is wasted in an idle state. Even though no resources are being used during that time, the overall processing time is doubled or even tripled because of the additional network activity. Moreover, the database is not aware of the number of future requests, which means that it cannot generate data in advance and therefore sits idle as well: it waits for a new request while the response is being delivered to the service, while the service processes the incoming response, and while the service asks for a new portion of data.
To optimize the overall execution and keep the pulling model as a first-class citizen, we may combine pulling with batching, as shown in the following modification of the central example:
void pull(                                                             // (1)
    String elementId,
    Queue<Item> queue,
    CompletableFuture<Queue<Item>> resultFuture,
    int count
) {
    dbClient.getNextBatchAfterId(elementId, count)                     // (2)
            .thenAccept(items -> {
                for (Item item : items) {                              // (2.1)
                    if (isValid(item)) {
                        queue.offer(item);

                        if (queue.size() == count) {
                            resultFuture.complete(queue);
                            return;
                        }
                    }
                }

                pull(items.get(items.size() - 1).getId(),              // (3)
                     queue,
                     resultFuture,
                     count);
            });
}
Again, the following key explains the code:
- This is the same pull method declaration as in the previous example.
- This is the getNextBatchAfterId execution. As may be noticed, the AsyncDatabaseClient method allows us to ask for a specific number of elements, which are returned as a List<Item>. In turn, when the data is available, it is processed in almost the same way as before, except that an additional for loop is introduced to process each element of the batch separately (2.1).
- This is the recursive pull method execution, which retrieves an additional batch of items in case the previous batch did not provide enough valid elements.
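Whether elements are pulled one by one or in batches, the caller consumes the result of the list method in the same way, through the returned CompletionStage. The following is a minimal usage sketch; the count of ten and the handler body are illustrative only:

// Ask for the first ten valid items and process them once the Queue is complete.
list(10).thenAccept(queue -> {
    for (Item item : queue) {
        System.out.println(item.getId());
    }
});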
On one hand, by asking for a batch of elements we may significantly improve the performance of the list method execution and reduce the overall processing time. On the other hand, there are still some gaps in the interaction model, which might be detected by analyzing the following diagram:
As we may notice, we still have some inefficiency in the processing time. For example, the client is still idle while the database is querying the data, and sending a batch of elements takes a bit more time than sending just one. Finally, an additional request for a whole batch of elements may be effectively redundant: if only one element is needed to finish the processing and the first element of the next batch passes validation, then the rest of that batch is skipped and was fetched for nothing.
To provide the final optimization, we may ask for the data once and have the source push elements asynchronously as they become available. The following modification of the code shows how that might be achieved:
public Observable<Item> list(int count) {                              // (1)
    return dbClient.getStreamOfItems()                                 // (2)
                   .filter(item -> isValid(item))                      // (2.1)
                   .take(count);                                       // (2.2)
}
The annotations are as follows:
- This is the list method declaration. Here, the Observable<Item> return type indicates that elements are being pushed to the subscriber.
- This is the stream-querying stage. By calling the AsyncDatabaseClient#getStreamOfItems method, we subscribe to the database once. At point (2.1) we filter elements and, by using the .take() operator at point (2.2), we take only the specific amount of data requested by the caller (a brief usage sketch follows this list).
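For comparison with the pull-based variants, the following is a minimal sketch of how a caller might consume this push-based list method, using the RxJava 1.x subscribe overload that accepts onNext, onError, and onCompleted callbacks; the handler bodies are illustrative only:

// Receive the first ten valid items as they are pushed by the database.
list(10).subscribe(
    item -> System.out.println(item.getId()),      // onNext: invoked for each pushed element
    Throwable::printStackTrace,                    // onError: invoked if the stream fails
    () -> System.out.println("Completed")          // onCompleted: invoked once ten elements arrive
);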
Here, we use RxJava 1.x classes as first-class citizens to receive the pushed elements. In turn, once all requirements are met, a cancellation signal is sent and the connection to the database is closed. The current interaction flow is depicted in the following diagram:
In the preceding diagram, the overall processing time is optimized once again. During the interaction, there is only one significant idle period, while the service waits for the first response. After the first element has arrived, the database starts sending subsequent elements as they become available. In turn, even if processing an element may be a bit faster than querying the next one, the overall idle time of the service is short. However, the database may still generate excess elements that are ignored by the service once the required number of elements has been collected.