Flink实战(110):flink-sql使用(十八)connector(十九)Flink 与 hive 结合使用(七) Flink Hive Connector 使用

wx5c7a97e3804fd 2021-07-20 04:14:31
大数据技术 Flink学习


声明:本系列博客是根据SGG的视频整理而成,非常适合大家入门学习。

《2021年最新版大数据面试题全面开启更新》

Flink 1.12 版本

1. Hive 建表

复制代码

//1、创建 Hive 数据库
create database zhisheng;
//2、查看创建的数据库
show databases;
//3、使用创建的数据库
use zhisheng;
//4、在该库下创建 Hive 表
CREATE TABLE IF NOT EXISTS flink (
appid int,
message String
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
//5、往该表插入一条数据
insert into flink values(11111, '233sadadadwqqdq');

复制代码

2.Flink 读取 Hive 已经存在的表数据

复制代码

//1、创建 Hive CATALOG,Flink 通过 catalog 不仅可以将自己的表写入 Hive 的 metastore,也能读写 Hive 的表
CREATE CATALOG flinkHiveCatalog WITH (
'type' = 'hive',
'default-database' = 'zhisheng',
'hive-conf-dir' = '/app/apache-hive-2.1.1-bin/conf'
);
//2、使用该 Catalog
USE CATALOG flinkHiveCatalog;
//3、因为刚才已经写入了一条数据到 Hive 表(flink)
select * from flink;

复制代码

 

 

3.Flink 往 Hive 中已经存在的表写数据

复制代码

//1、创建 Source 表
CREATE TABLE yarn_log_datagen_test_hive_sink (
appid INT,
message STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='10',
'fields.appid.kind'='random',
'fields.appid.min'='1',
'fields.appid.max'='1000',
'fields.message.length'='100'
);
//2、将数据写入到 Hive 表
insert into flink select * from yarn_log_datagen_test_hive_sink;

复制代码

 

 

 

 

//再次查询 Hive 表里面的数据
select * from flink;

 

 直接在 Hive 利用命令查询:

 

 

4 .完整 Example

复制代码

CREATE CATALOG flinkHiveCatalog WITH (
'type' = 'hive',
'default-database' = 'zhisheng',
'hive-conf-dir' = '/app/apache-hive-2.1.1-bin/conf'
);
USE CATALOG flinkHiveCatalog;
SET table.sql-dialect=hive; -- 创建 Hive 表要指定 sql-dialect 为 Hive,否则创建的时候识别不了下面的 DDL 语句
CREATE TABLE yarn_logs (
appid INT,
message STRING
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='metastore,success-file',
'sink.parallelism'='2' -- 该参数内部才支持设置并行度
);
SET table.sql-dialect=default; -- 创建 Flink 表又要换回默认的 sql-dialect,Flink 支持在同一个 SQL 里面设置多个 sql-dialect
CREATE TABLE yarn_log_datagen_test (
appid INT,
message STRING,
log_ts TIMESTAMP(3),
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.appid.kind' = 'random',
'fields.appid.min' = '1',
'fields.appid.max' = '1000',
'fields.message.length' = '100'
);
-- streaming sql, insert into hive table
INSERT INTO yarn_logs
SELECT appid, message, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM yarn_log_datagen_test;
-- batch sql, select with partition pruning
SELECT * FROM yarn_logs WHERE dt='2020-12-16' and hr='12';

复制代码

查看 table 的存储路径

show create table yarn_logs;

 

版权声明
本文为[wx5c7a97e3804fd]所创,转载请带上原文链接,感谢
https://blog.51cto.com/u_14222592/2894073

  1. Hadoop面试题(一)
  2. Hadoop面试题总结-HDFS
  3. Hadoop面试题总结-HDFS
  4. Hadoop面试题总结(三)- MapReduce
  5. Hadoop面试题总结(三)- MapReduce
  6. Hadoop面试题(四)- YARN
  7. Hadoop面试题(四)- YARN
  8. Hadoop面试题总结(五)- 优化
  9. Hadoop面试题总结(五)- 优化
  10. 大数据面试题之Hadoop系列(深入部分)
  11. 大数据面试题之Hadoop系列(深入部分)
  12. Java NIO之拥抱Path和Files
  13. 【Java Web开发指南】云服务器部署项目供外网访问(Tomcat)
  14. 2020 年九大顶级 Java 框架!别再用一些落后的技术了!
  15. 【大数据哔哔集20210108】Spark Shuffle 和 Hadoop Shuffle有什么异同?
  16. 【大数据哔哔集20210108】Spark Shuffle 和 Hadoop Shuffle有什么异同?
  17. 不建议Java程序员用阿里巴巴规范,而使用GoogleGuava编程的原因
  18. 【大数据面试之对线面试官】MapReduce/HDFS/YARN面试题70连击
  19. 【大数据面试之对线面试官】MapReduce/HDFS/YARN面试题70连击
  20. Netty源码解析-概述篇
  21. Netty源码解析-概述篇
  22. Netty源码解析1-Buffer
  23. Netty源码解析1-Buffer
  24. Netty源码解析2-Reactor
  25. Netty源码解析2-Reactor
  26. Netty源码解析3-Pipeline
  27. Netty源码解析3-Pipeline
  28. Netty源码解析4-Handler综述
  29. Netty源码解析4-Handler综述
  30. Netty源码解析5-ChannelHandler
  31. Netty源码解析5-ChannelHandler
  32. Netty源码解析6-ChannelHandler实例之LoggingHandler
  33. Netty源码解析6-ChannelHandler实例之LoggingHandler
  34. Netty源码解析7-ChannelHandler实例之TimeoutHandler
  35. Netty源码解析7-ChannelHandler实例之TimeoutHandler
  36. Netty源码解析8-ChannelHandler实例之CodecHandler
  37. Netty源码解析8-ChannelHandler实例之CodecHandler
  38. Netty源码解析9-ChannelHandler实例之MessageToByteEncoder
  39. Netty源码解析9-ChannelHandler实例之MessageToByteEncoder
  40. 大数据面试题之Hbase系列
  41. 你可能需要的Kafka面试题与答案整理
  42. 你可能需要的Kafka面试题与答案整理
  43. 后起之秀Pulsar VS. 传统强者Kafka?谁更强
  44. 后起之秀Pulsar VS. 传统强者Kafka?谁更强
  45. 【大数据哔哔集20210123】别问,问就是Kafka最可靠
  46. 【大数据哔哔集20210123】别问,问就是Kafka最可靠
  47. 【大数据哔哔集20210124】有人问我Kafka Leader选举?我真没慌
  48. 【大数据哔哔集20210124】有人问我Kafka Leader选举?我真没慌
  49. 【大数据哔哔集20210117】Kafka 的高可靠性是怎么实现的
  50. 【大数据哔哔集20210117】Kafka 的高可靠性是怎么实现的
  51. Kafka Connect | 无缝结合Kafka构建高效ETL方案
  52. Kafka面试题总结(一)
  53. Kafka面试题总结(一)
  54. Kafka面试题整理(二)
  55. Kafka面试题整理(二)
  56. 基于Kafka Flink Redis的电商大屏实时计算案例
  57. 基于Kafka Flink Redis的电商大屏实时计算案例
  58. Google布隆过滤器与Redis布隆过滤器详解
  59. Google布隆过滤器与Redis布隆过滤器详解
  60. The spring boot process executes a function (four solutions)