Mastering Java for Data Science

Streaming API

Java 8 was a big step forward in the history of the Java language. Among other features, it brought two important additions: Streams and lambda expressions.

In Java, a stream is a sequence of objects, and the Streams API provides functional-style operations to transform these sequences, such as map, filter, and reduce. The source for a stream can be anything that contains elements, for example, an array, a collection, or a file.
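To make these three operations concrete, here is a minimal sketch on a stream of arbitrary integers (the numbers themselves carry no meaning): it squares each element with map, keeps the even squares with filter, and folds them into a sum with reduce:

```java
import java.util.Arrays;
import java.util.List;

public class MapFilterReduce {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        int sumOfEvenSquares = numbers.stream()
                .map(n -> n * n)           // map: 1, 4, 9, 16, 25
                .filter(n -> n % 2 == 0)   // filter: 4, 16
                .reduce(0, Integer::sum);  // reduce: 4 + 16 = 20
        System.out.println(sumOfEvenSquares); // prints 20
    }
}
```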

For example, let's create a simple Word class, which contains a token and its part of speech:

public class Word {
    private final String token;
    private final String pos;
    // constructor and getters are omitted
}

For brevity, we will always omit constructors and getters for such data classes, but indicate that with a comment.
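For reference, a fully written-out version of this class, with the constructor and getters that later examples such as Word::getToken rely on, might look like the following sketch:

```java
public class Word {
    private final String token; // the word itself
    private final String pos;   // its part-of-speech tag

    public Word(String token, String pos) {
        this.token = token;
        this.pos = pos;
    }

    public String getToken() {
        return token;
    }

    public String getPos() {
        return pos;
    }
}
```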

Now, let's consider a sentence My dog also likes eating sausage. Using this class, we can represent it as follows:

Word[] array = { new Word("My", "PRP$"), new Word("dog", "NN"),
        new Word("also", "RB"), new Word("likes", "VB"),
        new Word("eating", "VB"), new Word("sausage", "NN"),
        new Word(".", ".") };

Here, we use the Penn Treebank POS notation, where NN represents a noun and VB represents a verb.

Now, we can convert this array to a stream using the Arrays.stream utility method:

Stream<Word> stream = Arrays.stream(array);

Streams can be created from collections using the stream method:

List<Word> list = Arrays.asList(array); 
Stream<Word> stream = list.stream();

The operations on streams are chained together and form nice and readable data processing pipelines. The most common operations on streams are the map and filter operations:

  • Map applies the same transformer function to each element
  • Filter, given a predicate function, keeps only the elements that satisfy it

At the end of the pipeline, you collect the results using a collector. The Collectors class provides several implementations such as toList, toSet, toMap, and others.

Suppose we want to keep only tokens which are nouns. With the Streams API, we can do it as follows:

List<String> nouns = list.stream()
        .filter(w -> "NN".equals(w.getPos()))
        .map(Word::getToken)
        .collect(Collectors.toList());
System.out.println(nouns);

Alternatively, we may want to check how many unique POS tags there are in the stream. For this, we can use the toSet collector:

Set<String> pos = list.stream()
        .map(Word::getPos)
        .collect(Collectors.toSet());
System.out.println(pos);

When dealing with texts, we may sometimes want to join a sequence of strings together:

String rawSentence = list.stream()
        .map(Word::getToken)
        .collect(Collectors.joining(" "));
System.out.println(rawSentence);

Alternatively, we can group words by their POS tag:

Map<String, List<Word>> groupByPos = list.stream()
        .collect(Collectors.groupingBy(Word::getPos));
System.out.println(groupByPos.get("VB"));
System.out.println(groupByPos.get("NN"));
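The groupingBy collector also accepts a downstream collector. For instance, Collectors.counting() (not used elsewhere in this section, but part of the same Collectors class) turns the grouping into a frequency table of POS tags; a sketch on the same sentence:

```java
Map<String, Long> posCounts = list.stream()
        .collect(Collectors.groupingBy(Word::getPos, Collectors.counting()));
System.out.println(posCounts.get("NN")); // two nouns: dog and sausage
```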

Also, there is a useful toMap collector that can index a collection by one of its fields. For example, if we want to get a map from tokens to the Word objects, it can be achieved with the following code:

Map<String, Word> tokenToWord = list.stream()
        .collect(Collectors.toMap(Word::getToken, Function.identity()));
System.out.println(tokenToWord.get("sausage"));

Apart from object streams, the Streams API provides primitive streams: streams of ints, doubles, and other primitives. These streams offer useful methods for statistical calculations such as sum, max, min, and average. A regular stream can be converted to a primitive stream using methods such as mapToInt or mapToDouble.

For example, this is how we can find the maximum length across all words in our sentence:

int maxTokenLength = list.stream()
        .mapToInt(w -> w.getToken().length())
        .max().getAsInt();
System.out.println(maxTokenLength);
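When several of these statistics are needed at once, IntStream also offers summaryStatistics(), which computes the count, sum, min, max, and average in a single pass; a sketch on the same token lengths:

```java
IntSummaryStatistics stats = list.stream()
        .mapToInt(w -> w.getToken().length())
        .summaryStatistics();
// max is 7 ("sausage"); the average length of the seven tokens is 4.0
System.out.println(stats.getMax() + " " + stats.getAverage());
```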

Stream operations are easy to parallelize: they are applied to each item independently, so multiple threads can process elements without interfering with one another. This makes it possible to speed up such operations considerably by splitting the work across multiple processor cores and executing the tasks in parallel.

Java leverages this and provides an easy and expressive way to write parallel code; for collections, you just need to call the parallelStream method:

int[] firstLengths = list.parallelStream()
        .filter(w -> w.getToken().length() % 2 == 0)
        .map(Word::getToken)
        .mapToInt(String::length)
        .sequential()
        .sorted()
        .limit(2)
        .toArray();
System.out.println(Arrays.toString(firstLengths));

In this example, the filtering and mapping are done in parallel, but then the stream is converted to a sequential stream, sorted, and the two smallest elements are extracted into an array. While the example is not particularly meaningful, it shows how much can be done with streams.

Finally, the standard Java NIO library offers some convenient methods. For example, a text file can be represented as a stream of lines using the Files.lines method:

Path path = Paths.get("text.txt");
try (Stream<String> lines = Files.lines(path, StandardCharsets.UTF_8)) {
    double average = lines
            .flatMap(line -> Arrays.stream(line.split(" ")))
            .map(String::toLowerCase)
            .mapToInt(String::length)
            .average().getAsDouble();
    System.out.println("average token length: " + average);
}

Streams are an expressive and powerful way to process data, and mastering this API is very helpful for doing data science in Java. We will use the Streams API often in later chapters, so you will see many more examples of it in action.