java-stream

Stream 作为 Java 8 的一大亮点,它与 java.io 包里的 InputStream 和 OutputStream 是完全不同的概念。它也不同于 StAX 对 XML 解析的 Stream,也不是 Amazon Kinesis 对大数据实时处理的 Stream。Java 8 中的 Stream 是对集合(Collection)对象功能的增强,它专注于对集合对象进行各种非常便利、高效的聚合操作(aggregate operation),或者大批量数据操作 (bulk data operation)。Stream API 借助于同样新出现的 Lambda 表达式,极大的提高编程效率和程序可读性。同时它提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势,使用 fork/join 并行方式来拆分任务和加速处理过程。通常编写并行代码很难而且容易出错, 但使用 Stream API 无需编写一行多线程的代码,就可以很方便地写出高性能的并发程序。所以说,Java 8 中首次出现的 java.util.stream 是一个函数式语言+多核时代综合影响的产物。

什么是流
Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的 Iterator。原始版本的 Iterator,用户只能显式地一个一个遍历元素并对其执行某些操作;高级版本的 Stream,用户只要给出需要对其包含的元素执行什么操作,比如 “过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等,Stream 会隐式地在内部进行遍历,做出相应的数据转换。

Stream 就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。

而和迭代器又不同的是,Stream 可以并行化操作,迭代器只能命令式地、串行化操作。顾名思义,当使用串行方式去遍历时,每个 item 读完后再读下一个 item。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。Stream 的并行操作依赖于 Java7 中引入的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。Java 的并行 API 演变历程基本如下:

1.0-1.4 中的 java.lang.Thread
5.0 中的 java.util.concurrent
6.0 中的 Phasers 等
7.0 中的 Fork/Join 框架
8.0 中的 Lambda
Stream 的另外一大特点是,数据源本身可以是无限的。

流的构成
当我们使用一个流的时候,通常包括三个基本步骤:

获取一个数据源(source)→ 数据转换→执行操作获取想要的结果,每次转换原有 Stream 对象不改变,返回一个新的 Stream 对象(可以有多次转换),这就允许对其操作可以像链条一样排列,变成一个管道。

有多种方式生成 Stream Source:

从 Collection 和数组
Collection.stream()
Collection.parallelStream()
Arrays.stream(T array) or Stream.of()
从 BufferedReader
java.io.BufferedReader.lines()
静态工厂
java.util.stream.IntStream.range()
java.nio.file.Files.walk()
自己构建
java.util.Spliterator
其它
Random.ints()
BitSet.stream()
Pattern.splitAsStream(java.lang.CharSequence)
JarFile.stream()

流的操作类型分为两种:

Intermediate:一个流可以后面跟随零个或多个 intermediate 操作。其目的主要是打开流,做出某种程度的数据映射/过滤,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),就是说,仅仅调用到这类方法,并没有真正开始流的遍历。
Terminal:一个流只能有一个 terminal 操作,当这个操作执行后,流就被使用“光”了,无法再被操作。所以这必定是流的最后一个操作。Terminal 操作的执行,才会真正开始流的遍历,并且会生成一个结果,或者一个 side effect。
在对于一个 Stream 进行多次转换操作 (Intermediate 操作),每次都对 Stream 的每个元素进行转换,而且是执行多次,这样时间复杂度就是 N(转换次数)个 for 循环里把所有操作都做掉的总和吗?其实不是这样的,转换操作都是 lazy 的,多个转换操作只会在 Terminal 操作的时候融合起来,一次循环完成。我们可以这样简单的理解,Stream 里有个操作函数的集合,每次转换操作就是把转换函数放入这个集合中,在 Terminal 操作的时候循环 Stream 对应的集合,然后对每个元素执行所有的函数。

还有一种操作被称为 short-circuiting。用以指:

对于一个 intermediate 操作,如果它接受的是一个无限大(infinite/unbounded)的 Stream,但返回一个有限的新 Stream。
对于一个 terminal 操作,如果它接受的是一个无限大的 Stream,但能在有限的时间计算出结果。
当操作一个无限大的 Stream,而又希望在有限时间内完成操作,则在管道内拥有一个 short-circuiting 操作是必要非充分条件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531

import lombok.Data;
import org.junit.Test;

import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

/**
* Stream 方法
*
* <pre>
*
* 巧用Java8中的Stream,让集合操作飞起来! https://mp.weixin.qq.com/s/fx8XvRjcevXJMRzrWeHx-A
* java8的Stream对集合操作飞起来 https://juejin.im/post/5d5e2616f265da03b638b28a
* Java 8 Stream https://www.runoob.com/java/java8-streams.html
* Java 8 中的 Streams API 详解 https://www.ibm.com/developerworks/cn/java/j-lo-java8streamapi/index.html
*
* Stream作为java8的新特性,基于lambda表达式,是对集合对象功能的增强,它专注于对集合对象进行各种高效、便利的聚合操作或者大批量的数据操作,提高了编程效率和代码可读性。
* Stream的原理:将要处理的元素看做一种流,流在管道中传输,并且可以在管道的节点上处理,包括过滤筛选、去重、排序、聚合等。元素流在管道中经过中间操作的处理,最后由最终操作得到前面处理的结果。
*
* 集合有两种方式生成流:
* stream() − 为集合创建串行流
* parallelStream() - 为集合创建并行流
*
* 中间操作主要有以下方法(此类型方法返回的都是Stream):map (mapToInt, flatMap 等)、 filter、 distinct、 sorted、 peek、 limit、 skip、 parallel、 sequential、 unordered
* 终止操作主要有以下方法:forEach、 forEachOrdered、 toArray、 reduce、 collect、 min、 max、 count、 anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 iterator
*
* </pre>
*
* @author: weikeqin@gmail.com
**/
public class StreamSample {

@Test
public void getStream() {

// 1. Individual values
Stream stream = Stream.of("a", "b", "c");

// 2. Arrays
String[] strArray = new String[]{"a", "b", "c"};
stream = Stream.of(strArray);
stream = Arrays.stream(strArray);

// 3. Collections
List<String> list = Arrays.asList(strArray);
stream = list.stream();

stream.forEach(System.out::println);


// 数值流的构造
IntStream.of(new int[]{1, 2, 3}).forEach(System.out::println);
IntStream.range(1, 3).forEach(System.out::println);
IntStream.rangeClosed(1, 3).forEach(System.out::println);

}


/**
*
*/
@Test
public void streamExample() {
List<Student> students = getList();
List<Long> res = students.parallelStream()
.filter(x -> "北京".equals(x.getAddress()))
.sorted(Comparator.comparing(Student::getAge).reversed())
.map(Student::getId)
.collect(Collectors.toList());
res.forEach(System.out::println);
}

/**
* 把所有的单词转换为大写
*/
@Test
public void strToUpperCase() {
List<String> strs = getStringList();
strs.stream().
map(String::toUpperCase).
collect(Collectors.toList()).
forEach(System.out::println);
}


/**
* 平方数
*/
@Test
public void cal() {
List<Integer> nums = Arrays.asList(1, 2, 3, 4);
nums.stream().
map(n -> n * n).
collect(Collectors.toList()).
forEach(System.out::println);
}


/**
* 一对多
*/
@Test
public void listTest() {
Stream<List<Integer>> inputStream = Stream.of(
Arrays.asList(1),
Arrays.asList(2, 3),
Arrays.asList(4, 5, 6)
);
inputStream.
flatMap((childList) -> childList.stream()).
forEach(System.out::println);
}

/**
* 筛选
*/
@Test
public void filterTest() {

List<Student> students = getList();

// 筛选地址是浙江的
List<Student> streamStudents = students.stream().filter(s -> "浙江".equals(s.getAddress())).collect(Collectors.toList());
streamStudents.forEach(System.out::println);

System.out.println("----------------");
// 筛选年龄大于15的
List<Student> students2 = students.stream().filter(s -> s.getAge() > 15).collect(Collectors.toList());
students2.forEach(x -> {
System.out.println(x);
});

System.out.println("----------------");
List<Student> students3 = students.stream().filter(s -> s.getAge() == 18).collect(Collectors.toList());
students3.forEach(System.out::println);
}


/**
* peek 对每个元素执行操作并返回一个新的 Stream
*/
@Test
public void test1() {
Stream.of("one", "two", "three", "four")
.filter(e -> e.length() > 3)
.peek(e -> System.out.println("Filtered value: " + e))
.map(String::toUpperCase)
.peek(e -> System.out.println("Mapped value: " + e))
.collect(Collectors.toList())
.forEach(System.out::println);
}

/**
* Optional 的两个用例
*/
@Test
public void optionalUse() {

String text = "abcd";
//text = null;

Optional.ofNullable(text).ifPresent(System.out::println);

int i = Optional.ofNullable(text).map(String::length).orElse(-1);
System.out.println("text 长度:" + i);
}


/**
* reduce 的用例
*/
@Test
public void reduceUse() {
// 字符串连接,concat = "ABCD"
String concat = Stream.of("A", "B", "C", "D").reduce("_", String::concat);
System.out.println(concat);

// 求最小值,minValue = -3.0
double minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.MAX_VALUE, Double::min);
System.out.println("最小值: " + minValue);

// 求和,sumValue = 10, 有起始值
int sumValue = Stream.of(1, 2, 3, 4).reduce(0, Integer::sum);
System.out.println("和: " + sumValue);

// 求和,sumValue = 10, 无起始值
sumValue = Stream.of(1, 2, 3, 4).reduce(Integer::sum).get();
System.out.println("和: " + sumValue);

// 过滤,字符串连接,concat = "ace"
concat = Stream.of("a", "B", "c", "D", "e", "F").
filter(x -> x.compareTo("Z") > 0).
reduce("", String::concat);
System.out.println("过滤后的字符串: " + concat);
}

/**
* 获取数据
*
* @return
*/
public List<Student> getList() {
Student s1 = new Student(1L, "张三", 13, "浙江");
Student s2 = new Student(2L, "李四", 31, "湖北");
Student s3 = new Student(3L, "王五", 17, "北京");
Student s4 = new Student(4L, "赵六", 18, "山西");
Student s5 = new Student(5L, "田七", 18, "北京");

List<Student> students = new ArrayList<>();
students.add(s1);
students.add(s2);
students.add(s3);
students.add(s4);
students.add(s5);

return students;
}

/**
* @return
*/
public List<Student> getList2() {
List<Student> students = getList();
Student s6 = new Student();
Student s7 = null;
students.add(s6);
students.add(s7);
return students;
}

/**
* @return
*/
public List<String> getStringList() {
List<String> list = Arrays.asList("a", "aa", "b", "c", "A", "AA", "B", "C", "1", "11", "2", "中", "中国", "众", "众人", "终", "终于", "#", " ", " a", " A", " 中");
System.out.println("list size:" + list.size());
return list;
}

/**
* list转map
*/
@Test
public void listToMap() {

List<Student> list = getList();

// 方法一
// Map<id, Student>
Map<Long, Student> map = list.stream().collect(Collectors.toMap(Student::getId, Function.identity()));
System.out.println(map);

// 方法二
// Map<id, Student>
Map<Long, Student> map2 = list.stream().collect(Collectors.toMap(s -> s.getId(), v -> v));
System.out.println(map2);

// 方法三
// Duplicated Key (List转Map 重复key问题)
Map<Long, Student> map3 = list.stream().collect(Collectors.toMap(s -> s.getId(), v -> v, (oldValue, newValue) -> oldValue));
System.out.println(map3);

// 方法四
// Map<id, name>
Map<Long, String> idNameMap = list.stream().collect(Collectors.toMap(Student::getId, Student::getName));
System.out.println(idNameMap);

}


/**
* 集合处理
* <p>
* list 转 list
* list 转 set
*/
@Test
public void listDeal() {
List<Student> list = getList();
List<String> nameList = list.stream().map(x -> "姓名:" + x.getName()).collect(Collectors.toList());
nameList.forEach(System.out::println);

Set<String> nameSet = list.stream().map(x -> "姓名:" + x.getName()).collect(Collectors.toSet());
nameSet.forEach(System.out::println);
}


/**
* 字符串排序
*/
@Test
public void strSort() {
List<String> list = getStringList();
list.stream().sorted().forEach(System.out::println);
}

/**
* 对象排序
*/
@Test
public void listSort() {
List<Student> list = getList();
Collections.shuffle(list);
list.forEach(System.out::println);
System.out.println("------------------");

Map<Long, Student> map = list.stream().sorted(Comparator.comparingLong(Student::getId).reversed()).collect(Collectors.toMap(k -> k.getId(), v -> v, (o, n) -> o));
System.out.println(map);
System.out.println("------------------");

Map<Long, Student> map2 = list.stream().sorted(Comparator.comparingLong(Student::getId).reversed()).collect(Collectors.toMap(Student::getId, v -> v, (o, n) -> o));
System.out.println(map2);

}

/**
*
*/
@Test
public void strDistinct() {
List<String> list = Arrays.asList("a", "b", "c", "d", "a", "a", "b");
list.forEach(System.out::println);
System.out.println("---------");
list.stream().distinct().forEach(System.out::println);
}

/**
* 去重
*/
@Test
public void disticnt() {

List<Student> student = getList();
// 集合去重(引用对象)
student.stream().distinct().forEach(System.out::println);
System.out.println("---------");
student.forEach(System.out::println);

System.out.println("------------------");
student.addAll(student);
student.forEach(System.out::println);

System.out.println("------------------");
// 集合去重(引用对象)
student.stream().distinct().forEach(System.out::println);
}

/**
*
*/
@Test
public void limit() {
List<String> list = getStringList();
list.stream().forEach(System.out::println);
System.out.println("------------------");
list.stream().limit(2).forEach(System.out::println);
}

/**
* 集合skip 删除前n个元素
*/
@Test
public void skip() {
List<String> list = getStringList();
list.stream().forEach(System.out::println);
System.out.println("------------------");
list.stream().skip(2).forEach(System.out::println);
}

/**
* 集合reduce,将集合中每个元素聚合成一条数据
*/
@Test
public void listReduce() {
List<String> list = Arrays.asList("欢", "迎", "你");
String str = list.stream().reduce("北京", (a, b) -> a + b);
System.out.println(str);
}


/**
*
*/
@Test
public void min() {
List<Student> list = getList();
Student minAgeStudent = list.stream().filter(x -> (x != null && x.getAge() > 0)).filter(x -> x.getAge() != null).min((x, y) -> Integer.compare(x.getAge(), y.getAge())).get();
System.out.println(minAgeStudent);
}

/**
* <pre>
* Operator != cannot be applied to 'int', 'null'
* 数字类型为int时不能用 != null
* </pre>
*/
@Test
public void testOperator() {
List<Student> list = getList2();
list.stream().filter(x -> (x != null && x.getAge() != null)).forEach(System.out::println);

System.out.println("------------------");
//
list.stream().filter(x -> x.getId() == 1).collect(Collectors.toList());
}


/**
* <pre>
* anyMatch:Stream 中任意一个元素符合传入的 predicate,返回 true
* allMatch:Stream 中全部元素符合传入的 predicate,返回 true
* noneMatch:Stream 中没有一个元素符合传入的 predicate,返回 true
* </pre>
*/
@Test
public void match() {
List<Student> list = getList();

Boolean anyMatch = list.stream().anyMatch(x -> "北京".equals(x.getAddress()));
System.out.println("anyMatch res: " + anyMatch);

Boolean allMatch = list.stream().anyMatch(x -> x.getAge() > 3);
System.out.println("allMatch res: " + allMatch);

Boolean noMatch = list.stream().noneMatch(x -> x.getAge() > 35);
System.out.println("noMatch res: " + noMatch);
}


@Test
public void parallelTest() {
List<String> strs = getStringList();
Long count = strs.parallelStream().filter(s -> s.length() == 1).count();
System.out.println("count: " + count);
}

@Test
public void test() {
List<Integer> numbers = Arrays.asList(1, 3, 2, 2, 3, 7, 3, 5, 11, 13, 17);

IntSummaryStatistics stats = numbers.stream().mapToInt((x) -> x).summaryStatistics();

System.out.println("列表中最大的数 : " + stats.getMax());
System.out.println("列表中最小的数 : " + stats.getMin());
System.out.println("所有数之和 : " + stats.getSum());
System.out.println("平均数 : " + stats.getAverage());
}


/**
*
*/
@Test
public void mySupplierTest() {
Stream.generate(new PersonSupplier()).
limit(10).
forEach(p -> System.out.println(p.getName() + ", " + p.getAge()));
}

/**
* 生成等差数列
*/
@Test
public void getIterate() {
Stream.iterate(0, n -> n + 3).limit(10).forEach(x -> System.out.print(x + " "));
}

/**
*
*/
@Test
public void groupByTest() {
Map<Integer, List<Student>> personGroups = Stream.generate(new PersonSupplier()).
limit(100).
collect(Collectors.groupingBy(Student::getAge));
Iterator it = personGroups.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Integer, List<Student>> persons = (Map.Entry) it.next();
System.out.println("Age " + persons.getKey() + " = " + persons.getValue().size());
}
}


private class PersonSupplier implements Supplier<Student> {
private int index = 0;
private Random random = new Random();

@Override
public Student get() {
return new Student(index++, "StormTestUser" + index, random.nextInt(100));
}
}
}


@Data
class Student {

private Long id;
private String name;
private Integer age;
private String address;
private Integer num;

public Student() {
}

/**
* @param id
* @param name
* @param age
* @param address
*/
public Student(Long id, String name, int age, String address) {
this.id = id;
this.name = name;
this.age = age;
this.address = address;
}

public Student(int i, String s, int nextInt) {
this.num = i;
this.name = s;
this.age = nextInt;
}
}

References

[1] Java 8 中的 Streams API 详解
[2] 巧用Java8中的Stream,让集合操作飞起来
[3] java8的Stream对集合操作飞起来
[4] Java 8 Stream