Flink处理函数实战之二:ProcessFunction类,java线程面试题目

HarmonyOS学习 2021-11-25 17:49:45
java 面试 编程语言 后端开发

Flink处理函数实战之二:ProcessFunction类,java线程面试题目_Java

 创建工程

执行以下命令创建一个flink-1.9.2的应用工程:

mvn \

archetype:generate \

-DarchetypeGroupId=org.apache.flink \

-DarchetypeArtifactId=flink-quickstart-java \

-DarchetypeVersion=

《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享

1.9.2

按提示输入groupId:com.bolingcavalry,architectid:flinkdemo

 第一个demo

第一个demo用来体验以下两个特性:

  1. 处理单个元素;

  2. 访问时间戳;

创建Simple.java,内容如下:

package com.bolingcavalry.processfunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.TimeCharacteristic;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.ProcessFunction;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import org.apache.flink.util.Collector;

public class Simple {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// 并行度为1

env.setParallelism(1);

// 设置数据源,一共三个元素

DataStream<Tuple2<String,Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {

@Override

public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {

for(int i=1; i<4; i++) {

String name = “name” + i;

Integer value = i;

long timeStamp = System.currentTimeMillis();

// 将将数据和时间戳打印出来,用来验证数据

System.out.println(String.format(“source,%s, %d, %d\n”,

name,

value,

timeStamp));

// 发射一个元素,并且戴上了时间戳

ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, value), timeStamp);

// 为了让每个元素的时间戳不一样,每发射一次就延时10毫秒

Thread.sleep(10);

}

}

@Override

public void cancel() {

}

});

// 过滤值为奇数的元素

SingleOutputStreamOperator<String> mainDataStream = dataStream

.process(new ProcessFunction<Tuple2<String, Integer>, String>() {

@Override

public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {

// f1字段为奇数的元素不会进入下一个算子

if(0 == value.f1 % 2) {

out.collect(String.format(“processElement,%s, %d, %d\n”,

value.f0,

value.f1,

ctx.timestamp()));

}

}

});

// 打印结果,证明每个元素的timestamp确实可以在ProcessFunction中取得

mainDataStream.print();

env.execute(“processfunction demo : simple”);

}

}

这里对上述代码做个介绍:

  1. 创建一个数据源,每个10毫秒发出一个元素,一共三个,类型是Tuple2,f0是个字符串,f1是整形,每个元素都带时间戳;

  2. 数据源发出元素时,提前把元素的f0、f1、时间戳打印出来,和后面的数据核对是否一致;

  3. 在后面的处理中,创建了ProcessFunction的匿名子类,里面可以处理上游发来的每个元素,并且还能取得每个元素的时间戳(这个能力很重要),然后将f1字段为奇数的元素过滤掉;

  4. 最后将ProcessFunction处理过的数据打印出来,验证处理结果是否符合预期;

直接执行Simple类,结果如下,可见过滤和提取时间戳都成功了:

Flink处理函数实战之二:ProcessFunction类,java线程面试题目_Java_02

 第二个demo

第二个demo是实现旁路输出(Side Outputs),对于一个DataStream来说,可以通过旁路输出将数据输出到其他算子中去,而不影响原有的算子的处理,下面来演示旁路输出:

创建SideOutput类:

package com.bolingcavalry.processfunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.ProcessFunction;

import org.apache.flink.util.Collector;

import org.apache.flink.util.OutputTag;

import java.util.ArrayList;

import java.util.List;

public class SideOutput {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 并行度为1

env.setParallelism(1);

// 定义OutputTag

final OutputTag<String> outputTag = new OutputTag<String>(“side-output”){};

// 创建一个List,里面有两个Tuple2元素

List<Tuple2<String, Integer>> list = new ArrayList<>();

list.add(new Tuple2(“aaa”, 1));

list.add(new Tuple2(“bbb”, 2));

list.add(new Tuple2(“ccc”, 3));

//通过List创建DataStream

DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);

//所有元素都进入mainDataStream,f1字段为奇数的元素进入SideOutput

SingleOutputStreamOperator<String> mainDataStream = fromCollectionDataStream

.process(new ProcessFunction<Tuple2<String, Integer>, String>() {

@Override

public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {

//进入主流程的下一个算子

out.collect("main, name : " + value.f0 + ", value : " + value.f1);

//f1字段为奇数的元素进入SideOutput

if(1 == value.f1 % 2) {

ctx.output(outputTag, "side, name : " + value.f0 + ", value : " + value.f1);

}

}

});

// 禁止chanin,这样可以在页面上看清楚原始的DAG

mainDataStream.disableChaining();

// 取得旁路数据

最后

如果觉得本文对你有帮助的话,不妨给我点个赞,关注一下吧!

Flink处理函数实战之二:ProcessFunction类,java线程面试题目_Java_03

Flink处理函数实战之二:ProcessFunction类,java线程面试题目_面试_04

本文已被 CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录

版权声明
本文为[HarmonyOS学习]所创,转载请带上原文链接,感谢
https://blog.51cto.com/u_15438507/4690206

  1. La distribution Linux d'Amazon est maintenant entièrement basée sur Fedora
  2. org.springframework.web.bind.MissingServletRequestParameterException
  3. Built in constraints and functions of MySQL Foundation (2)
  4. Basic operation of MySQL Foundation (I)
  5. Introduction to Java zero foundation 3: Java data types
  6. 从零开始搭建EasyDarwin环境——Linux系统开发环境搭建Golang
  7. Redis source Concise Analysis 02 - SDS String
  8. Construire l'environnement easydarwin à partir de zéro - - construire l'environnement de développement du système Linux golang
  9. javaweb代码是正确的,但是第一行代码就报错了
  10. **** | Java | 后端开挂:3行代码写出8个接口
  11. Java || 看了大二学长写的代码,我竟开始默默的模仿了。。。
  12. Java | 手把手教你实现一个抽奖系统(Java版)
  13. Java | Manuel pour vous apprendre à mettre en œuvre un système de loterie (version Java)
  14. Java | | après avoir lu le Code que j'ai écrit en deuxième année, j'ai commencé à imiter silencieusement...
  15. Java | back - end Pending: 3 - line Code write 8 Interfaces
  16. Le Code Web Java est correct, mais la première ligne de code est incorrecte
  17. Android网络编程之Http通信
  18. Android網絡編程之Http通信
  19. Http communication for Android Network Programming
  20. 数据结构实验八 领会图的两种主要储存结构和图的基本运算算法设计
  21. Hibernate数据校验简介
  22. The story of spring
  23. Il a dépensé 270 000 yuans pour soulever Xiaopeng p7 et a parcouru 3 627 km. Le propriétaire du véhicule a partagé 6 avantages et inconvénients.
  24. 阿里蚂蚁花呗团队面试题:spring+分布式+jvm+session+redis
  25. 【Java入门100例】14.字符串排序——compareTo()
  26. 【Java入门100例】13.修改文件扩展名——字符串替换
  27. Leetcode 79. Word Search [C + + / java detailed problem]
  28. Introduction à la vérification des données hibernantes
  29. Expérience de la structure des données
  30. Spring cloud gateway practice 2: more routing configuration methods
  31. Java network programming - summary overview
  32. 基于语法树的 Java 代码自动化插桩
  33. 云原生 Spring Boot 应用配置 Prometheus + Grafana 监控(保姆级)
  34. Spring cloud gateway practice 2: more routing configuration methods
  35. Jenkins file one line of code to deploy. Net program to k8s
  36. Java network programming - summary overview
  37. Cloud Native Spring Boot application configuration Prometheus + grafana Monitoring (baby - sitter)
  38. Insertion automatique de code Java basée sur l'Arbre syntaxique
  39. Le SUV phare de Xiaopeng, Xiaopeng G9, a fait ses débuts au salon de l'automobile et s'est tenu en position C dans la nouvelle force?
  40. Docker 从入门到实践系列四 - Docker 容器编排利器 Docker Compose
  41. 6年老猿带你掌握Spring Boot实现定时任务的动态增删启停
  42. disruptor笔记之六:常见场景,java教程从入门到精通pdf百度云
  43. Pourquoi InnoDB n'utilise - t - il pas un cache LRU naïf?
  44. Java Reflection (2): quelques opérations de base de reflection
  45. 6年老猿帶你掌握Spring Boot實現定時任務的動態增删啟停
  46. Les singes âgés vous permettent de maîtriser le démarrage et l'arrêt dynamiques des tâches programmées par Spring boot
  47. Docker From Beginning to Practice Series IV - docker Container chorégraphe Clean docker Composition
  48. 编写 java 程序,为家用电脑 ipv6 自动更新 goddy dns 记录(ddns)
  49. java jvm-old gc耗时几十s,导致系统告警
  50. Disruptor note 6: scénario commun, tutoriel Java de l'introduction à la maîtrise du PDF Baidu Cloud
  51. 编写Java程序启动脚本最佳实践
  52. How to get the correct Linux user's documents, music videos and other directories?
  53. Java JVM Old GC prend des dizaines de s, ce qui provoque une alarme système
  54. Écrivez un programme Java pour mettre à jour automatiquement les enregistrements DNS goddy (ddns) pour l'ordinateur domestique IPv6
  55. 編寫Java程序啟動脚本最佳實踐
  56. Meilleures pratiques pour écrire des scripts de démarrage de programmes Java
  57. Notes sur springcloud Eureka
  58. Ajout, suppression et modification simples de mybatis
  59. Java - carte mémoire de l'objet
  60. Why did docker lose to kubernetes? Docker employee readme!