Flink实战(109):connector(十八)hdfs 读写(三)StreamingFileSink相关特性及代码实战

wx5c7a97e3804fd 2021-07-20 04:14:26
大数据技术 Flink学习


声明:本系列博客是根据SGG的视频整理而成,非常适合大家入门学习。

《2021年最新版大数据面试题全面开启更新》

一、概述

    Flink流式计算的核心概念,就是将数据从Source输入流一个个传递给Operator进行链式处理,最后交给Sink输出流的过程。本篇文章主要讲解Sink端比较强大一个功能类StreamingFileSink,我们基于最新的Flink1.10.0版本进行讲解,之前版本可能使用BucketingSink,但是BucketingSink从Flink 1.9开始已经被废弃,并会在后续的版本中删除,这里只讲解StreamingFileSink相关特性。

二、StreamingFileSink相关特性

    这个连接器提供了一个 Sink 来将分区文件写入到支持 Flink FileSystem 接口的文件系统中。

    Streaming File Sink 会将数据写入到桶中。由于输入流可能是无界的,因此每个桶中的数据被划分为多个有限大小的文件。如何分桶是可以配置的,默认使用基于时间的分桶策略,这种策略每个小时创建一个新的桶,桶中包含的文件将记录所有该小时内从流中接收到的数据。

    桶目录中的实际输出数据会被划分为多个部分文件(part file),每一个接收桶数据的 Sink Subtask ,至少包含一个部分文件(part file)。额外的部分文件(part file)将根据滚动策略创建,滚动策略是可以配置的。默认的策略是根据文件大小和超时时间来滚动文件。超时时间指打开文件的最长持续时间,以及文件关闭前的最长非活动时间。

重要: 

    使用 StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 'in-progress' 或 'pending' 状态,下游系统无法安全地读取。

1 part file生命周期

    先来看一下官网的文件输出状态图:

 

 

为了在下游系统中使用 StreamingFileSink 的输出,我们需要了解输出文件的命名规则和生命周期。由上图可知,部分文件(part file)可以处于以下三种状态之一:

1).In-progress :

    当前文件正在写入中

2).Pending :

    当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态

3).Finished :

    在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态,处于 Finished 状态的文件不会再被修改,可以被下游系统安全地读取。

 

重要:  

    部分文件的索引在每个 subtask 内部是严格递增的(按文件创建顺序)。但是索引并不总是连续的。当 Job 重启后,所有部分文件的索引从 `max part index + 1` 开始, 这里的 `max part index` 是所有 subtask 中索引的最大值。

2.文件编码格式

    StreamingFileSink 支持行编码格式和批量编码格式,比如 Apache Parquet 。这两种变体可以使用以下静态方法创建:

1).Row-encoded sink: 

    StreamingFileSink.forRowFormat(basePath, rowEncoder)

2).Bulk-encoded sink:

    StreamingFileSink.forBulkFormat(basePath, bulkWriterFactory)

复制代码

//行编码
final StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.build();
//批量编码
final StreamingFileSink<GenericRecord> sink = StreamingFileSink
.forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(schema))
.build();

复制代码

 创建行或批量编码的 Sink 时,我们需要指定存储桶的基本路径和数据的编码逻辑,具体实现后面文章讲解。

重要: 

    批量编码模式仅支持 OnCheckpointRollingPolicy 策略, 在每次 checkpoint 的时候切割文件。

3.桶分配

    桶分配逻辑定义了如何将数据结构化为基本输出目录中的子目录,行格式和批量格式都使用 DateTimeBucketAssigner 作为默认的分配器。默认情况下,DateTimeBucketAssigner 基于系统默认时区每小时创建一个桶,格式如下:yyyy-MM-dd--HH 。日期格式(即桶的大小)和时区都可以手动配置。

    我们可以在格式构建器上调用 .withBucketAssigner(assigner) 来自定义 BucketAssigner 。

    Flink 有两个内置的 BucketAssigners :

    1).DateTimeBucketAssigner :默认基于时间的分配器

    2).BasePathBucketAssigner :将所有部分文件(part file)存储在基本路径中的分配器(单个全局桶)

4.滚动策略   

    滚动策略 RollingPolicy 定义了指定的文件在何时关闭(closed)并将其变为 Pending 状态,随后变为 Finished 状态。处于 Pending 状态的文件会在下一次 Checkpoint 时变为 Finished 状态,通过设置 Checkpoint 间隔时间,可以控制部分文件(part file)对下游读取者可用的速度、大小和数量。

    Flink 有两个内置的滚动策略:

    1).DefaultRollingPolicy

    2).OnCheckpointRollingPolicy

5.part file相关配置项

已经完成的文件和进行中的文件仅能通过文件名格式进行区分。

默认情况下,文件命名格式如下所示:

1).In-progress/Pending: 

    part-<subtaskIndex>-<partFileIndex>.inprogress.uid

2).FINISHED: 

    part-<subtaskIndex>-<partFileIndex>

Flink 允许用户通过 OutputFileConfig 指定部分文件名的前缀和后缀。举例来说,前缀设置为 “prefix” 以及后缀设置为 “.ext” 之后,Sink 创建的文件名如下所示:

└── 2019-08-25--12
├── prefix-0-0.ext
├── prefix-0-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
├── prefix-1-0.ext
└── prefix-1-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11

用户可以通过如下方式设置 OutputFileConfig:

复制代码

OutputFileConfig config = OutputFileConfig
.builder()
.withPartPrefix("prefix")
.withPartSuffix(".ext")
.build();
StreamingFileSink<Tuple2<Integer, Integer>> sink = StreamingFileSink
.forRowFormat((new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))
.withBucketAssigner(new KeyBucketAssigner())
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withOutputFileConfig(config)
.build();

复制代码

三、代码实战

    下面就通过一个实例子分别来说明下StreamingFileSink各个特性的使用方法,请仔细阅读代码注释:

复制代码

package com.hadoop.ljs.flink110.sink;
import com.hadoop.ljs.flink110.source.CustomSource1;
import com.hadoop.ljs.flink110.source.StudentInfo;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.util.concurrent.TimeUnit;
/**
* @author: Created By lujisen
* @company ChinaUnicom Software JiNan
* @date: 2020-04-26 20:53
* @version: v1.0
* @description: com.hadoop.ljs.flink110.sink
*/
public class StreamingFileSinkTest {
public static void main(String[] args) throws Exception {
System.setProperty("HADOOP_USER_NAME","hdfs");
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
senv.setParallelism(1);
senv.enableCheckpointing(10 * 1000);
/*指定source*/
DataStream<StudentInfo> source = senv.addSource(new CustomSource1()).setParallelism(1);
/*自定义滚动策略*/
DefaultRollingPolicy<StudentInfo, String> rollPolicy = DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(2))/*每隔多长时间生成一个文件*/
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))/*默认60秒,未写入数据处于不活跃状态超时会滚动新文件*/
.withMaxPartSize(128 * 1024 * 1024)/*设置每个文件的最大大小 ,默认是128M*/
.build();
/*输出文件的前、后缀配置*/
OutputFileConfig config = OutputFileConfig
.builder()
.withPartPrefix("prefix")
.withPartSuffix(".txt")
.build();
StreamingFileSink<StudentInfo> streamingFileSink = StreamingFileSink
/*forRowFormat指定文件的跟目录与文件写入编码方式,这里使用SimpleStringEncoder 以UTF-8字符串编码方式写入文件*/
.forRowFormat(new Path("hdfs://192.168.0.101:8020/tmp/hdfsSink"), new SimpleStringEncoder<StudentInfo>("UTF-8"))
/*这里是采用默认的分桶策略DateTimeBucketAssigner,它基于时间的分配器,每小时产生一个桶,格式如下yyyy-MM-dd--HH*/
.withBucketAssigner(new DateTimeBucketAssigner<>())
/*设置上面指定的滚动策略*/
.withRollingPolicy(rollPolicy)
/*桶检查间隔,这里设置为1s*/
.withBucketCheckInterval(1)
/*指定输出文件的前、后缀*/
.withOutputFileConfig(config)
.build();
/*指定sink*/
source.addSink(streamingFileSink);
/*启动执行*/
senv.execute("StreamingFileSinkTest");
}
}

复制代码

 

版权声明
本文为[wx5c7a97e3804fd]所创,转载请带上原文链接,感谢
https://blog.51cto.com/u_14222592/2894075

  1. Hadoop面试题(一)
  2. Hadoop面试题总结-HDFS
  3. Hadoop面试题总结-HDFS
  4. Hadoop面试题总结(三)- MapReduce
  5. Hadoop面试题总结(三)- MapReduce
  6. Hadoop面试题(四)- YARN
  7. Hadoop面试题(四)- YARN
  8. Hadoop面试题总结(五)- 优化
  9. Hadoop面试题总结(五)- 优化
  10. 大数据面试题之Hadoop系列(深入部分)
  11. 大数据面试题之Hadoop系列(深入部分)
  12. Java NIO之拥抱Path和Files
  13. 【Java Web开发指南】云服务器部署项目供外网访问(Tomcat)
  14. 2020 年九大顶级 Java 框架!别再用一些落后的技术了!
  15. 【大数据哔哔集20210108】Spark Shuffle 和 Hadoop Shuffle有什么异同?
  16. 【大数据哔哔集20210108】Spark Shuffle 和 Hadoop Shuffle有什么异同?
  17. 不建议Java程序员用阿里巴巴规范,而使用GoogleGuava编程的原因
  18. 【大数据面试之对线面试官】MapReduce/HDFS/YARN面试题70连击
  19. 【大数据面试之对线面试官】MapReduce/HDFS/YARN面试题70连击
  20. Netty源码解析-概述篇
  21. Netty源码解析-概述篇
  22. Netty源码解析1-Buffer
  23. Netty源码解析1-Buffer
  24. Netty源码解析2-Reactor
  25. Netty源码解析2-Reactor
  26. Netty源码解析3-Pipeline
  27. Netty源码解析3-Pipeline
  28. Netty源码解析4-Handler综述
  29. Netty源码解析4-Handler综述
  30. Netty源码解析5-ChannelHandler
  31. Netty源码解析5-ChannelHandler
  32. Netty源码解析6-ChannelHandler实例之LoggingHandler
  33. Netty源码解析6-ChannelHandler实例之LoggingHandler
  34. Netty源码解析7-ChannelHandler实例之TimeoutHandler
  35. Netty源码解析7-ChannelHandler实例之TimeoutHandler
  36. Netty源码解析8-ChannelHandler实例之CodecHandler
  37. Netty源码解析8-ChannelHandler实例之CodecHandler
  38. Netty源码解析9-ChannelHandler实例之MessageToByteEncoder
  39. Netty源码解析9-ChannelHandler实例之MessageToByteEncoder
  40. 大数据面试题之Hbase系列
  41. 你可能需要的Kafka面试题与答案整理
  42. 你可能需要的Kafka面试题与答案整理
  43. 后起之秀Pulsar VS. 传统强者Kafka?谁更强
  44. 后起之秀Pulsar VS. 传统强者Kafka?谁更强
  45. 【大数据哔哔集20210123】别问,问就是Kafka最可靠
  46. 【大数据哔哔集20210123】别问,问就是Kafka最可靠
  47. 【大数据哔哔集20210124】有人问我Kafka Leader选举?我真没慌
  48. 【大数据哔哔集20210124】有人问我Kafka Leader选举?我真没慌
  49. 【大数据哔哔集20210117】Kafka 的高可靠性是怎么实现的
  50. 【大数据哔哔集20210117】Kafka 的高可靠性是怎么实现的
  51. Kafka Connect | 无缝结合Kafka构建高效ETL方案
  52. Kafka面试题总结(一)
  53. Kafka面试题总结(一)
  54. Kafka面试题整理(二)
  55. Kafka面试题整理(二)
  56. 基于Kafka Flink Redis的电商大屏实时计算案例
  57. 基于Kafka Flink Redis的电商大屏实时计算案例
  58. Google布隆过滤器与Redis布隆过滤器详解
  59. Google布隆过滤器与Redis布隆过滤器详解
  60. The spring boot process executes a function (four solutions)