Welcome to my GitHub
https://github.com/zq2599/blog_demos
Content: a categorized index of all my original articles with companion source code, covering Java, Docker, Kubernetes, DevOps, and more;
Overview of this article
This is the second installment of the 《Flink sink in action》 series. The previous installment, 《Flink sink in action (1): a first look》, gave us a basic understanding of sinks; in this installment we experience sinking data to kafka;
Links to the full series
- 《Flink sink in action (1): a first look》
- 《Flink sink in action (2): kafka》
- 《Flink sink in action (3): cassandra3》
- 《Flink sink in action (4): a custom sink》
Versions and environment
The environment and versions used in this article are as follows:
- JDK: 1.8.0_211
- Flink: 1.9.2
- Maven: 3.6.0
- operating system: macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
- IDEA: 2018.3.5 (Ultimate Edition)
- Kafka: 2.4.0
- Zookeeper: 3.5.5
<font color="red"> Please make sure that the above environment and services are in place ;</font>
Source code download
If you would rather not write the code yourself, the source for the whole series can be downloaded from GitHub (https://github.com/zq2599/blog_demos); the addresses and link information are in the table below:
name | link | remarks
---|---|---
Project home page | https://github.com/zq2599/blog_demos | The project's home page on GitHub
git repository address (https) | https://github.com/zq2599/blog_demos.git | The repository URL of the project's source code, https protocol
git repository address (ssh) | git@github.com:zq2599/blog_demos.git | The repository URL of the project's source code, ssh protocol
There are multiple folders in this git project; the application for this chapter is in the <font color="blue">flinksinkdemo</font> folder, as shown in the red box below. With everything in place, let's start developing;
Preparation
Before writing any code, first consult the official documentation for the basics:
- Address: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html
- The kafka used here is version 2.4.0; locate the matching library and class in the official documentation, as shown in the red box below:
kafka preparation
- Create a topic named test006 with four partitions; reference command:
./kafka-topics.sh \
--create \
--bootstrap-server 127.0.0.1:9092 \
--replication-factor 1 \
--partitions 4 \
--topic test006
- Consume messages from test006 on the console; reference command:
./kafka-console-consumer.sh \
--bootstrap-server 127.0.0.1:9092 \
--topic test006
- From now on, whenever a message arrives on this topic, it will be printed on this console;
- That completes the kafka preparation; a programmatic double-check of the topic is sketched below, after which we can start coding;
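For readers who prefer to verify from Java code that test006 really has four partitions, here is a minimal sketch using kafka's AdminClient (the kafka-clients library arrives transitively once the flink kafka connector is added in the next section; the class name TopicCheck and the 127.0.0.1:9092 broker address are assumptions of mine, not part of the original project):

package com.bolingcavalry.addsink;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.Collections;
import java.util.Properties;

// hypothetical helper, not part of the original project
public class TopicCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "127.0.0.1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // describe the topic and print its partition count (should be 4)
            TopicDescription desc = admin.describeTopics(Collections.singletonList("test006"))
                                         .all().get().get("test006");
            System.out.println("partitions of test006: " + desc.partitions().size());
        }
    }
}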
Create the project
- Create the flink project with the maven command:
mvn \
archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.2
- At the prompts, enter <font color="blue">com.bolingcavalry</font> for groupId and <font color="blue">flinksinkdemo</font> for artifactId, and the maven project is created;
- Add the kafka connector dependency in pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.9.0</version>
</dependency>
- With the project created, we can start writing the flink task code;
Sink for string messages
Let's start by sending messages of type string:
- Create an implementation class of the KafkaSerializationSchema interface; it will later be used as a constructor parameter of the sink object:
package com.bolingcavalry.addsink;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

public class ProducerStringSerializationSchema implements KafkaSerializationSchema<String> {

    private String topic;

    public ProducerStringSerializationSchema(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
    }
}
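As a quick sanity check of the schema's behavior, the hypothetical snippet below (not part of the original project) calls serialize() directly and inspects the resulting ProducerRecord:

package com.bolingcavalry.addsink;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

// hypothetical check, not part of the original project
public class ProducerStringSerializationSchemaCheck {
    public static void main(String[] args) {
        ProducerStringSerializationSchema schema = new ProducerStringSerializationSchema("test006");
        ProducerRecord<byte[], byte[]> record = schema.serialize("hello kafka", System.currentTimeMillis());
        System.out.println(record.topic());                                     // test006
        System.out.println(new String(record.value(), StandardCharsets.UTF_8)); // hello kafka
    }
}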
- Create the task class KafkaStrSink; note the constructor parameters of the FlinkKafkaProducer object: FlinkKafkaProducer.Semantic.EXACTLY_ONCE means exactly-once delivery:
package com.bolingcavalry.addsink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaStrSink {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // set the parallelism to 1
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.50.43:9092");

        String topic = "test006";
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(topic,
                new ProducerStringSerializationSchema(topic),
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        // create a List holding several string elements
        List<String> list = new ArrayList<>();
        list.add("aaa");
        list.add("bbb");
        list.add("ccc");
        list.add("ddd");
        list.add("eee");
        list.add("fff");
        list.add("aaa");

        // send every element of the list to kafka
        env.fromCollection(list)
                .addSink(producer)
                .setParallelism(4);

        env.execute("sink demo : kafka str");
    }
}
- Compile and build with the mvn command; the file <font color="blue">flinksinkdemo-1.0-SNAPSHOT.jar</font> is produced in the target directory;
- Submit flinksinkdemo-1.0-SNAPSHOT.jar on the flink web page and specify the execution class, as shown below:
- Once the submission succeeds, if flink has four available slots the job starts immediately, and the terminal consuming the kafka topic receives the messages, as shown below (a minimal Java consumer is sketched after this list):
- The job's execution status is shown below:
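If you prefer verifying from code instead of kafka-console-consumer.sh, here is a minimal sketch of a plain Java consumer; the class name StrMessageCheck, the group id, and the local broker address are assumptions of mine, not part of the original project:

package com.bolingcavalry.addsink;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// hypothetical check, not part of the original project
public class StrMessageCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "127.0.0.1:9092");
        props.setProperty("group.id", "flinksinkdemo-check");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("auto.offset.reset", "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test006"));
            // poll once and print whatever arrived within ten seconds
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition %d : %s%n", record.partition(), record.value());
            }
        }
    }
}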
Sink for object messages
Next, let's try sending messages of an object type; the commonly used Tuple2 is chosen as the object here:
- Create an implementation class of the KafkaSerializationSchema interface, again passed to the sink object's constructor; note the comment where the exception is caught: <font color="red">use printStackTrace() with great care in production!!!</font>
package com.bolingcavalry.addsink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;

public class ObjSerializationSchema implements KafkaSerializationSchema<Tuple2<String, Integer>> {

    private String topic;
    private ObjectMapper mapper;

    public ObjSerializationSchema(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, Integer> stringIntegerTuple2, @Nullable Long timestamp) {
        byte[] b = null;
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            b = mapper.writeValueAsBytes(stringIntegerTuple2);
        } catch (JsonProcessingException e) {
            // Careful: this is a dangerous operation in a production environment;
            // excessive error printing can seriously hurt system performance,
            // adjust the handling to suit your production setup
            e.printStackTrace();
        }
        return new ProducerRecord<>(topic, b);
    }
}
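To preview what actually lands in kafka, the hypothetical snippet below (not part of the original project) serializes a Tuple2 the same way the schema does, reusing the same shaded ObjectMapper. Tuple2 exposes its two values as the public fields f0 and f1, so the JSON contains entries for them; note that Jackson may also pick up inherited bean properties such as arity:

package com.bolingcavalry.addsink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

// hypothetical check of the JSON shape, not part of the original project
public class ObjSerializationCheck {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // prints something like {"f0":"aaa","f1":2} (possibly with an extra "arity" entry)
        System.out.println(mapper.writeValueAsString(new Tuple2<>("aaa", 2)));
    }
}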
- Create the flink task class:
package com.bolingcavalry.addsink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaObjSink {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // set the parallelism to 1
        env.setParallelism(1);

        Properties properties = new Properties();
        // kafka broker address
        properties.setProperty("bootstrap.servers", "192.168.50.43:9092");

        String topic = "test006";
        FlinkKafkaProducer<Tuple2<String, Integer>> producer = new FlinkKafkaProducer<>(topic,
                new ObjSerializationSchema(topic),
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        // create a List holding several Tuple2 elements
        List<Tuple2<String, Integer>> list = new ArrayList<>();
        list.add(new Tuple2<>("aaa", 1));
        list.add(new Tuple2<>("bbb", 1));
        list.add(new Tuple2<>("ccc", 1));
        list.add(new Tuple2<>("ddd", 1));
        list.add(new Tuple2<>("eee", 1));
        list.add(new Tuple2<>("fff", 1));
        list.add(new Tuple2<>("aaa", 1));

        // count the occurrences of each word, then sink the running totals to kafka
        env.fromCollection(list)
                .keyBy(0)
                .sum(1)
                .addSink(producer)
                .setParallelism(4);

        env.execute("sink demo : kafka obj");
    }
}
- Compile and build as with the previous task, submit the jar to flink, and specify the execution class <font color="blue">com.bolingcavalry.addsink.KafkaObjSink</font>;
- The console consuming the kafka messages prints the results as below; since sum() emits a running total for each incoming element and "aaa" appears twice in the input, the key aaa shows up twice, the second time with a count of 2:
- The execution status on the web page is shown below:
With that, the hands-on exercise of sending flink computation results to kafka as messages is complete; I hope it gives you a useful reference. In the next installment we will continue to explore the officially provided sink capabilities;
Welcome to follow my WeChat official account: programmer Xinchen
Search 「programmer Xinchen」 on WeChat; I'm Xinchen, looking forward to exploring the Java world with you... https://github.com/zq2599/blog_demos