查看原文
其他

用 Java 实现 Stream 高效混排与 Spliterator

ImportNew ImportNew 2019-10-04

(给ImportNew加星标,提高Java技能)


编译: 唐尤华 

链接: dzone.com/articles/a-case-study-of-implementing-an-efficient-shufflin>


对 Stream 执行排序操作只要调用排序 API 就好了,要实现相反的效果(混排)却并不简单。


本文介绍了如何使用 Java Stream `Collectors` 工厂方法与自定义 `Spliterator` 对 Stream 进行 Shuffle(混排),支持 Eager 与 Lazy 两种模式。


1. Eager Shuffle Collector


Heinz [在这篇文章][1]中给出了一种解决方案:将整个 Stream 转换为 list,对 list 执行 `Collections#shuffle`,再转为 Stream。像下面这样封装成一个复合操作:


[1]:https://www.javaspecialists.eu/archive/Issue258.html


```java
public static <T> Collector<T, ?, Stream<T>> toEagerShuffledStream() {
return Collectors.collectingAndThen(
toList(),
list -> {
Collections.shuffle(list);
return list.stream();
});
}
```


这种方法适用于对 Steam 中所有元素进行混排。由于会提前对集合中所有元素进行 Shuffle,如果只处理其中一部分则效果不佳,极端情况比如 Stream 只包含1个元素。


让我们来看看一个简单基准测试的运行结果:


```java
@State(Scope.Benchmark)
public class RandomSpliteratorBenchmark {
private List<String> source;

@Param({"1", "10", "100", "1000", "10000", "10000"})
public int limit;

@Param({"100000"})
public int size;

@Setup(Level.Iteration)
public void setUp() {
source = IntStream.range(0, size)
.boxed()
.map(Object::toString)
.collect(Collectors.toList());
}

@Benchmark
public List<String> eager() {
return source.stream()
.collect(toEagerShuffledStream())
.limit(limit)
.collect(Collectors.toList());
}
```


```shell
(limit) Mode Cnt Score Error Units
eager 1 thrpt 5 467.796 ± 9.074 ops/s
eager 10 thrpt 5 467.694 ± 17.166 ops/s
eager 100 thrpt 5 459.765 ± 8.048 ops/s
eager 1000 thrpt 5 467.934 ± 43.095 ops/s
eager 10000 thrpt 5 449.471 ± 5.549 ops/s
eager 100000 thrpt 5 331.111 ± 5.626 ops/s
```


从上面的数据可以看出,尽管运行结果 Stream 中元素不断增加,运行效果还是相当不错。因此,对整个集合提前混排太浪费了,尤其是元素较少的时候得分很差。


让我们看看来有什么好办法。


2. Lazy Shuffle Collector


为了节省 CPU 资源,与其对集合中所有元素预处理,不如根据需要只处理其中一部分。


为了达到这个效果,需要自定义一个 Spliterator 对所有对元素随机遍历,然后通过 `StreamSupport.stream` 构造创建一个 Stream 对象:


```java
public class RandomSpliterator<T> implements Spliterator<T> {
// ...
public static <T> Collector<T, ?, Stream<T>> toLazyShuffledStream() {
return Collectors.collectingAndThen(
toList(),
list -> StreamSupport.stream(
new ShuffledSpliterator<>(list), false));
}
}
```


3. 实现细节


即使只取出一个随机元素,也不能避免计算整个 Steam 中的元素(这意味着不支持无限序列)。因此,可以用 `List<T>` 初始化 `RandomSpliterator<T>`。“注意,这里有一个陷阱”。


如果给定 `List` 不支持在常量时间内完成随机访问,这种方案要比 Eager 方案慢得多。为了避免这种情况,可以在实例化 `Spliterator` 的时候进行简单检查:


```java
private RandomSpliterator(
List<T> source, Supplier<? extends Random> random)
{
if (source.isEmpty()) { ... } // throw
this.source = source instanceof RandomAccess
? source
: new ArrayList<>(source);
this.random = random.get();
}
```


相比随机访问时间复杂度不是 O(1) 的实现,创建 `ArrayList` 的成本可以忽略不计。


现在重写最重要的 `tryAdvance()` 方法。实现很简单,每次迭代都从 `source` 集合中随机挑选并删除一个元素。


不必担心 `source` 发生改变。这里不发布 `RandomSpliterator`,只返回基于它的一个 `Collector`:


```java
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
int remaining = source.size();
if (remaining > 0 ) {
action.accept(source.remove(random.nextInt(remaining)));
return true;
} else {
return false;
}
}
```


除此之外,还需要实现其它3个方法:


```java
@Override
public Spliterator<T> trySplit() {
return null; // 表示 split 可不行
}

@Override
public long estimateSize()
{
return source.size();
}

@Override
public int characteristics()
{
return SIZED;
}
```


现在检查一下是否有效果:


```java
IntStream.range(0, 10).boxed()
.collect(toLazyShuffledStream())
.forEach(System.out::println);
```


结果如下:


```shell
3
4
8
1
7
6
5
0
2
9
```


4. 性能考虑


在这个实现中,我们把大小为 N 的数组换成 M 查找或删除:


  • N:集合大小

  • M:挑选元素的数量


从 `ArrayList` 中查找或删除单个元素通常比交换开销大,因此方案的可扩展性不够好。但是对于 M 值较小的时候性能会好很多。


现在对比 Eager 方案(都包含100000个对象):


```shell
(limit) Mode Cnt Score Error Units
eager 1 thrpt 5 467.796 ± 9.074 ops/s
eager 10 thrpt 5 467.694 ± 17.166 ops/s
eager 100 thrpt 5 459.765 ± 8.048 ops/s
eager 1000 thrpt 5 467.934 ± 43.095 ops/s
eager 10000 thrpt 5 449.471 ± 5.549 ops/s
eager 100000 thrpt 5 331.111 ± 5.626 ops/s
lazy 1 thrpt 5 1530.763 ± 72.096 ops/s
lazy 10 thrpt 5 1462.305 ± 23.860 ops/s
lazy 100 thrpt 5 823.212 ± 119.771 ops/s
lazy 1000 thrpt 5 166.786 ± 16.306 ops/s
lazy 10000 thrpt 5 19.475 ± 4.052 ops/s
lazy 100000 thrpt 5 4.097 ± 0.416 ops/s
```


(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart.png)


可以明显看到,如果数据流元素较少,新方案的性能优于前者。但随着“处理数量/集合大小”增加,吞吐量急剧下降。


这是因为从 `ArrayList` 中移除元素会带来额外开销,每次移除都会调用 `System#arraycopy` 对内部数组执行移位操作,开销较大。


对于较大的集合(1000000个元素)可以看到类似的模式:


```shell
(limit) (size) Mode Cnt Score Err Units
eager 1 10000000 thrpt 5 0.915 ops/s
eager 10 10000000 thrpt 5 0.783 ops/s
eager 100 10000000 thrpt 5 0.965 ops/s
eager 1000 10000000 thrpt 5 0.936 ops/s
eager 10000 10000000 thrpt 5 0.860 ops/s
lazy 1 10000000 thrpt 5 4.338 ops/s
lazy 10 10000000 thrpt 5 3.149 ops/s
lazy 100 10000000 thrpt 5 2.060 ops/s
lazy 1000 10000000 thrpt 5 0.370 ops/s
lazy 10000 10000000 thrpt 5 0.05 ops/s
```


(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-2.png)


在更小集合(128个元素)上的表现:


```shell
(limit) (size) Mode Cnt Score Error Units
eager 2 128 thrpt 5 246439.459 ops/s
eager 4 128 thrpt 5 333866.936 ops/s
eager 8 128 thrpt 5 340296.188 ops/s
eager 16 128 thrpt 5 345533.673 ops/s
eager 32 128 thrpt 5 231725.156 ops/s
eager 64 128 thrpt 5 314324.265 ops/s
eager 128 128 thrpt 5 270451.992 ops/s
lazy 2 128 thrpt 5 765989.718 ops/s
lazy 4 128 thrpt 5 659421.041 ops/s
lazy 8 128 thrpt 5 652685.515 ops/s
lazy 16 128 thrpt 5 470346.570 ops/s
lazy 32 128 thrpt 5 324174.691 ops/s
lazy 64 128 thrpt 5 186472.090 ops/s
lazy 128 128 thrpt 5 108105.699 ops/s
```


(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-3.png)


能不能进一步优化?


5. 进一步提高性能


不幸的是,现有的解决方案扩展性不尽如人意,让我们试着改进。但在此之前,先对现有操作进行测评:


(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/Screenshot-2019-01-03-at-16.36.58.png)


不出意外,`Arraylist#remove` 是开销最大的操作之一。换句话说,从 `ArrayList` 中删除元素耗费了大量 CPU 资源。


为什么呢?从 `ArrayList` 中删除元素会对底层实现的数组执行移除操作。问题是,Java 数组不会自动调整大小,每次移除都会创建一个更小的新数组:


```java
private void fastRemove(Object[] es, int i)
{
modCount++;
final int newSize;
if ((newSize = size - 1) > i)
System.arraycopy(es, i + 1, es, i, newSize - i);
es[size = newSize] = null;
}
```


接下来该怎么办?避免从 `ArrayList` 中移除元素。


为了达到这个效果,可以用一个数组存储剩余的元素并记录它的大小:


```java
public class ImprovedRandomSpliterator<T> implements Spliterator<T> {
private final Random random;
private final T[] source;
private int size;
private ImprovedRandomSpliterator(
List<T> source, Supplier<? extends Random> random)
{
if (source.isEmpty()) {
throw new IllegalArgumentException(...);
}
this.source = (T[]) source.toArray();
this.random = random.get();
this.size = this.source.length;
}
}
```


幸运的是,由于 `Spliterator` 的实例不会在线程之间共享,因此不会遇到并发问题。


现在尝试移除元素时,实际上不需要创建缩小后的新数组。相反,只要减小 `size` 并忽略数组的其余部分即可。


在此之前,把最后一个元素与返回的元素交换:


```java
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
if (size > 0) {
int nextIdx = random.nextInt(size);
int lastIdx = size - 1;
action.accept(source[nextIdx]);
source[nextIdx] = source[lastIdx];
source[lastIdx] = null; // let object be GCed
size--;
return true;
} else {
return false;
}
}
```


对改进后的方案进行评测,可以看到开销最大的调用已经消失了:


(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/Screenshot-2019-01-03-at-16.38.47.png)


准备在此运行基准测试进行比较:


```shell
(limit) (size) Mode Cnt Score Error Units
eager 1 100000 thrpt 3 456.811 ± 20.585 ops/s
eager 10 100000 thrpt 3 469.635 ± 23.281 ops/s
eager 100 100000 thrpt 3 466.486 ± 68.820 ops/s
eager 1000 100000 thrpt 3 454.459 ± 13.103 ops/s
eager 10000 100000 thrpt 3 443.640 ± 96.929 ops/s
eager 100000 100000 thrpt 3 335.134 ± 21.944 ops/s
lazy 1 100000 thrpt 3 1587.536 ± 389.128 ops/s
lazy 10 100000 thrpt 3 1452.855 ± 406.879 ops/s
lazy 100 100000 thrpt 3 814.978 ± 242.077 ops/s
lazy 1000 100000 thrpt 3 167.825 ± 129.559 ops/s
lazy 10000 100000 thrpt 3 19.782 ± 8.596 ops/s
lazy 100000 100000 thrpt 3 3.970 ± 0.408 ops/s
lazy_improved 1 100000 thrpt 3 1509.264 ± 170.423 ops/s
lazy_improved 10 100000 thrpt 3 1512.150 ± 143.927 ops/s
lazy_improved 100 100000 thrpt 3 1463.093 ± 593.370 ops/s
lazy_improved 1000 100000 thrpt 3 1451.007 ± 58.948 ops/s
lazy_improved 10000 100000 thrpt 3 1148.581 ± 232.218 ops/s
lazy_improved 100000 100000 thrpt 3 383.022 ± 97.082 ops/s
```


(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-5.png)


从上面的结果可以看出,改进后的方案性能受元素数量变化影响显著减小。


实际上,即使遇到最差情况,改进方案的性能也比基于 `Collections#shuffle` 的方案略好一些。


6. 完整示例


完整示例可以在 [GitHub][2] 上找到。


[2]:https://github.com/pivovarit/articles/tree/master/java-random-stream


```java
package com.pivovarit.stream;
import java.util.List;
import java.util.Random;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class ImprovedRandomSpliterator<T> implements Spliterator<T> {
private final Random random;
private final T[] source;
private int size;
ImprovedRandomSpliterator(List<T> source, Supplier<? extends Random> random) {
if (source.isEmpty()) {
throw new IllegalArgumentException("RandomSpliterator can't be initialized with an empty collection");
}
this.source = (T[]) source.toArray();
this.random = random.get();
this.size = this.source.length;
}
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
if (size > 0) {
int nextIdx = random.nextInt(size);
int lastIdx = size - 1;
action.accept(source[nextIdx]);
source[nextIdx] = source[lastIdx];
source[lastIdx] = null; // let object be GCed
size--;
return true;
} else {
return false;
}
}
@Override
public Spliterator<T> trySplit() {
return null;
}
@Override
public long estimateSize()
{
return source.length;
}
@Override
public int characteristics()
{
return SIZED;
}
}
```


```java
package com.pivovarit.stream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static java.util.stream.Collectors.toCollection;
public final class RandomCollectors {

private RandomCollectors() {
}

public static <T> Collector<T, ?, Stream<T>> toImprovedLazyShuffledStream() {
return Collectors.collectingAndThen(
toCollection(ArrayList::new),
list -> !list.isEmpty()
? StreamSupport.stream(new ImprovedRandomSpliterator<>(list, Random::new), false)
: Stream.empty());
}

public static <T> Collector<T, ?, Stream<T>> toLazyShuffledStream() {
return Collectors.collectingAndThen(
toCollection(ArrayList::new),
list -> !list.isEmpty()
? StreamSupport.stream(new RandomSpliterator<>(list, Random::new), false)
: Stream.empty());
}

public static <T> Collector<T, ?, Stream<T>> toEagerShuffledStream() {
return Collectors.collectingAndThen(
toCollection(ArrayList::new),
list -> {
Collections.shuffle(list);
return list.stream();
});
}
}
```


推荐阅读

(点击标题可跳转阅读)

深入理解 Java Stream 流水线

Java Stream API 入门篇

Java Stream API 进阶篇


看完本文有收获?请转发分享给更多人

关注「ImportNew」,提升Java技能

喜欢就点一下「好看」呗~

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存