腾讯大牛教你ClickHouse实时同步MySQL数据

腾讯云数据库 2020-11-10 19:09:52
腾讯 clickhouse 教你 实时 大牛


作者 史鹏宙 CSIG云与智慧产业事业群研发工程师

ClickHouse作为OLAP分析引擎已经被广泛使用,数据的导入导出是用户面临的第一个问题。由于ClickHouse本身无法很好地支持单条大批量的写入,因此在实时同步数据方面需要借助其他服务协助。本文给出一种结合Canal+Kafka的方案,并且给出在多个MySQL实例分库分表的场景下,如何将多张MySQL数据表写入同一张ClickHouse表的方法,欢迎大家批评指正。

首先来看看我们的需求背景:

  1. 实时同步多个MySQL实例数据到ClickHouse,每天规模500G,记录数目亿级别,可以接受分钟级别的同步延迟;

  2. 某些数据库表存在分库分表的操作,用户需要跨MySQL实例跨数据库的表同步到ClickHouse的一张表中;

  3. 现有的MySQL binlog开源组件(Canal),无法做到多张源数据表到一张目的表的映射关系。

基本原理

一、使用JDBC方式同步

  1. 使用Canal组件完成binlog的解析和数据同步;

  2. Canal-Server进程会伪装成MySQL的slave,使用MySQL的binlog同步协议完成数据同步;

  3. Canal-Adapter进程负责从canal-server获取解析后的binlog,并且通过jdbc接口写入到ClickHouse;

image.png

优点:

  1. Canal组件原生支持;

缺点:

  1. Canal-Adpater写入时源表和目的表一一对应,灵活性不足;

  2. 需要维护两个Canal组件进程;

二、Kafka+ClickHouse物化视图方式同步

  1. Canal-Server完成binlog的解析,并且将解析后的json写入Kafka;

  2. Canal-Server可以根据正则表达式过滤数据库和表名,并且根据规则写入Kafka的topic;

  3. ClickHouse使用KafkaEngine和Materialized View完成消息消费,并写入本地表;

image.png

优点:

  1. Kafka支持水平扩展,可以根据数据规模调整partition数目;

  2. Kafka引入后将写入请求合并,防止ClickHouse生成大量的小文件,从而影响查询性能;

  3. Canal-Server支持规则过滤,可以灵活配置上游的MySQL实例的数据库名和表名,并且指明写入的Kafka topic名称;

缺点:

  1. 需要维护Kafka和配置规则;

  2. ClickHouse需要新建相关的视图、Kafka Engine的外表等;

具体步骤

一、准备工作

  1. 如果使用TencentDB,则在控制台确认binlog_format为ROW,无需多余操作。

image.png

如果是自建MySQL,则在客户端中查询变量:

> show variables like '%binlog%';
+-----------------------------------------+----------------------+
| Variable_name | Value |
+-----------------------------------------+----------------------+
| binlog_format | ROW |
+-----------------------------------------+----------------------+
> show variables like '%log_bin%';
+---------------------------------+--------------------------------------------+
| Variable_name | Value |
+---------------------------------+--------------------------------------------+
| log_bin | ON |
| log_bin_basename | /data/mysql_root/log/20146/mysql-bin |
| log_bin_index | /data/mysql_root/log/20146/mysql-bin.index |
+---------------------------------+--------------------------------------------+
  1. 创建账号canal,用于同步binlog

CREATE USER canal IDENTIFIED BY 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'canal'@'%';

FLUSH PRIVILEGES;

二、Canal组件部署

前置条件:

Canal组件部署的机器需要跟ClickHouse服务和MySQL网络互通;

需要在机器上部署java8,配置JAVA_HOME、PATH等环境变量;

基本概念:

image.png

1. Canal-Server组件部署

Canal-Server的主要作用是订阅binlog信息并解析和定义instance相关信息,建议每个Canal-Server进程对应一个MySQL实例;

1)下载canal.deployer-1.1.4.tar.gz,解压

2)修改配置文件conf/canal.properties,需要关注的配置如下:

...
# 端口相关信息,如果同一台机器部署多个进程需要修改
canal.port = 11111
canal.metrics.pull.port = 11112
canal.admin.port = 11110
...
# 服务模式
canal.serverMode = tcp
...
# Kafka地址
canal.mq.servers = 172.21.48.11:9092
# 使用消息队列时 这两个值必须为true
canal.mq.flatMessage = true
canal.mq.flatMessage.onlyData = true
...
# instance列表,conf目录下必须有同名的目录
canal.destinations = example,example2

3)配置instance

可以参照example新增新的instance,主要修改配置文件conf/${instance_name}/instance.properties文件。

样例1: 同步某个数据库的以XX前缀开头的表

订阅 172.21.48.35的MySQL的testdb数据库中的以tb_开头的表的数据变更(例如tb_20200801 、 tb_20200802等),主要的步骤如下:

步骤1:创建example2实例:cddeployer/conf && cp -r example example2

步骤2:修改deployer/conf/example2/instance.properties文件

...
# 上游MySQL实例地址
canal.instance.master.address=172.21.48.35:3306
...
# 同步账户信息
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
...
# 过滤数据库名称和表名
canal.instance.filter.regex=testdb\\.tb_.*,

步骤3:在conf/canal.properties中修改 canal.destinations ,新增example2

样例2: 同步多个数据库的以XX前缀开头的表,且输出到Kafka

订阅 172.21.48.35的MySQL的empdb_0数据库的employees_20200801表,empdb_1数据库的employees_20200802表,并且数据写入Kafka;

步骤1:创建example2实例:cddeployer/conf && cp -r example example3

步骤2:修改deployer/conf/example3/instance.properties文件

...
# 上游MySQL实例地址
canal.instance.master.address=172.21.48.35:3306
...
# 同步账户信息
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
...
# 过滤数据库名称和表名
canal.instance.filter.regex=empdb_.*\\.employees_.*
...
# Kafka的topic名称和匹配的规则
canal.mq.dynamicTopic=employees_topic:empdb_.*\\.employees_.*
canal.mq.partition=0
# Kafka topic的分区数目(即partition数目)
canal.mq.partitionsNum=3
# 根据employees_开头的表中的 emp_no字段来进行数据hash,分布到不同的partition
canal.mq.partitionHash=empdb_.*\\.employees_.*:emp_no

步骤3:在Kafka中新建topic employees_topic,指定分区数目为3

步骤4:在conf/canal.properties中修改 canal.destinations ,新增example3;修改服务模式为kafka,配置kafka相关信息;

# 服务模式
canal.serverMode = kafka
...
# Kafka地址
canal.mq.servers = 172.21.48.11:9092
# 使用消息队列时 这两个值必须为true
canal.mq.flatMessage = true
canal.mq.flatMessage.onlyData = true
...
# instance列表,conf目录下必须有同名的目录
canal.destinations = example,example2,example3

2. Canal-Adapter组件部署(只针对方案一)

Canal-Adapter的主要作用是通过JDBC接口写入ClickHouse数据,可以配置多个表的写入;

1)下载canal.adapter-1.1.4.tar.gz,解压;

2)在lib目录下新增clickhouse驱动jar包及httpclient的jar包 httpcore-4.4.13.jar、httpclient-4.3.3.jar、clickhouse-jdbc-0.2.4.jar;

3)修改配置文件conf/application.yml文件,修改canalServerHost、srcDataSources、canalAdapters的配置;

server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HHss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp
canalServerHost: 127.0.0.1:11111 # canal-server的服务地址
batchSize: 500
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
# MySQL的配置,修改用户名密码及制定数据库
srcDataSources:
defaultDS:
url: jdbc:mysql://172.21.48.35:3306
username: root
password: yourpasswordhere
canalAdapters:
- instance: example
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: rdb
key: mysql1
# clickhouse的配置,修改用户名密码数据库
properties:
jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver
jdbc.url: jdbc:clickhouse://172.21.48.18:8123
jdbc.username: default
jdbc.password:

4)修改配置文件conf/rdb/mytest_user.yml文件

dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
database: testdb
mirrorDb: true

上述的配置文件中,由于开启了mirrorDb: true,目的端的ClickHouse必须有相同的数据库名和表名。

样例1:源数据库与目标数据库名字不同,源表名与目标表名不同

修改adapter的conf/rdb/mytest_user.yml配置文件,指定源数据库和目标数据库

dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
database: source_database_name
table: source_table
targetTable: destination_database_name.destination_table
targetColumns:
id:
name:
commitBatch: 3000 # 批量提交的大小

样例2:多个源数据库表写入目的端的同一张表

在conf/rdb 目录配置多个yml文件,分别指明不同的table名称。

Kafka 服务配置

一、调整合理的producer参数

确认Canal-Server里的canal.properties文件,重要参数见下表;

image.png

二、新建相关的topic名称

根据Canal-Server里instance里配置文件instance.properties,注意分区数目与canal.mq.partitionsNum 保持一致;

partition数目需要考虑以下因素:

  1. 上游的MySQL的数据量。原则上数据写入量越大,应该分配更多的partition数目;

  2. 考虑下游ClickHouse的实例数目。topic的partition分区总数 最好 不大于 下游ClickHouse的总实例数目,保证每个ClickHouse实例都能至少分配到一个partition;

ClickHouse服务配置

根据上游MySQL实例的表的schema新建数据表;

引入Kafka时需要额外新建Engine=Kafka的外表以及相关的物化视图表;

建议:

  1. 为每个外表新增不同的 kafka_group_name,防止相互影响;

  2. 设置kafka_skip_broken_messages 参数为合理值,遇到无法解析数据会跳过;

  3. 设置合理的kafka_num_consumers值,最好保证所有ClickHouse实例该值的总和大于 topic的partition数目;

新建相关的分布式查询表;

服务启动

启动相关的Canal组件进程;

  1. canal-server: sh bin/startup.sh

  2. canal-adapter: sh bin/startup.sh

在MySQL中插入数据,观察日志是否可以正常运行;

如果使用Kafka,可以通过kafka-console-consumer.sh脚本观察binlog数据解析;

观察ClickHouse数据表中是否正常写入数据;

实际案例

需求:实时同步MySQL实例的empdb_0.employees_20200801表和empdb_1.employees_20200802数据表

方案:使用方案二

环境及参数:

MySQL地址 172.21.48.35:3306
CKafka地址 172.21.48.11:9092
Canal instance名称 employees
Kafka目的topic employees_topic

1.在MySQL新建相关表

# MySQL表的建表语句
CREATE DATABASE `empdb_0`;
CREATE DATABASE `empdb_1`;
CREATE TABLE `empdb_0`.`employees_20200801` (
`emp_no` int(11) NOT NULL,
`birth_date` date NOT NULL,
`first_name` varchar(14) NOT NULL,
`last_name` varchar(16) NOT NULL,
`gender` enum('M','F') NOT NULL,
`hire_date` date NOT NULL,
PRIMARY KEY (`emp_no`)
);
CREATE TABLE `empdb_1`.`employees_20200802` (
`emp_no` int(11) NOT NULL,
`birth_date` date NOT NULL,
`first_name` varchar(14) NOT NULL,
`last_name` varchar(16) NOT NULL,
`gender` enum('M','F') NOT NULL,
`hire_date` date NOT NULL,
PRIMARY KEY (`emp_no`)
);

2. Canal-Server配置

步骤1. 修改conf/canal.properties文件

canal.serverMode = kafka
...
canal.destinations = example,employees
...
canal.mq.servers = 172.21.48.11:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.flatMessage.onlyData = true
canal.mq.compressionType = none
canal.mq.acks = all
canal.mq.producerGroup = cdbproducer
canal.mq.accessChannel = local
...

步骤2. 新增employees实例,修改employees/instances.properties配置

...
canal.instance.master.address=172.21.48.35:3306
...
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
...
canal.instance.filter.regex=empdb_.*\\.employees_.*
...
canal.mq.dynamicTopic=employees_topic:empdb_.*\\.employees_.*
canal.mq.partition=0
canal.mq.partitionsNum=3
canal.mq.partitionHash=empdb_.*\\.employees_.*:emp_no

3. Kafka配置

4. 新增topic employees_topic,分区数为3

5. ClickHouse建表

CREATE DATABASE testckdb ON CLUSTER default_cluster;
CREATE TABLE IF NOT EXISTS testckdb.ck_employees ON CLUSTER default_cluster (
`emp_no` Int32,
`birth_date` String,
`first_name` String,
`last_name` String,
`gender` String,
`hire_date` String
) ENGINE=MergeTree() ORDER BY (emp_no)
SETTINGS index_granularity = 8192;
CREATE TABLE IF NOT EXISTS testckdb.ck_employees_stream ON CLUSTER default_cluster (
`emp_no` Int32,
`birth_date` String,
`first_name` String,
`last_name` String,
`gender` String,
`hire_date` String
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = '172.21.48.11:9092',
kafka_topic_list = 'employees_topic',
kafka_group_name = 'employees_group',
kafka_format = 'JSONEachRow',
kafka_skip_broken_messages = 1024,
kafka_num_consumers = 1;
CREATE MATERIALIZED VIEW IF NOT EXISTS testckdb.ck_employees_mv ON CLUSTER default_cluster TO testckdb.ck_employees(
`emp_no` Int32,
`birth_date` String,
`first_name` String,
`last_name` String,
`gender` String,
`hire_date` String
) AS SELECT
`emp_no`,
`birth_date`,
`first_name`,
`last_name`,
`gender`,
`hire_date`
FROM
testckdb.ck_employees_stream;
CREATE TABLE IF NOT EXISTS testckdb.ck_employees_dis ON CLUSTER default_cluster AS testckdb.ck_employees
ENGINE=Distributed(default_cluster, testckdb, ck_employees);

6. 启动Canal-Server服务

MySQL实例上游插入数据,观察数据是否在Canal-Server解析正常,是否在ClickHouse中完成同步。

本文由博客群发一文多发等运营工具平台 OpenWrite 发布

版权声明
本文为[腾讯云数据库]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4788009/blog/4711488

  1. 【计算机网络 12(1),尚学堂马士兵Java视频教程
  2. 【程序猿历程,史上最全的Java面试题集锦在这里
  3. 【程序猿历程(1),Javaweb视频教程百度云
  4. Notes on MySQL 45 lectures (1-7)
  5. [computer network 12 (1), Shang Xuetang Ma soldier java video tutorial
  6. The most complete collection of Java interview questions in history is here
  7. [process of program ape (1), JavaWeb video tutorial, baidu cloud
  8. Notes on MySQL 45 lectures (1-7)
  9. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  10. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  11. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  12. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  13. 【递归,Java传智播客笔记
  14. [recursion, Java intelligence podcast notes
  15. [adhere to painting for 386 days] the beginning of spring of 24 solar terms
  16. K8S系列第八篇(Service、EndPoints以及高可用kubeadm部署)
  17. K8s Series Part 8 (service, endpoints and high availability kubeadm deployment)
  18. 【重识 HTML (3),350道Java面试真题分享
  19. 【重识 HTML (2),Java并发编程必会的多线程你竟然还不会
  20. 【重识 HTML (1),二本Java小菜鸟4面字节跳动被秒成渣渣
  21. [re recognize HTML (3) and share 350 real Java interview questions
  22. [re recognize HTML (2). Multithreading is a must for Java Concurrent Programming. How dare you not
  23. [re recognize HTML (1), two Java rookies' 4-sided bytes beat and become slag in seconds
  24. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  25. RPC 1: how to develop RPC framework from scratch
  26. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  27. RPC 1: how to develop RPC framework from scratch
  28. 一次性捋清楚吧,对乱糟糟的,Spring事务扩展机制
  29. 一文彻底弄懂如何选择抽象类还是接口,连续四年百度Java岗必问面试题
  30. Redis常用命令
  31. 一双拖鞋引发的血案,狂神说Java系列笔记
  32. 一、mysql基础安装
  33. 一位程序员的独白:尽管我一生坎坷,Java框架面试基础
  34. Clear it all at once. For the messy, spring transaction extension mechanism
  35. A thorough understanding of how to choose abstract classes or interfaces, baidu Java post must ask interview questions for four consecutive years
  36. Redis common commands
  37. A pair of slippers triggered the murder, crazy God said java series notes
  38. 1、 MySQL basic installation
  39. Monologue of a programmer: despite my ups and downs in my life, Java framework is the foundation of interview
  40. 【大厂面试】三面三问Spring循环依赖,请一定要把这篇看完(建议收藏)
  41. 一线互联网企业中,springboot入门项目
  42. 一篇文带你入门SSM框架Spring开发,帮你快速拿Offer
  43. 【面试资料】Java全集、微服务、大数据、数据结构与算法、机器学习知识最全总结,283页pdf
  44. 【leetcode刷题】24.数组中重复的数字——Java版
  45. 【leetcode刷题】23.对称二叉树——Java版
  46. 【leetcode刷题】22.二叉树的中序遍历——Java版
  47. 【leetcode刷题】21.三数之和——Java版
  48. 【leetcode刷题】20.最长回文子串——Java版
  49. 【leetcode刷题】19.回文链表——Java版
  50. 【leetcode刷题】18.反转链表——Java版
  51. 【leetcode刷题】17.相交链表——Java&python版
  52. 【leetcode刷题】16.环形链表——Java版
  53. 【leetcode刷题】15.汉明距离——Java版
  54. 【leetcode刷题】14.找到所有数组中消失的数字——Java版
  55. 【leetcode刷题】13.比特位计数——Java版
  56. oracle控制用户权限命令
  57. 三年Java开发,继阿里,鲁班二期Java架构师
  58. Oracle必须要启动的服务
  59. 万字长文!深入剖析HashMap,Java基础笔试题大全带答案
  60. 一问Kafka就心慌?我却凭着这份,图灵学院vip课程百度云