Implementing business logic
The TemperatureSensor class previously sent events to a Spring ApplicationEventPublisher, but now it should return a reactive stream with Temperature events. The Reactive implementation of TemperatureSensor may look like the following:
@Component // (1)
public class TemperatureSensor {
private final Random rnd = new Random(); // (2)
private final Observable<Temperature> dataStream = // (3)
Observable
.range(0, Integer.MAX_VALUE) // (4)
.concatMap(tick -> Observable // (5)
.just(tick) // (6)
.delay(rnd.nextInt(5000), MILLISECONDS) // (7)
.map(tickValue -> this.probe())) // (8)
.publish() // (9)
.refCount(); // (10)
private Temperature probe() {
return new Temperature(16 + rnd.nextGaussian() * 10); // (11)
}
public Observable<Temperature> temperatureStream() { // (12)
return dataStream;
}
}
Here, we register the TemperatureSensor as a Spring bean by applying the @Component annotation (1), so this bean can be autowired into other beans. The TemperatureSensor implementation uses a RxJava API that was not previously explained in detail. Nevertheless, we are trying to clarify the used transformation by exploring the class logic.
Our sensor holds the random number generator rnd to simulate actual hardware sensor measurements (2). In a statement, (3), we define a private field called dataStream, which is returned by the public method temperatureStream() (12). Thus, dataStream is the only Observable stream defined by the component. This stream generates an effectively endless flow of numbers (4) by applying the factory method range(0, Integer.MAX_VALUE). The range() method generates a sequence of integers starting from 0 that have Integer.MAX_VALUE elements. For each of these values, we apply the transformation (5)—concatMap(tick -> ...). The method concatMap() receives a function, f, that transforms an tick item into an observable stream of elements, applies the f function to each element of the incoming stream, and joins the resulting streams one by one. In our case, the f function makes a sensor measurement after a random delay (to match the behavior of the previous implementation). To probe a sensor, we create a new stream with only one element tick (6). To simulate a random delay, we apply the delay(rnd.nextInt(5000), MILLISECONDS) (7) 0perator, which shifts elements forward in time.
For the next step, we probe the sensor and retrieve a temperature value by applying the map(tickValue -> this.probe())) transformation (8), which in turn calls the probe() method with the same data generation logic as before (11). In that case, we ignore the tickValue, as it was required only to generate a one-element stream. So, after applying the concatMap(tick -> ...), we have a stream that returns sensor values with a random interval of up to five seconds between emitted elements.
Actually, we could return a stream without applying operators (9) and (10), but in that case, each subscriber (SSE client) would trigger a new subscription for the stream and a new sequence of sensor readings. This means that sensor readings would not be shared among subscribers that could lead to hardware overload and degradation. To prevent this, we use the publish() (9) operator, which broadcasts events from a source stream to all destination streams. The publish() operator returns a special kind of Observable called ConnectableObservable. The latter provides the refCount() (10) operator, which creates a subscription to the incoming shared stream only when there is at least one outgoing subscription. In contrast with the Publisher-Subscriber implementation, this one makes it possible not to probe the sensor when nobody listens.