Exécutez la commande suivante pour créer unflink-1.9.2Ingénierie des applications:
mvn \
archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=
《Grandes usines de première ligneJavaAnalyse des questions d'entrevue+Notes d'apprentissage pour le développement de l'arrière - plan+La dernière vidéo d'architecture+Document d'information sur le code source du projet en direct》
【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 Partage open source du contenu complet
1.9.2
Saisissez à l'invitegroupId:com.bolingcavalry,architectid:flinkdemo
Le premierdemoPour expérimenter les deux caractéristiques suivantes:
Travailler avec des éléments individuels;
TIMESTAMP d'accès;
CréationSimple.java,Il se lit comme suit::
package com.bolingcavalry.processfunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
public class Simple {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Parallélisme1
env.setParallelism(1);
// Définir la source de données, Trois éléments
DataStream<Tuple2<String,Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
for(int i=1; i<4; i++) {
String name = “name” + i;
Integer value = i;
long timeStamp = System.currentTimeMillis();
// Les données et l'horodatage seront imprimés , Utilisé pour valider les données
System.out.println(String.format(“source,%s, %d, %d\n”,
name,
value,
timeStamp));
// Lancer un élément , Et l'horodatage
ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, value), timeStamp);
// Pour que chaque élément ait un horodatage différent , Chaque lancement est retardé 10MS
Thread.sleep(10);
}
}
@Override
public void cancel() {
}
});
// Filtrer les éléments avec des valeurs impaires
SingleOutputStreamOperator<String> mainDataStream = dataStream
.process(new ProcessFunction<Tuple2<String, Integer>, String>() {
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
// f1 Un élément avec un champ impair n'entre pas dans l'opérateur suivant
if(0 == value.f1 % 2) {
out.collect(String.format(“processElement,%s, %d, %d\n”,
value.f0,
value.f1,
ctx.timestamp()));
}
}
});
// Imprimer les résultats, Preuve de chaque élément timestamp En effet. ProcessFunctionAcquis
mainDataStream.print();
env.execute(“processfunction demo : simple”);
}
}
Voici une introduction au code ci - dessus :
Créer une source de données,Chaque10 Milliseconde pour envoyer un élément ,Trois.,Le type estTuple2,f0C'est une chaîne,f1C'est un lifting., Chaque élément est horodaté ;
Lorsque la source de données émet un élément , Mettre l'élément à l'avance f0、f1、 L'horodatage est imprimé , Vérifier la cohérence avec les données suivantes ;
Dans le traitement ultérieur ,CrééProcessFunction Sous - classe anonyme de , Il peut traiter chaque élément envoyé en amont , Et vous pouvez obtenir l'horodatage de chaque élément ( Cette capacité est importante ),Et ensuite,f1 Le champ est un nombre impair d'éléments filtrés ;
Enfin,ProcessFunction Les données traitées sont imprimées , Vérifier que les résultats du traitement sont conformes aux attentes ;
Exécution directeSimpleCatégorie,Les résultats sont les suivants:, Le filtrage visible et l'extraction de l'horodatage ont tous deux réussi :
Deuxièmedemo Est d'obtenir une sortie de dérivation (Side Outputs),Pour unDataStreamDis, Les données peuvent être exportées par contournement vers d'autres opérateurs , Sans affecter le traitement de l'opérateur original , Voici une démonstration de la sortie de dérivation :
CréationSideOutputCatégorie:
package com.bolingcavalry.processfunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.ArrayList;
import java.util.List;
public class SideOutput {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Parallélisme1
env.setParallelism(1);
// DéfinitionOutputTag
final OutputTag<String> outputTag = new OutputTag<String>(“side-output”){};
// Créer unList,Il y en a deux.Tuple2Élément
List<Tuple2<String, Integer>> list = new ArrayList<>();
list.add(new Tuple2(“aaa”, 1));
list.add(new Tuple2(“bbb”, 2));
list.add(new Tuple2(“ccc”, 3));
//AdoptionListCréationDataStream
DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);
// Tous les éléments entrent dans mainDataStream,f1 Les éléments dont le champ est impair entrent dans SideOutput
SingleOutputStreamOperator<String> mainDataStream = fromCollectionDataStream
.process(new ProcessFunction<Tuple2<String, Integer>, String>() {
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
// L'opérateur suivant dans le courant principal
out.collect("main, name : " + value.f0 + ", value : " + value.f1);
//f1 Les éléments dont le champ est impair entrent dans SideOutput
if(1 == value.f1 % 2) {
ctx.output(outputTag, "side, name : " + value.f0 + ", value : " + value.f1);
}
}
});
// Interdictionchanin, De cette façon, vous pouvez voir clairement l'original sur la page DAG
mainDataStream.disableChaining();
// Obtenir des données de contournement
Si vous trouvez cet article utile,Autant me faire un compliment,Fais attention!
Cet article a été publié par CODINGProjet Open Source:【Grandes usines de première ligneJavaAnalyse des questions d'entrevue+Résumé de base notes d'étude+Dernière vidéo d'explication+Code source du projet opérationnel】Inclus