Java Stream和Collection比较:何时以及如何从Java API返回Stream而不是集合Collection? - TomaszKiełbowicz

解道jdon 2021-05-04 17:27:04
java 比较 stream Collection


向您展示一些可以非常方便地使用Java Stream流的场景以及如何使用它们的示例。

本文基于标准Java库java.util.stream。它既与反应流无关,也与诸如Vavr之类的其他流实现无关。另外,我将不介绍诸如并行执行之类的流的高级细节。

首先,让我们简要讨论与集合相比独特的流功能。尽管存在一些相似之处,但差异是很大的,您不应将流仅视为库中的另一种集合。

根据java.util.stream 的文档,最重要的功能是:

  • 没有存储空间,可能是无限制的 -集合是现成的数据结构,而流表示产生数据的能力,通常在创建流时甚至不存在。由于不存储流中的数据,因此我们可以创建几乎不确定的流,或者可以更实际地对其重新措辞,我们可以让消费者决定要从流中读取多少个元素,从生产者的角度来看,它可能是不确定的(例如new Random().ints())。
  • 懒惰加载 —在定义流时暂停许多操作(例如过滤,映射),并且仅在使用者决定使用流中的数据时才执行
  • 本质上是实用的 -由于您已经具有使用流的经验,因此您可能会注意到处理流中的数据是为每个步骤(例如过滤器或映射)创建新流,而不是修改源数据
  • 消耗性 -您只能读取一次流,然后与可以多次读取的集合不同,它变为“消耗性”

现在让我们看看我们可以用流解决什么问题。

处理大量数据

假设,我们必须将数据从外部服务复制到我们的数据库中。要复制的数据量可以任意大。我们无法获取所有数据,无法将其存储在一个集合中,然后保存在数据库中,因为这可能会耗尽堆内存。我们必须分批处理数据,并设计外部服务客户端和数据库存储之间的接口。由于流不存储日期,因此可以使用它安全地处理所需的数据量。

在示例(及以下所有示例)中,我们将使用java.util.stream.Stream接口的静态方法来构建流。用Java构建流的最强大,最灵活的方法是实现Spliterator接口,然后使用StreamSupport类将其包装为流。但是,正如我们所看到的,Stream在许多情况下,接口中的静态工厂方法就足够了。

假定一个简单的API从支持分页的外部服务(例如,REST服务,数据库)中获取数据。该API最多可limit从提取项目offset。迭代地使用API​​,我们可以根据需要获取尽可能多的数据。

interface ExternalService {
 List<String> fetch(int offset, int limit);
}

现在,我们可以使用API​​提供数据流,并将API的使用者与分页API隔离开:

class Service<T> {
  private final ExternalService<T> externalService;
  
  public Stream<T> stream(int size, int batchSize) {
    var cursor = new Cursor();
    return Stream
      .generate(() -> next(cursor, size, batchSize))
      .takeWhile(not(List::isEmpty))
      .flatMap(List::stream);
  }
  private List<T> next(Cursor cursor, int size, int batchSize) {
    var fetchSize = Math.min(size - cursor.offset, batchSize);
    var result = externalService.fetch(cursor.offset, fetchSize);
    cursor.inc(result.size());
    return result;
  }
}

Cursor 握有当前偏移量offset:

private static class Cursor {
  private int offset;
   
  void inc(int by) {
    offset += by;
  }
}

我们使用Stream.generate()方法构建无限流,其中每个元素由流提供者创建。流元素是从REST API获取的页面List<T>。将为每个流创建Cursor类的实例,以跟踪获取的元素的进度。

Stream.takeWhile()方法用于检测的最后一页,最后返回的数据流T,而不是List<T>。

我们使用flatMap扁平化流。尽管在某些情况下,保留批处理(例如将整个页面保存在一个事务中)可能很有用。

现在,我们可以使用Service.stream(size, batchSize)来检索任意长流,而无需任何分页API的知识(我们决定公开batchSize参数,但这是一个设计决策)。在任何时间点,内存消耗都受到批处理大小的限制。使用者可以一一处理数据,将其保存在数据库中,或者再次进行批处理(批处理大小可能不同)。

快速访问(不完整)数据

假设我们有一个耗时的操作,必须对数据的每个元素执行该操作,并且计算要花费时间t。对于n元素,使用者必须等待t * n才能接收到计算结果。例如,如果用户正在等待带有计算结果的表,则可能是一个问题。我们希望在显示第一结果时立即显示它们,而不是等待所有结果的计算并立即提交表。

public class Producer1 {
  private Stream<String> buildStream() {
    return Stream.of("a", "b", "c");
  }
  
  private String expensiveStringDoubler(String input) {
    return input + input;
  }
  public Stream<String> stream() {
    return buildStream().map(this::expensiveComputation);
  }
}

消费者:

stream().forEach(System.out::println)

输出:

Processing of: a
aa
Processing of: b
…

输出:

Processing of: a
aa
Processing of: b
…

如我们所见,在开始处理下一个元素之前,用户可以使用第一个元素“ aa ”的处理结果,但是计算仍然是流的生产者责任。换句话说,消费者决定何时以及是否应该执行计算,但是生产者仍然负责如何执行计算。

您可能会认为这很容易,并且不需要流。当然,您是对的,让我们看一下:

public class Producer1Classic {
  public List<String> data() {
    return List.of("a", "b", "c", "d", "e", "f");
  }
  
  public String expensiveStringDoubler(String input) {
    return input + input;
  }
}

消费者:

var producer = new Producer1Classic();
for (String element : producer.data()) {
  System.out.println(producer.expensiveComputation(element));
}

同样的效果,但是实际上我们已经重新发明了轮子,我们的实现模仿了stream的祖先- Iterator并且我们失去了stream的API的优势。

避免过早计算

再次假设我们要对每个流元素执行耗时的操作。在某些情况下,API的使用者无法提前说出需要多少数据。例如:

  • 用户取消了数据加载
  • 在数据处理过程中发生错误,无需处理其余数据
  • 消费者读取数据直到满足条件,例如第一个正值

由于流的惰性,在这种情况下可以避免一些计算。

private Stream<Double> buildStream() {
  return new Random().doubles().boxed();
}
private Double expensiveComputation(Double input) {
  return input / 2;
}
public Stream<Double> stream() {
  return buildStream().map(this::expensiveComputation);
}

消费者:

stream().peek(System.out::println).filter(value -> value > 0.4).findFirst();

在该示例中,使用者读取数据,直到该值大于0.4。生产者并不了解消费者的这种逻辑,但它只计算必要的项目。逻辑(例如条件)可以在用户端独立更改。

API易于使用

使用流而不是自定义API设计还有另一个原因。流是标准库的一部分,并为许多开发人员所熟知。在我们的API中使用流使其他开发人员更容易使用该API。

其他注意事项

错误处理

传统的错误处理不适用于Streams。由于实际处理将推迟到需要时进行,因此构造流时不会引发异常。基本上,我们有两个选择:

  • 引发RuntimeException-终止方法(例如forEach)将引发异常
  • 将元素包装到一个对象中,该对象表示正在处理的元素的当前状态,例如TryVavr库中的特殊类(博客中的详细信息)

资源管理

有时我们必须使用一种资源来提供流数据(例如,外部服务中的会话),并且我们想在流处理完成时将其释放。幸运的是,流实现了Autoclosable接口,我们可以在try-with-resources语句中使用流,从​​而使资源管理变得非常容易。我们要做的就是使用onClose方法在流中注册一个钩子。当流关闭时,该挂钩将自动被调用。

private Stream<Double> buildStream() {
  return new Random().doubles().boxed();
}
private Double expensiveComputation(Double input) {
  if (input > 0.8) throw new RuntimeException("Data processing exception");
  return input / 2;
}
public Stream<Double> stream() {
  return buildStream().map(this::expensiveComputation).onClose(()-> System.out.println("Releasing resources…"));
}

消费者:

try (Stream<Double> stream = stream()){
  stream.forEach(System.out::println);
}

输出:

0.2264004802916616
0.32777949557515484
Releasing resources…
Exception in thread “main” java.lang.RuntimeException: Data processing exception

在该示例中,当发生数据处理异常时,流将通过try-with-resources语句自动关闭,并调用已注册的处理程序。在示例输出中,我们可以看到Releasing resources…处理程序打印的消息。

总结

  1. 流不是集合。
  2. 流可以帮助我们解决以下问题:*处理大量数据*快速访问(不完整的)数据*避免过早计算
  3. 构建流并不难。
  4. 我们必须注意错误处理。
  5. 支持资源管理。

版权声明
本文为[解道jdon]所创,转载请带上原文链接,感谢
https://www.jdon.com/53763

  1. Compare node.js with spring boot- Ryan Gleason
  2. Obvious pitfalls of spring Webflux- Ł ukaszKy ć
  3. Spring founder uncle rod's real thoughts on yaml
  4. 码农飞升记-02-OracleJDK是什么?OracleJDK的版本怎么选择?
  5. What is manong feisheng-02-oracle JDK? How to choose the version of Oracle JDK?
  6. Spring tide surging Xinanjiang
  7. Linux内核软中断
  8. Linux kernel soft interrupt
  9. Linux内核软中断
  10. Linux kernel soft interrupt
  11. Java multithreading Foundation
  12. The construction of Maven private library nexus
  13. I / O stream in Java
  14. JDK 16:Java 16的新功能 - InfoWorld
  15. 在Java中本地进行线程间数据传输的三种方式和源码展示
  16. jdon导致cpu 99%最后tomcat死掉---banq给予回复
  17. 用领域事件模拟AOP注入
  18. JDK 16: new function of Java 16 - InfoWorld
  19. Cartoon: from JVM lock to redis distributed lock
  20. Spring 3.1 终于加入了Cache支持
  21. Prototype与JQuery对比
  22. Three ways of data transmission between threads in Java and source code display
  23. Jdon causes 99% of CPU and Tomcat dies -- banq replies
  24. docker 原理之 user namespace(下)
  25. Simulating AOP injection with domain events
  26. Spring 3.1 finally adds cache support
  27. Comparison between prototype and jquery
  28. User namespace of docker principle (2)
  29. The way to learn java IO stream and XML
  30. Why does a seemingly correct code cause the Dubbo thread pool to be full
  31. 0 基础 Java 自学之路(2021年最新版)
  32. 0 basic Java self study road (latest version in 2021)
  33. c#—基础拾遗(1) 面向对象
  34. C - basic information (1) object oriented
  35. 技术分享|SQL和 NoSQL数据库之间的差异:MySQL(VS)MongoDB
  36. Technology sharing differences between SQL and NoSQL databases: MySQL (VS) mongodb
  37. PHP教程/面向对象-3~构造函数和析构函数
  38. Spring Cloud的Feign客户端入门
  39. 优化Spring Boot应用的Docker打包速度
  40. PHP tutorial / object oriented - 3 ~ constructor and destructor
  41. Introduction to feign client of spring cloud
  42. Optimizing docker packaging speed of spring boot application
  43. 尚硅谷宋红康Java基础教程2019版
  44. 尚硅谷宋红康Java基础教程2019版
  45. Song Hongkang Java foundation course 2019
  46. Song Hongkang Java foundation course 2019
  47. Redis 6 的多线程
  48. Multithreading of redis 6
  49. SpringCloud-微服务架构编码构建
  50. SpringCloud-微服务架构编码构建
  51. Linux作业控制
  52. Coding construction of springcloud microservice architecture
  53. Java中几个常用并发队列比较 | Baeldung
  54. 为什么Java后端在创业企业中并不流行? -reddit
  55. Coding construction of springcloud microservice architecture
  56. Linux job control
  57. Comparison of several common concurrent queues in Java
  58. Why is java backend not popular in start-ups- reddit
  59. docker 资源限制之 cgroup
  60. 大数据环境: hadoop和jdk部署