Flink Handling Function Real War II: processfunction class, Java thread interview subject

Harmonios Learning 2021-11-25 17:57:18
flink handling function real war

FlinkFonction de traitement combat réel 2:ProcessFunctionCatégorie,javaQuestions d'entrevue par fil_Java

 Créer un projet

Exécutez la commande suivante pour créer unflink-1.9.2Ingénierie des applications:

mvn \

archetype:generate \

-DarchetypeGroupId=org.apache.flink \

-DarchetypeArtifactId=flink-quickstart-java \

-DarchetypeVersion=

《Grandes usines de première ligneJavaAnalyse des questions d'entrevue+Notes d'apprentissage pour le développement de l'arrière - plan+La dernière vidéo d'architecture+Document d'information sur le code source du projet en direct》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 Partage open source du contenu complet

1.9.2

Saisissez à l'invitegroupId:com.bolingcavalry,architectid:flinkdemo

 Le premierdemo

Le premierdemoPour expérimenter les deux caractéristiques suivantes:

  1. Travailler avec des éléments individuels;

  2. TIMESTAMP d'accès;

CréationSimple.java,Il se lit comme suit::

package com.bolingcavalry.processfunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.TimeCharacteristic;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.ProcessFunction;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import org.apache.flink.util.Collector;

public class Simple {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Parallélisme1

env.setParallelism(1);

// Définir la source de données, Trois éléments

DataStream<Tuple2<String,Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {

@Override

public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {

for(int i=1; i<4; i++) {

String name = “name” + i;

Integer value = i;

long timeStamp = System.currentTimeMillis();

// Les données et l'horodatage seront imprimés , Utilisé pour valider les données

System.out.println(String.format(“source,%s, %d, %d\n”,

name,

value,

timeStamp));

// Lancer un élément , Et l'horodatage

ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, value), timeStamp);

// Pour que chaque élément ait un horodatage différent , Chaque lancement est retardé 10MS

Thread.sleep(10);

}

}

@Override

public void cancel() {

}

});

// Filtrer les éléments avec des valeurs impaires

SingleOutputStreamOperator<String> mainDataStream = dataStream

.process(new ProcessFunction<Tuple2<String, Integer>, String>() {

@Override

public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {

// f1 Un élément avec un champ impair n'entre pas dans l'opérateur suivant

if(0 == value.f1 % 2) {

out.collect(String.format(“processElement,%s, %d, %d\n”,

value.f0,

value.f1,

ctx.timestamp()));

}

}

});

// Imprimer les résultats, Preuve de chaque élément timestamp En effet. ProcessFunctionAcquis

mainDataStream.print();

env.execute(“processfunction demo : simple”);

}

}

Voici une introduction au code ci - dessus :

  1. Créer une source de données,Chaque10 Milliseconde pour envoyer un élément ,Trois.,Le type estTuple2,f0C'est une chaîne,f1C'est un lifting., Chaque élément est horodaté ;

  2. Lorsque la source de données émet un élément , Mettre l'élément à l'avance f0、f1、 L'horodatage est imprimé , Vérifier la cohérence avec les données suivantes ;

  3. Dans le traitement ultérieur ,CrééProcessFunction Sous - classe anonyme de , Il peut traiter chaque élément envoyé en amont , Et vous pouvez obtenir l'horodatage de chaque élément ( Cette capacité est importante ),Et ensuite,f1 Le champ est un nombre impair d'éléments filtrés ;

  4. Enfin,ProcessFunction Les données traitées sont imprimées , Vérifier que les résultats du traitement sont conformes aux attentes ;

Exécution directeSimpleCatégorie,Les résultats sont les suivants:, Le filtrage visible et l'extraction de l'horodatage ont tous deux réussi :

FlinkFonction de traitement combat réel 2:ProcessFunctionCatégorie,javaQuestions d'entrevue par fil_Java_02

 Deuxièmedemo

Deuxièmedemo Est d'obtenir une sortie de dérivation (Side Outputs),Pour unDataStreamDis, Les données peuvent être exportées par contournement vers d'autres opérateurs , Sans affecter le traitement de l'opérateur original , Voici une démonstration de la sortie de dérivation :

CréationSideOutputCatégorie:

package com.bolingcavalry.processfunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.ProcessFunction;

import org.apache.flink.util.Collector;

import org.apache.flink.util.OutputTag;

import java.util.ArrayList;

import java.util.List;

public class SideOutput {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Parallélisme1

env.setParallelism(1);

// DéfinitionOutputTag

final OutputTag<String> outputTag = new OutputTag<String>(“side-output”){};

// Créer unList,Il y en a deux.Tuple2Élément

List<Tuple2<String, Integer>> list = new ArrayList<>();

list.add(new Tuple2(“aaa”, 1));

list.add(new Tuple2(“bbb”, 2));

list.add(new Tuple2(“ccc”, 3));

//AdoptionListCréationDataStream

DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);

// Tous les éléments entrent dans mainDataStream,f1 Les éléments dont le champ est impair entrent dans SideOutput

SingleOutputStreamOperator<String> mainDataStream = fromCollectionDataStream

.process(new ProcessFunction<Tuple2<String, Integer>, String>() {

@Override

public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {

// L'opérateur suivant dans le courant principal

out.collect("main, name : " + value.f0 + ", value : " + value.f1);

//f1 Les éléments dont le champ est impair entrent dans SideOutput

if(1 == value.f1 % 2) {

ctx.output(outputTag, "side, name : " + value.f0 + ", value : " + value.f1);

}

}

});

// Interdictionchanin, De cette façon, vous pouvez voir clairement l'original sur la page DAG

mainDataStream.disableChaining();

// Obtenir des données de contournement

Enfin

Si vous trouvez cet article utile,Autant me faire un compliment,Fais attention!

FlinkFonction de traitement combat réel 2:ProcessFunctionCatégorie,javaQuestions d'entrevue par fil_Java_03

FlinkFonction de traitement combat réel 2:ProcessFunctionCatégorie,javaQuestions d'entrevue par fil_Interview_04

Cet article a été publié par CODINGProjet Open Source:【Grandes usines de première ligneJavaAnalyse des questions d'entrevue+Résumé de base notes d'étude+Dernière vidéo d'explication+Code source du projet opérationnel】Inclus

版权声明
本文为[Harmonios Learning]所创,转载请带上原文链接,感谢
https://javamana.com/2021/11/20211125174940970h.html

  1. 6年老猿带你掌握Spring Boot实现定时任务的动态增删启停
  2. disruptor笔记之六:常见场景,java教程从入门到精通pdf百度云
  3. Pourquoi InnoDB n'utilise - t - il pas un cache LRU naïf?
  4. Java Reflection (2): quelques opérations de base de reflection
  5. 6年老猿帶你掌握Spring Boot實現定時任務的動態增删啟停
  6. Les singes âgés vous permettent de maîtriser le démarrage et l'arrêt dynamiques des tâches programmées par Spring boot
  7. Docker From Beginning to Practice Series IV - docker Container chorégraphe Clean docker Composition
  8. 编写 java 程序,为家用电脑 ipv6 自动更新 goddy dns 记录(ddns)
  9. java jvm-old gc耗时几十s,导致系统告警
  10. Disruptor note 6: scénario commun, tutoriel Java de l'introduction à la maîtrise du PDF Baidu Cloud
  11. 编写Java程序启动脚本最佳实践
  12. How to get the correct Linux user's documents, music videos and other directories?
  13. Java JVM Old GC prend des dizaines de s, ce qui provoque une alarme système
  14. Écrivez un programme Java pour mettre à jour automatiquement les enregistrements DNS goddy (ddns) pour l'ordinateur domestique IPv6
  15. 編寫Java程序啟動脚本最佳實踐
  16. Meilleures pratiques pour écrire des scripts de démarrage de programmes Java
  17. Notes sur springcloud Eureka
  18. Ajout, suppression et modification simples de mybatis
  19. MySQL Learning - Logging System Redo log and Bin log
  20. Springboot Common comments | @ configuration
  21. Mécanisme d'expiration du cache redis et d'élimination de la mémoire
  22. Analyse concise du code source redis 01 - configuration de l'environnement
  23. Java - carte mémoire de l'objet
  24. Redis source Concise Analysis 02 - SDS String
  25. Why did docker lose to kubernetes? Docker employee readme!
  26. Spring cloud gateway practice 2: more routing configuration methods
  27. Principe de mise en œuvre ultime du mécanisme de concurrence Java sous - jacent
  28. [démarrer avec Java 100 exemples] 13. Modifier l’extension de fichier - remplacement de chaîne
  29. Java期末作业——王者荣耀的洛克王国版游戏
  30. Elasticsearch聚合学习之五:排序结果不准的问题分析,阿里巴巴java性能调优实战
  31. Java期末作業——王者榮耀的洛克王國版遊戲
  32. Java final work - King's Glory Rock Kingdom Game
  33. 【网络编程】TCP 网络应用程序开发
  34. 【网络编程入门】什么是 IP、端口、TCP、Socket?
  35. 【網絡編程入門】什麼是 IP、端口、TCP、Socket?
  36. [Introduction à la programmation réseau] qu'est - ce que IP, port, TCP et socket?
  37. [programmation réseau] développement d'applications réseau TCP
  38. [Java Basics] comprendre les génériques
  39. Dix outils open source que les architectes de logiciels Java devraient maîtriser!!
  40. Java经典面试题详解,突围金九银十面试季(附详细答案,mysql集群架构部署方案
  41. java架构之路(多线程)synchronized详解以及锁的膨胀升级过程,mysql数据库实用教程pdf
  42. java整理,java高级特性编程及实战第一章
  43. java教程——反射,mongodb下载教程
  44. Java岗大厂面试百日冲刺 - 日积月累,每日三题【Day12,zookeeper原理作用
  45. Java后端互联网500道中高级面试题(含答案),linux钩子技术
  46. java8 Stream API及常用方法,java初级程序员面试
  47. java-集合-Map(双列)——迪迦重制版,2021Java开发社招面试解答之性能优化
  48. Flink处理函数实战之二:ProcessFunction类,java线程面试题目
  49. flex 布局详解,【Java面试题
  50. Linux basic command learning
  51. Why did docker lose to kubernetes? Docker employee readme!
  52. MySQL安装
  53. Elastic Search Aggregate Learning five: Problem Analysis of Uncertainty of sequencing results, Alibaba Java Performance Tuning Practical
  54. Installing, configuring, starting and accessing rabbitmq under Linux
  55. Oracle SQL injection summary
  56. Installation MySQL
  57. L'exposition à la photo d'essai sur la route i7 du nouveau vaisseau amiral de BMW Pure Electric a également été comparée à celle de Xiaopeng p7.
  58. spring JTA 关于异常处理的时机问题
  59. Le problème du temps de traitement des exceptions dans la JTA printanière
  60. Do you really know MySQL order by