Hands-On Reactive Programming with Python

The create operator

The create operator is the operator most often used to create custom observables. Almost all other factory operators are implemented on top of it. Its marble diagram is shown in the following figure:

Figure 4.11: The create operator

Its prototype is as follows:

Observable.create(subscribe)

The subscribe parameter is a function which will be called each time an observer subscribes to the observable. The prototype of the subscribe function is as follows:

subscribe(observer)

Its only argument is the observer that subscribed to the observable. The following code shows a simple way to use it:

from rx import Observable  # assumes RxPY 1.x, where Observable.create is available

def on_subscribe(observer):
    # Called once for each observer that subscribes
    observer.on_next(1)
    observer.on_next(2)
    observer.on_next(3)
    observer.on_completed()

numbers = Observable.create(on_subscribe)
numbers.subscribe(
    on_next=lambda i: print("item: {}".format(i)),
    on_error=lambda e: print("error: {}".format(e)),
    on_completed=lambda: print("completed")
)

The on_subscribe subscription function emits three items to the observer by calling its on_next method. It then completes the observable by calling the on_completed method of the observer. This subscription function is used to create the numbers observable. Subscribing to it produces the following result:

item: 1
item: 2
item: 3
completed
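To make the "called each time an observer subscribes" rule concrete, here is a minimal sketch that does not use RxPY at all. ToyObservable and RecordingObserver are hypothetical stand-ins written for this illustration, not library classes; they only mimic the part of create that stores the subscription function and runs it once per subscription.

```python
class ToyObservable:
    """Hypothetical, simplified stand-in for an observable built by create."""

    def __init__(self, subscribe):
        self._subscribe = subscribe  # stored only, not called yet

    def subscribe(self, observer):
        # The subscription function runs once for every subscription.
        self._subscribe(observer)


class RecordingObserver:
    """Hypothetical observer that records the events it receives."""

    def __init__(self):
        self.events = []

    def on_next(self, item):
        self.events.append(("next", item))

    def on_error(self, error):
        self.events.append(("error", error))

    def on_completed(self):
        self.events.append(("completed",))


calls = 0


def on_subscribe(observer):
    global calls
    calls += 1  # count how many times the function is invoked
    observer.on_next(1)
    observer.on_next(2)
    observer.on_completed()


toy_numbers = ToyObservable(on_subscribe)
calls_before_subscribe = calls  # still 0: creating the observable emits nothing

first, second = RecordingObserver(), RecordingObserver()
toy_numbers.subscribe(first)
toy_numbers.subscribe(second)

print(calls)                          # 2
print(first.events == second.events)  # True
```

Each observer receives its own full sequence because the subscription function is executed again for every subscription. A real implementation also deals with schedulers and disposal, which this sketch leaves out.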

The numbers observable shown earlier was very simple. Let's look at a more realistic example of a very common pattern with the create operator: implementing an observable that reacts to the items of another observable (in other words, an operator). The following example sums items from the source observable as long as they are even. Every time an odd number is received, the current sum is emitted on the output observable and the sum is reset to the value of the odd number.
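Before wiring this into an observable, it can help to check the accumulation rule on a plain list. The following is only a sketch of the logic, not the operator itself: sum_even_trace is a hypothetical helper name, and the final append mirrors what the on_completed callback of the implementation in this section does when the source completes.

```python
def sum_even_trace(items):
    """Return the values the sum-even rule would emit for a finite list."""
    emitted = []
    accumulator = 0
    for i in items:
        if i % 2 == 0:
            accumulator += i             # even: keep summing
        else:
            emitted.append(accumulator)  # odd: emit the current sum...
            accumulator = i              # ...and restart from the odd value
    emitted.append(accumulator)          # completion: emit what is left
    return emitted


result = sum_even_trace([2, 2, 4, 5, 2])
print(result)  # [8, 7]
```

The trace for [2, 2, 4, 5, 2] is: the sum grows to 2, 4, then 8; the odd item 5 emits 8 and resets the sum to 5; the last item brings it to 7, which is emitted on completion.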

Let's start with the subscription to this custom operator, shown as follows:

numbers = Observable.from_([2, 2, 4, 5, 2])
sum_even(numbers).subscribe(
    on_next=lambda i: print("item: {}".format(i)),
    on_error=lambda e: print("error: {}".format(e)),
    on_completed=lambda: print("completed")
)

An observable of numbers is created. This observable is passed to the sum_even function, and the resulting observable is subscribed to. The skeleton of the sum_even function is as follows:

def sum_even(source):
    def on_subscribe(observer):
        accumulator = 0
        source.subscribe(on_next, on_error, on_completed)

    return Observable.create(on_subscribe)

The preceding code just returns an observable built from the nested on_subscribe subscription function. The on_subscribe function initializes the sum accumulator to 0 and subscribes to the source observable. So, when an observer subscribes to the observable returned by sum_even, on_subscribe is called, and the source observable is subscribed to in turn. This is a chain of subscriptions. Finally, the callbacks of the source observer must be implemented as nested functions of on_subscribe, as follows:

        def on_next(i):
            nonlocal accumulator
            if i % 2 == 0:
                accumulator += i
            else:
                observer.on_next(accumulator)
                accumulator = i

        def on_error(e):
            # Forward the error itself to the output observer
            observer.on_error(e)

        def on_completed():
            nonlocal accumulator
            observer.on_next(accumulator)
            observer.on_completed()

The on_next implementation should be clear. The accumulator is updated with the sum of the items while they are even and is reset when an odd item arrives. The value of the accumulator is emitted every time an odd number is received. The error and completion of the source observable are propagated to the observer of the output observable; on completion, the remaining value of the accumulator is emitted first. The complete code is as follows:

from rx import Observable  # assumes RxPY 1.x, where Observable.create is available

def sum_even(source):
    def on_subscribe(observer):
        accumulator = 0  # one accumulator per subscription

        def on_next(i):
            nonlocal accumulator
            if i % 2 == 0:
                accumulator += i
            else:
                observer.on_next(accumulator)
                accumulator = i

        def on_error(e):
            # Forward the error itself to the output observer
            observer.on_error(e)

        def on_completed():
            nonlocal accumulator
            observer.on_next(accumulator)
            observer.on_completed()

        source.subscribe(on_next, on_error, on_completed)

    return Observable.create(on_subscribe)

numbers = Observable.from_([2, 2, 4, 5, 2])
sum_even(numbers).subscribe(
    on_next=lambda i: print("item: {}".format(i)),
    on_error=lambda e: print("error: {}".format(e)),
    on_completed=lambda: print("completed")
)

The preceding code provides the following output:

item: 8
item: 7
completed

The two items received correspond to the sum of 2, 2, and 4, emitted when the odd number 5 arrives, and the sum of 5 and 2, emitted when the source completes. The completion is correctly received after these two items.
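The run above only exercises the item and completion paths. As a rough sketch of the error path, the following self-contained snippet replaces RxPY with a hypothetical ToyObservable (a stand-in written for this illustration, not the library class) and feeds sum_even a source that fails after two items. It assumes the intended behaviour is to forward the exception object as the argument of on_error.

```python
from types import SimpleNamespace


class ToyObservable:
    """Hypothetical stand-in for Observable.create; not the RxPY class."""

    def __init__(self, subscribe):
        self._subscribe = subscribe

    def subscribe(self, on_next, on_error, on_completed):
        # Bundle the callbacks into an observer-like object and run the
        # subscription function once for this subscription.
        observer = SimpleNamespace(
            on_next=on_next, on_error=on_error, on_completed=on_completed)
        self._subscribe(observer)


def sum_even(source):
    def on_subscribe(observer):
        accumulator = 0

        def on_next(i):
            nonlocal accumulator
            if i % 2 == 0:
                accumulator += i
            else:
                observer.on_next(accumulator)
                accumulator = i

        def on_error(e):
            observer.on_error(e)  # forward the exception itself

        def on_completed():
            observer.on_next(accumulator)
            observer.on_completed()

        source.subscribe(on_next, on_error, on_completed)

    return ToyObservable(on_subscribe)


def failing_source(observer):
    # Hypothetical source: two even items, then an error.
    observer.on_next(2)
    observer.on_next(4)
    observer.on_error(ValueError("bad item"))


events = []
sum_even(ToyObservable(failing_source)).subscribe(
    on_next=lambda i: events.append(("next", i)),
    on_error=lambda e: events.append(("error", str(e))),
    on_completed=lambda: events.append(("completed",)),
)
print(events)  # [('error', 'bad item')]
```

The pending sum of 6 is never emitted here, because the accumulator is only flushed on an odd item or on completion. Whether a partial sum should be emitted before an error is a design choice that the chapter's operator does not make.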