Java 8 Streams API 详解

  • A+
所属分类:JAVA

当我第一次遇到Stream API时,内心是困惑的,因为我立马就会联想到Java I/O的InputStream and OutputStream。然而,实际上Java 8 的Stream API是完全不同的事物,它为Java 提供函数式编程方面发挥了重要作用。

函数式编程有一个重要概念,叫做Monad。推荐阅读阮一峰大师的相关博客<<图解 Monad>>

本篇主要讲解如何使用Java 8 Stream API以及其相关的各种操作。例如对一个数据流的顺序操作,以及强大的流操作reducecollectflatMap,当然还要深入研究一下涉及到程序运行性能的并行流(parallel streams)。

初探Streams

Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的 Iterator。原始版本的 Iterator,用户只能显式地一个一个遍历元素并对其执行某些操作;高级版本的 Stream,用户只要给出需要对其包含的元素执行什么操作,比如 “过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等,Stream 会隐式地在内部进行遍历,做出相应的数据转换。

Stream 就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。

List<String> myList =
    Arrays.asList("a1", "a2", "b1", "c2", "c1");

myList
    .stream()
    .filter(s -> s.startsWith("c"))
    .map(String::toUpperCase)
    .sorted()
    .forEach(System.out::println);

流操作分为中间操作和终端操作,并组合成流管道。流管道由源(例如集合,数组,生成器函数或I / O通道)组成;然后是零个或多个中间操作,例如Stream.filter或Stream.map;最后是一个终端操作,如Stream.forEach或Stream.reduce。在上述例子中filtermapsorted是中间操作,而forEach是一个终端的操作。有关所有可用流操作的完整列表,请参阅Stream Javadoc

中间操作进一步分为无状态操作和有状态操作。无状态操作(例如过滤器和映射)在处理新元素时不保留先前看到的元素的状态 - 每个元素都可以独立于其他元素上的操作进行处理。有状态操作(例如,distinct和sorted)可以在处理新元素时包含先前看到的元素的状态。

终端操作(例如Stream.forEach或IntStream.sum)可以遍历流以产生结果或副作用。在执行终端操作之后,流管道被认为已消耗,并且不能再使用;如果需要再次遍历同一数据源,则必须返回数据源以获取新流。

各种类型的流

可以从各种数据源(尤其是集合)创建流。Lists and Sets支持新方法stream()parallelStream()创建顺序流或并行流。并行流能够在多个线程上运行,本教程的后续部分将对此进行介绍。我们现在专注于顺序流:

Arrays.asList("a1", "a2", "a3")
    .stream()
    .findFirst()
    .ifPresent(System.out::println);  // a1

在对象列表上调用stream()方法将返回常规对象流。当然我们也可以不创建集合就可以使用流计算,如下面的代码示例中看到的那样:

Stream.of("a1", "a2", "a3")
    .findFirst()
    .ifPresent(System.out::println);  // a1

只需使用 Stream.of()方法从一堆对象引用中创建一个流。

除了常规对象流之外,Java 8还附带了特殊类型的流操作类,用于处理原始数据类型intlong以及double。你可能已经猜到了IntStreamLongStream而且DoubleStream

IntStreams可以使用IntStream.range()方法替换常规for循环:

IntStream.range(1, 4)
    .forEach(System.out::println);

// 1
// 2
// 3

所有这些原始流都像常规对象流一样工作,但有以下不同之处:原始流使用专门的lambda表达式,例如IntFunction代替FunctionIntPredicate代替Predicate。原始流支持额外的终端聚合操作,例如sum()average()

Arrays.stream(new int[] {1, 2, 3})
    .map(n -> 2 * n + 1)
    .average()
    .ifPresent(System.out::println);  // 5.0

有时将常规对象流转换为基本流是有用的,反之亦然。为此,对象流支持特殊的映射操作,例如mapToInt()mapToLong()mapToDouble

Stream.of("a1", "a2", "a3")
    .map(s -> s.substring(1))
    .mapToInt(Integer::parseInt)
    .max()
    .ifPresent(System.out::println);  // 3

可以通过mapToObj()方法将原始流转换为对象流:

IntStream.range(1, 4)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

// a1
// a2
// a3

下面是一个组合示例:双精度流首先映射到int流,然后映射到字符串类型的对象流:

Stream.of(1.0, 2.0, 3.0)
    .mapToInt(Double::intValue)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

// a1
// a2
// a3

流操作的执行流程

现在我们已经学会了如何创建和使用不同类型的流,让我们深入了解流操作的内部执行流程。

中间操作的一个重要特征是懒惰。如下是缺少终端操作的示例:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    });

执行此代码段时,不会向控制台打印任何内容。这是因为只有存在终端操作时才执行中间操作。

让我们通过终端操作forEach扩展上面的例子:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    })
    .forEach(s -> System.out.println("forEach: " + s));

执行此代码段,会在控制台上产生的输出如下所示:

filter:  d2
forEach: d2
filter:  a2
forEach: a2
filter:  b1
forEach: b1
filter:  b3
forEach: b3
filter:  c
forEach: c

请注意结果输出的顺序!是否感到惊讶?!可能有些人会以为流中所有的元素一起沿着各个操作水平移动,然而实际上,每个元素都沿着链垂直移动。也就是说,只有流中的第一个元素d2通过filterforEach后,第二个元素a2才会开始被处理。

这种机制可以减少对每个元素执行的实际操作数,如下一个示例所示:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .anyMatch(s -> {
        System.out.println("anyMatch: " + s);
        return s.startsWith("A");
    });

// map:      d2
// anyMatch: D2
// map:      a2
// anyMatch: A2

anyMatch 操作中,匹配以A开头的元素。由于每个元素都沿着链垂直移动,map在这种情况下只需要执行两次。因此,map操作得到了尽可能少的调用次数。

执行链的顺序很重要

下面的示例包括两个中间操作mapfilter,以及一个终端操作forEach。让我们再次检查这些操作是如何执行的:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("A");
    })
    .forEach(s -> System.out.println("forEach: " + s));

// map:     d2
// filter:  D2
// map:     a2
// filter:  A2
// forEach: A2
// map:     b1
// filter:  B1
// map:     b3
// filter:  B3
// map:     c
// filter:  C

As you might have guessed both map and filter are called five times for every string in the underlying collection whereas forEach is only called once.

你可能已经猜到了结果,map并且filter对于底层集合中的每个字符串调用了五次,而forEach只调用一次。

如果我们改变操作的顺序,移动filter到链的开头,我们可以大大减少实际的执行次数:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter:  d2
// filter:  a2
// map:     a2
// forEach: A2
// filter:  b1
// filter:  b3
// filter:  c

现在,map只调用一次,因此操作管道对大量输入元素的执行速度要快得多。在编写复杂的方法链时要记住这一点。

让我们通过一个额外的sorted操作来扩展上面的例子:

Stream.of("d2", "a2", "b1", "b3", "c")
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

排序是一种特殊的中间操作。也被叫做有状态操作,因为为了在排序期间必须维护元素集合的状态。

执行此示例将产生以下控制台输出:

sort:    a2; d2
sort:    b1; a2
sort:    b1; d2
sort:    b1; a2
sort:    b3; b1
sort:    b3; d2
sort:    c; b3
sort:    c; d2
filter:  a2
map:     a2
forEach: A2
filter:  b1
filter:  b3
filter:  c
filter:  d2

首先,对整个输入集合执行排序操作。换句话说,sorted是水平执行的。因此,在这种情况下,sorted方法对输入集合中的每个元素的多个组合总共调用了8次。

我们可以通过重新排序执行链来优化性能:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter:  d2
// filter:  a2
// filter:  b1
// filter:  b3
// filter:  c
// map:     a2
// forEach: A2

在此示例中,sorted从未调用过,因为filter将输入集合减少到只有一个元素。因此,对于较大的输入集合,性能会大大提高。

流不可重复使用

Java 8流无法重用。只要您调用任何终端操作,流就会关闭:

Stream<String> stream =
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> s.startsWith("a"));

stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception

对同一个流结果调用anyMatch后,再调用noneMatch方法,就会很遗憾的得到异常:

java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
    at com.winterbe.java8.Streams5.test7(Streams5.java:38)
    at com.winterbe.java8.Streams5.main(Streams5.java:28)

为了克服这个限制,我们必须为我们想要执行的每个终端操作创建一个新的流链,例如我们可以创建一个流提供者来构建一个新的流,其中已经设置了中间操作filter

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok

每次调用get()构造一个我们保存的新流,以调用所需的终端操作。

高级操作

Streams支持大量不同的操作。我们已经了解了最重要的操作,如filtermap。我并不会亲自带你研究其他所有可用的基本操作(参见Stream Javadoc),相反,让我们深入研究更复杂的操作collectflatMapreduce

本节中的大多数代码示例使用以下人员列表进行演示:

class Person {
    String name;
    int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() {
        return name;
    }
}

List<Person> persons =
    Arrays.asList(
        new Person("Max", 18),
        new Person("Peter", 23),
        new Person("Pamela", 23),
        new Person("David", 12));

Collect

Collect是一个非常有用的终端操作,将流的元素转变成其他不同的结果,例如一个ListSetMap。Collect方法接受CollectorCollector包含四种不同操作的操作:供应者(supplier)累加器(accumulator)组合器(combiner)修整器(finisher)。这听起来非常复杂,但好消息是Java 8通过Collectors类支持各种内置收集器。因此,对于最常见的操作,您不必自己实现收集器。

让我们从一个常见的用例开始:

List<Person> filtered =
    persons
        .stream()
        .filter(p -> p.name.startsWith("P"))
        .collect(Collectors.toList());

System.out.println(filtered);    // [Peter, Pamela]

正如你所看到的,从流的元素构造列表非常简单。需要一个集合而不是列表 - 只需使用Collectors.toSet()

下面示例按年龄对所有人进行分组:

Map<Integer, List<Person>> personsByAge = persons
    .stream()
    .collect(Collectors.groupingBy(p -> p.age));

personsByAge
    .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));

// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]

Collectors非常多才多艺。还可以在流的元素上进行聚合操作,例如,确定所有人的平均年龄:

Double averageAge = persons
    .stream()
    .collect(Collectors.averagingInt(p -> p.age));

System.out.println(averageAge);     // 19.0

如果您对更全面的统计信息感兴趣,汇总收集器将返回一个特殊的内置摘要统计信息对象。因此,我们可以简单地确定人的最小,最大和平均年龄,以及年龄总和,并统计参与计算的元素数量。

IntSummaryStatistics ageSummary =
    persons
        .stream()
        .collect(Collectors.summarizingInt(p -> p.age));

System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}

下一个示例将所有人连接成一个字符串:

String phrase = persons
    .stream()
    .filter(p -> p.age >= 18)
    .map(p -> p.name)
    .collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));

System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.

如上代码所示,联接收集器接受分隔符以及可选的前缀和后缀。

为了将流元素转换为映射,我们必须指定如何映射键和值。请记住,映射的键必须是唯一的,否则抛出IllegalStateException异常。当然您可以选择将合并函数作为附加参数以绕过异常:

Map<Integer, String> map = persons
    .stream()
    .collect(Collectors.toMap(
        p -> p.age,
        p -> p.name,
        (name1, name2) -> name1 + ";" + name2));

System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}

现在我们知道了一些最强大的内置收集器,让我们尝试构建我们自己的特殊收集器。我们希望将流的所有人转换为单个字符串,该字符串由|字符分隔的大写字母组成。为了实现这一目标,我们创建了一个新的收集器Collector.of()。我们必须通过收集器的四个要素:供应者累加器组合器修整器

Collector<Person, StringJoiner, String> personNameCollector =
    Collector.of(
        () -> new StringJoiner(" | "),          // supplier
        (j, p) -> j.add(p.name.toUpperCase()),  // accumulator
        (j1, j2) -> j1.merge(j2),               // combiner
        StringJoiner::toString);                // finisher

String names = persons
    .stream()
    .collect(personNameCollector);

System.out.println(names);  // MAX | PETER | PAMELA | DAVID

由于Java中的字符串是不可变的,我们需要一个帮助类StringJoiner,让收集器构造我们的字符串。供应者最初使用适当的分隔符构造这样的StringJoiner。累加器用于将每个人的大写名称添加到StringJoiner。组合器知道如何将两个StringJoiners合并为一个。在最后一步中,整理器从StringJoiner构造所需的String。

FlatMap

我们已经学会了如何利用该map操作将流的对象转换为另一种类型的对象。Map有点受限,因为每个对象只能映射到另一个对象。但是,如果我们想将一个对象转换为多个其他对象或根本不转换,该怎么办?flatMap为解决此问题而生。

FlatMap将流的每个元素转换为其他对象的流。因此,每个对象将被转换为由流支持的零个,一个或多个其他对象。然后将这些流的内容放入返回的flatMap操作流中。

在我们看到flatMap实际操作之前,我们需要一个适当的类型层

class Foo {
    String name;
    List<Bar> bars = new ArrayList<>();

    Foo(String name) {
        this.name = name;
    }
}

class Bar {
    String name;

    Bar(String name) {
        this.name = name;
    }
}

接下来,我们利用有关流的知识来实例化几个对象:

List<Foo> foos = new ArrayList<>();

// create foos
IntStream
    .range(1, 4)
    .forEach(i -> foos.add(new Foo("Foo" + i)));

// create bars
foos.forEach(f ->
    IntStream
        .range(1, 4)
        .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));

Now we have a list of three foos each consisting of three bars.

现在我们有一个列表,列表中有三个foo,每个foo由三个bars组成。

FlatMap接受一个必须返回对象流的函数。所以为了解决每个foo的bar对象,我们只需传递相应的函数:

foos.stream()
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3

如你所见,我们已成功将三个foo对象的流转换为九个bar对象的流。

最后,上面的代码示例可以简化为流操作的单个管道:

IntStream.range(1, 4)
    .mapToObj(i -> new Foo("Foo" + i))
    .peek(f -> IntStream.range(1, 4)
        .mapToObj(i -> new Bar("Bar" + i + " <- " f.name))
        .forEach(f.bars::add))
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

FlatMap也可用于Java 8中引入的类OptionalflatMap操作返回optional类型的对象。所以它可以用来防止讨厌的null检查。

想想这样一个高度分层的结构:

class Outer {
    Nested nested;
}

class Nested {
    Inner inner;
}

class Inner {
    String foo;
}

为了解析foo外部实例的内部字符串,您必须添加多个空值检查以防止可能的NullPointerExceptions:

Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
    System.out.println(outer.nested.inner.foo);
}

利用flatMap操作可以获得相同的行为:

Optional.of(new Outer())
    .flatMap(o -> Optional.ofNullable(o.nested))
    .flatMap(n -> Optional.ofNullable(n.inner))
    .flatMap(i -> Optional.ofNullable(i.foo))
    .ifPresent(System.out::println);

每次调用flatMap返回一个Optional包装所需对象(如果存在)或null(不存在)。

Reduce

reduce操作将流的所有元素组合成单个结果。

Java 8支持三种不同的reduce方法。

  • 第一种是将元素流简化为流的一个元素。让我们看看我们如何使用这种方法来确定年龄最大的人:
persons
    .stream()
    .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
    .ifPresent(System.out::println);    // Pamela

reduce方法接受BinaryOperator累加器函数。 这实际上是一个BiFunction,其中两个操作数共享相同的类型,在上面示例代码中共享的是Person类型。 BiFunction类似于Function但接受两个参数。 示例函数比较两个人的年龄,以便返回具有最大年龄的人。

  • 第二种reduce方法接受标识值和BinaryOperator累加器。此方法可用于构造一个新的Person,其中包含来自流中所有其他人的聚合名称和年龄:
Person result =
    persons
        .stream()
        .reduce(new Person("", 0), (p1, p2) -> {
            p1.age += p2.age;
            p1.name += p2.name;
            return p1;
        });

System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76
  • 第三种reduce方法接受三个参数:标识值,BiFunction累加器和BinaryOperator类型的组合函数。 由于标识值类型不限于Person类型,我们可以利用此reduce来确定所有人的年龄总和:
Integer ageSum = persons
    .stream()
    .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);

System.out.println(ageSum);  // 76

如你所见,结果是76,但是究竟发生了什么?让我们通过一些调试输出扩展上面的代码:

Integer ageSum = persons
    .stream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David

结果显示,累加器功能完成了所有工作。首先使用初始标识值0和第一个人Max调用累加器。在接下来的三个步骤sum中,总和不断增加最后一步人的年龄,直至总年龄为76岁。

等一下!组合器永远不会被调用?

接下来,并行执行相同的流将解除秘密:

Integer ageSum = persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35

并行执行此流会导致完全不同的执行行为。现在实际上调用了组合器。由于并行调用累加器,因此需要组合器来对单独的累加值求和。

让我们在下一章深入探讨并行流。

并行流

流可以并行执行,以增加大量输入元素的运行时性能。并行流ForkJoinPool通过静态方法ForkJoinPool.commonPool()使用公共可用的流。底层线程池最多使用五个线程 - 具体取决于可用物理CPU核心的数量:

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // 3

通过设置以下JVM参数可以减小或增加此值:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

集合支持parallelStream()方法来创建并行元素流。或者,您可以在给定流上调用中间方法parallel(),以将顺序流转换为并行流。

为了低估并行流的并行执行行为,下一个示例将当前线程的信息打印到控制台:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

根据这些调试的输出结果,我们应该更好地了解哪些线程实际用于执行流操作:

filter:  b1 [main]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  c2 [ForkJoinPool.commonPool-worker-3]
map:     c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map:     b1 [main]
forEach: B1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-3]
map:     a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]

如你所见,并行流利用公共中的所有可用线程ForkJoinPool来执行流操作。多次运行的结果可能不同,因为实际使用的线程是非确定性的。

让我们通过一个额外的sort操作扩展该示例:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> {
        System.out.format("sort: %s <> %s [%s]\n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

乍一看执行结果,看起来很奇怪:

filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  b1 [main]
map:     b1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-2]
map:     a1 [ForkJoinPool.commonPool-worker-2]
map:     c2 [ForkJoinPool.commonPool-worker-3]
sort:    A2 <> A1 [main]
sort:    B1 <> A2 [main]
sort:    C2 <> B1 [main]
sort:    C1 <> C2 [main]
sort:    C1 <> B1 [main]
sort:    C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]

似乎sort只在主线程上顺序执行。实际上,sort在并行流上使用新的Java 8方法Arrays.parallelSort()。其运行机制在Javadoc中对此进行了详细的阐述:

If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.(如果指定数组的长度小于最小粒度,则使用Arrays.sort方法对其进行排序。)

回到上一节reduce的例子。我们已经发现组合器函数只是在并行计算的时候被调用。让我们看看实际涉及哪些线程:

List<Person> persons = Arrays.asList(
    new Person("Max", 18),
    new Person("Peter", 23),
    new Person("Pamela", 23),
    new Person("David", 12));

persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                sum, p, Thread.currentThread().getName());
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                sum1, sum2, Thread.currentThread().getName());
            return sum1 + sum2;
        });

控制台输出结果显示累加器组合器函数在所有可用线程上并行执行:

accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max;    [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David;  [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter;  [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=18; sum2=23;     [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=23; sum2=12;     [ForkJoinPool.commonPool-worker-2]
combiner:    sum1=41; sum2=35;     [ForkJoinPool.commonPool-worker-2]

总之,可以说并行流可以为具有大量输入元素的流带来良好的性能提升。但请记住,某些并行流操作,比如reducecollect需要额外的计算(组合操作),这在顺序执行时是不需要的。

此外,我们了解到所有并行流操作共享VM内相同的J的ForkJoinPool。因此,尽量避免执行阻塞流的操作,因为这会减慢应用程序中其他依赖并行流的部分。

Happy coding!

  • 我的微信
  • 这是我的微信扫一扫
  • weinxin
  • 我的微信公众号
  • 我的微信公众号扫一扫
  • weinxin