The Apex operator model
Now that we have seen an example of a custom operator implementation, let's take a closer look at the Apex operator life cycle interfaces. The operators specified in the logical DAG are deployed into an execution layer container (a worker process) before they can do actual work. The Apex engine defines an execution life cycle for operators, with corresponding interfaces and callback methods: the operator API. Operators used in Apex applications have to be implemented in a way that conforms to this API. The Apex library provides many prebuilt, ready-to-use operators, but the user isn't limited to those, as the custom operator example in the previous section showed.
The following diagram illustrates the operator life cycle after deployment in a container:
In the case of an input operator (which has no port to receive incoming events), the engine calls emitTuples() instead of the process() method on the port. Let's have a look at the individual callbacks in more detail:
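The two operator shapes just described can be sketched as follows. This is a simplified, self-contained sketch: the interface and class names mirror those of the real Apex API (InputOperator, DefaultInputPort, and so on) but are redefined here in miniature so the example compiles without the Apex jars, and BufferedSource and UpperCaseOperator are hypothetical operators, not part of the library.

```java
import java.util.ArrayList;
import java.util.List;

// Miniature stand-in for com.datatorrent.api.InputOperator: no input
// ports; the engine repeatedly calls emitTuples() within each window.
interface InputOperator {
    void emitTuples();
}

// Miniature stand-in for com.datatorrent.api.DefaultInputPort<T>:
// generic operators receive one tuple at a time through process().
abstract class DefaultInputPort<T> {
    public abstract void process(T tuple);
}

// A hypothetical input operator draining a holding buffer that an IO
// thread would fill; emitTuples() only transfers what is available.
class BufferedSource implements InputOperator {
    final List<String> buffer = new ArrayList<>();  // filled by an IO thread
    final List<String> emitted = new ArrayList<>(); // stands in for an output port

    @Override
    public void emitTuples() {
        // Transfer currently available data, then return control quickly.
        emitted.addAll(buffer);
        buffer.clear();
    }
}

// A hypothetical generic operator: the engine delivers each tuple to
// the input port's process() method.
class UpperCaseOperator {
    final List<String> emitted = new ArrayList<>(); // stands in for an output port

    public final DefaultInputPort<String> input = new DefaultInputPort<String>() {
        @Override
        public void process(String tuple) {
            emitted.add(tuple.toUpperCase());
        }
    };
}
```

In both cases the engine, not the operator, drives the calls; the operator only supplies the callback bodies.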
- setup: This is a one-time call when the operator is deployed. It is the opportunity to perform initialization work, such as creating caches, opening connections to external systems, and so on. This method should not perform actual data processing work; it is for preparation only.
- beginWindow: This marks the start of a streaming window (the Apex engine's processing time slice of a stream, which was discussed in Chapter 1, Introduction to Apex, and is not to be confused with event time windowing). This is a repeating callback; the operator is now in processing mode and can therefore also emit results on output ports or write data to external systems. The most typical work done in beginWindow is resetting transient state that lasts for the duration of the interval. It is also possible to emit tuples during this callback, for example for recovery logic that requires replay of previously emitted tuples from a log.
- emitTuples (input operator): This is called periodically by the engine and gives the operator an opportunity to emit data tuples to an output stream (which transports them to downstream operators for further processing). As an example, the Kafka connector emits tuples received from the consumer API to the output port. The engine will call emitTuples as many times as possible within the streaming window interval, so it is recommended that the operator code return control to the engine as fast as possible and not block. In fact, most connectors that read from external systems do this in a separate IO thread (managed in setup/teardown) that places available data into a holding buffer, from which emitTuples transfers only what is available, up to a limit, before returning control.
- InputPort.process: For operators other than the input operators just discussed, this is the entry point to the processing logic when new data is available on the stream. The engine passes individual tuples to process (one at a time), and this can repeat many times within a streaming window. The tuple type is that of the stream and the user's schema; it can be any Java object (POJO, Map, and so on). Examples of processing logic are a simple stateless operation, such as the filtering seen in the earlier example that immediately emits a result, or a stateful transformation, such as counting, where accumulation occurs within the operator and aggregate results are emitted later. Just as with emitTuples, the operator logic should not block and should return control as fast as possible so as not to interfere with the overall processing in the DAG.
- endWindow: When a streaming window interval has elapsed, the engine calls this method. This is an opportunity to perform operations on data that would be too expensive to do individually, such as a database commit or flushing files. It can also be used to emit an aggregate result, such as a count per interval, or to update metric fields in an operator.
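To make the order of these callbacks concrete, here is a minimal, self-contained sketch of a counting operator driven through two windows. WindowedCounter and MiniEngine are illustrative stand-ins of my own naming, not Apex classes: a real operator would implement the Apex Operator interface, and the container's engine, not user code, would drive the calls.

```java
import java.util.ArrayList;
import java.util.List;

// Transient per-window state is reset in beginWindow; the aggregate
// for the window is emitted in endWindow, as described above.
class WindowedCounter {
    long count;                                           // transient per-window state
    final List<Long> countsPerWindow = new ArrayList<>(); // stands in for an output port

    void setup()              { /* one-time init: caches, connections */ }
    void beginWindow(long id) { count = 0; }              // reset transient state
    void process(String t)    { count++; }                // per-tuple logic
    void endWindow()          { countsPerWindow.add(count); } // emit aggregate
    void teardown()           { /* best-effort release of resources */ }
}

// A toy stand-in for the container's control loop, showing the call order:
// setup, then (beginWindow, process*, endWindow) per window, then teardown.
class MiniEngine {
    static List<Long> run(WindowedCounter op, List<List<String>> windows) {
        op.setup();
        long windowId = 0;
        for (List<String> window : windows) {
            op.beginWindow(windowId++);
            for (String tuple : window) {
                op.process(tuple);
            }
            op.endWindow();
        }
        op.teardown();
        return op.countsPerWindow;
    }
}
```

Feeding the driver a window of three tuples followed by a window of one tuple yields the per-window counts 3 and 1.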
As a side note, there is an attribute, APPLICATION_WINDOW_COUNT, that lets the application developer control the frequency of the beginWindow and endWindow callbacks as a multiple of the streaming window interval. It won't be discussed further here, as the recommended way to optimize resources consistent with fault-tolerant state management is the CheckpointNotificationListener.
- teardown: This should perform the opposite of setup. Like setup, teardown isn't part of the data processing flow and should not be used to affect any related state; for example, it would be invalid to commit to the database in teardown. There is also no guarantee that this method will be called: when the process terminates unexpectedly, the engine may not be able to perform the callback, and the operator's logic should be written with that expectation. For example, closing files or connections here is a best effort; it may not happen, and the operator needs to be prepared for that. An example would be a file lease in HDFS that would prevent a replacement operator from writing to the same file, making it necessary to use intermediate file names until the files are final.
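The intermediate-file pattern from the last point can be sketched as follows: data is written under a temporary name and renamed to the final name only once it is complete, so a replacement operator is never confronted with a half-written "final" file. The class and file names here are illustrative, and the finalize step belongs in window completion logic (such as endWindow or a checkpoint callback), not in teardown, which may never run; the file output operators in the Apex library implement a far more elaborate version of this idea, including recovery handling.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical helper: write under a ".tmp" name, atomically rename to
// the final name only when the data is known to be complete.
class SafeFileWriter {
    private final Path finalPath;
    private final Path tmpPath;

    SafeFileWriter(Path finalPath) {
        this.finalPath = finalPath;
        this.tmpPath = finalPath.resolveSibling(finalPath.getFileName() + ".tmp");
    }

    // Append data to the temporary file; the final name does not exist yet.
    void write(String data) throws IOException {
        Files.write(tmpPath, data.getBytes(),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    // Promote the temporary file to its final name. Call this from
    // completion logic (e.g. endWindow), never rely on teardown for it.
    void finalizeFile() throws IOException {
        Files.move(tmpPath, finalPath, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

Until finalizeFile() runs, only the `.tmp` file exists, so a crash leaves an obviously incomplete artifact rather than a corrupt final file.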