Hands-On Reactive Programming in Spring 5
上QQ阅读APP看书,第一时间看更新

A composition of reactive technologies in action

To learn more about the technologies' composability, let's try to combine several reactive libraries in one Spring Framework 4-based application. In turn, our application is based on the revisited news service functionality with access to it via a plain REST endpoint. This endpoint is responsible for looking up news from the database and external services:

Diagram 3.13. Cross Library communication example inside one application

The preceding diagram introduces three reactive libraries to our system. Here, we use Ratpack as a web server. With the TransfromablePublisher, this allows us to easily combine and process results from several sources. In turn, one of the sources is MongoDB, which  returns the FindPublisher as the result of querying. Finally, here we have access to the external new service and grab a portion of data using the RxNetty HTTP client, which returns the Observable and is adapted to the org.reactivestreams.Publisher as a result.

To summarize, we have four components in the system, the first of which is Spring Framework 4. The second is Retrofit, which plays the role of the web framework. Finally, the third and fourth are RxNetty and MongoDB, for providing access to the news. We are not going into too much detail on the implementation of the components responsible for communication with the external services, but we are going to cover the implementation of the endpoint instead. This highlights the value of the Reactive Streams specification as the standard for the composability of the independent frameworks and libraries: 

@SpringBootApplication                                             // (1)
@EnableRatpack // (1.1)
public class NewsServiceApp { //

@Bean // (2)
MongoClient mongoClient(MongoProperties properties) { ... } // (2.1)
@Bean //
DatabaseNewsService databaseNews() { ... } // (2.2)
@Bean //
HttpNewsService externalNews() { ... } // (2.3)

@Bean // (3)
public Action<Chain> home() { //
return chain -> chain.get(ctx -> { // (3.1)

FindPublisher<News> databasePublisher = // (4)
databaseNews().lookupNews(); //
Observable<News> httpNewsObservable = //
externalNews().retrieveNews(); //
TransformablePublisher<News> stream = Streams.merge( // (4.1)
databasePublisher, //
RxReactiveStreams.toPublisher(httpNewsObservable) //

); //

ctx.render( // (5)
stream.toList() //
.map(Jackson::json) // (5.1)
); //
}) //
} //

public static void main(String[] args) { // (6)
SpringApplication.run(NewsServiceApp.class, args); //
} //
}

The key is as follows:

  1. This is the NewsServiceApp class declaration. This class is annotated with the @SpringBootApplication annotation, which assumes the usage of Spring Boot features. In turn, there is an additional @EnableRatpack annotation at point (1.1) which is part of the ratpack-spring-boot module and enables auto-configuration for the Ratpack server.
  2. This is the common beans declaration. Here, at point (2.1) we configure the MongoClient bean. At points (2.2) and (2.3) there are configurations of services for news retrieval and lookup. 
  3. This is the request's handler declaration. Here, to create a Ratpack request handler, we have to declare a Bean with the Action<Chain> type, which allows providing the configuration of the handler at point (3.1)
  4. This is the services invocation and results aggregation. Here we execute the services' methods and merge the returned streams using Ratpack Streams API (4.1).
  5. This is the rendering of the merged streams stage. Here, we asynchronously reduce all the elements into a list and then transform that list to the specific rendering view such as JSON (5.1).
  6. This is the main method's implementation. Here we use a common technique for bringing the Spring Boot application to life.

The preceding example shows the power of the Reactive Streams standard in action. Here, using an API of several unrelated libraries, we may easily build one processing flow and return the result to the final user without there being any additional effort for adapting one library to the another. The only exclusion from that rule is HttpNewsService, which in the result of the retrieveNews method execution returns the Observable. Nevertheless, as we might remember, RxReactiveStreams offers us a list of useful methods, allowing us to easily convert the RxJava 1.x Observable to the Publisher.