A Tencent expert teaches you how to synchronize MySQL data to ClickHouse in real time

Tencent Cloud Database 2020-11-10 19:09:52


Author: Shi Pengzhou, R&D Engineer, Cloud and Smart Industries Group (CSIG), Tencent

As an OLAP analysis engine, ClickHouse has seen wide adoption, and importing and exporting data is the first problem users face. Because ClickHouse does not handle a large volume of small, individual writes well, we need other services to help synchronize data in real time. This article presents a solution based on Canal + Kafka, and describes how to write multiple MySQL source tables into the same ClickHouse table when the source data is spread across multiple MySQL instances with sharded databases and tables. Comments and corrections are welcome.

Let's first look at the requirements:

  1. Synchronize data from multiple MySQL instances to ClickHouse in real time, at a daily volume of about 500 GB and billions of records; a synchronization delay of minutes is acceptable;

  2. Some tables are sharded across databases and instances; users need to synchronize tables spanning multiple MySQL instances and databases into a single ClickHouse table;

  3. The existing open-source MySQL binlog component (Canal) cannot map multiple source tables to one destination table on its own.

Basic principles

I. Synchronization via JDBC

  1. Use the Canal components to parse the binlog and synchronize the data;

  2. The Canal-Server process disguises itself as a MySQL slave and uses the MySQL binlog replication protocol to receive changes;

  3. The Canal-Adapter process fetches the parsed binlog from Canal-Server and writes it to ClickHouse through the JDBC interface;


Advantages:

  1. Natively supported by the Canal components;

Disadvantages:

  1. When Canal-Adapter writes data, source tables must map one-to-one to destination tables, which lacks flexibility;

  2. Two Canal component processes (Canal-Server and Canal-Adapter) need to be maintained;

II. Synchronization via Kafka + ClickHouse materialized views

  1. Canal-Server parses the binlog and writes the parsed JSON to Kafka;

  2. Canal-Server can filter database and table names with regular expressions and write to Kafka topics according to these rules;

  3. ClickHouse consumes the messages with a Kafka engine table and a materialized view, and writes them to a local table;


Advantages:

  1. Kafka supports horizontal scaling; the number of partitions can be adjusted according to the data volume;

  2. With Kafka in place, write requests are batched, which prevents ClickHouse from generating large numbers of small parts that would hurt query performance;

  3. Canal-Server supports rule-based filtering, so you can flexibly configure the database and table names of the upstream MySQL instances and specify the Kafka topic to write to;

Disadvantages:

  1. Kafka and its configuration rules need to be maintained;

  2. Related views, Kafka engine tables, and so on need to be created in ClickHouse;

Detailed steps

I. Preparation

  1. If you use TencentDB for MySQL, confirm in the console that binlog_format is ROW; no further action is required.


If you run a self-built MySQL, check the variables in the client:

> show variables like '%binlog%';
+-----------------------------------------+----------------------+
| Variable_name | Value |
+-----------------------------------------+----------------------+
| binlog_format | ROW |
+-----------------------------------------+----------------------+
> show variables like '%log_bin%';
+---------------------------------+--------------------------------------------+
| Variable_name | Value |
+---------------------------------+--------------------------------------------+
| log_bin | ON |
| log_bin_basename | /data/mysql_root/log/20146/mysql-bin |
| log_bin_index | /data/mysql_root/log/20146/mysql-bin.index |
+---------------------------------+--------------------------------------------+
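If the output shows that the binlog is not enabled or binlog_format is not ROW, adjust the MySQL configuration first and restart the service. A minimal sketch of the relevant my.cnf options (these are standard MySQL settings; the server-id and log file name are placeholders to adapt to your environment):

# my.cnf -- enable the binlog in ROW format; changing log_bin requires a restart
[mysqld]
server-id        = 1
log_bin          = mysql-bin
binlog_format    = ROW
binlog_row_image = FULL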
  2. Create the canal account, which will be used to subscribe to the binlog:

CREATE USER canal IDENTIFIED BY 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;
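You can verify the grants afterwards (a quick check):

-- should list the REPLICATION SLAVE / REPLICATION CLIENT privileges granted above
SHOW GRANTS FOR 'canal'@'%';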

II. Canal component deployment

Prerequisites:

The machine where the Canal components are deployed must have network access to both the ClickHouse service and MySQL;

Java 8 must be installed on that machine, with JAVA_HOME, PATH, and other environment variables configured;


1. Canal-Server Component deployment

The main job of Canal-Server is to subscribe to binlog events, parse them, and manage the instance-related definitions. It is recommended that each Canal-Server process correspond to one MySQL instance;

1) Download canal.deployer-1.1.4.tar.gz and extract it.

2) Modify the configuration file conf/canal.properties. The settings that need attention are as follows:

...
# Port-related settings; change them if multiple processes are deployed on the same machine
canal.port = 11111
canal.metrics.pull.port = 11112
canal.admin.port = 11110
...
# Service mode
canal.serverMode = tcp
...
# Kafka address
canal.mq.servers = 172.21.48.11:9092
# When using a message queue, these two values must be true
canal.mq.flatMessage = true
canal.mq.flatMessage.onlyData = true
...
# Instance list; the conf directory must contain a directory with the same name for each instance
canal.destinations = example,example2

3) Configure the instances

You can use the default example instance as a template for new instances; the main file to modify is conf/${instance_name}/instance.properties.

Example 1: Synchronize tables with a common prefix within one database

Subscribe to data changes of tables whose names start with tb_ (for example tb_20200801, tb_20200802, etc.) in the testdb database of the MySQL instance at 172.21.48.35. The main steps are as follows:

Step 1: Create the example2 instance: cd deployer/conf && cp -r example example2

Step 2: Modify the deployer/conf/example2/instance.properties file

...
# Address of the upstream MySQL instance
canal.instance.master.address=172.21.48.35:3306
...
# Credentials of the synchronization account
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
...
# Filter by database name and table name
canal.instance.filter.regex=testdb\\.tb_.*,

Step 3: In conf/canal.properties, modify canal.destinations and add example2

Example 2: Synchronize tables with a common prefix across multiple databases and output to Kafka

Subscribe to the employees_20200801 table in the empdb_0 database and the employees_20200802 table in the empdb_1 database of the MySQL instance at 172.21.48.35, and write the data to Kafka;

Step 1: Create the example3 instance: cd deployer/conf && cp -r example example3

Step 2: Modify the deployer/conf/example3/instance.properties file

...
# Address of the upstream MySQL instance
canal.instance.master.address=172.21.48.35:3306
...
# Credentials of the synchronization account
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
...
# Filter by database name and table name
canal.instance.filter.regex=empdb_.*\\.employees_.*
...
# Kafka topic name and its matching rule
canal.mq.dynamicTopic=employees_topic:empdb_.*\\.employees_.*
canal.mq.partition=0
# Number of partitions of the Kafka topic
canal.mq.partitionsNum=3
# Hash rows of tables starting with employees_ by the emp_no field, distributing them across partitions
canal.mq.partitionHash=empdb_.*\\.employees_.*:emp_no

Step 3: Create a new topic named employees_topic in Kafka with 3 partitions (a sketch of the command is shown below)
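A minimal sketch of creating the topic with the standard Kafka CLI, assuming the broker address 172.21.48.11:9092 used in this article and a replication factor of 1 (adjust both to your cluster; on CKafka the topic can also be created in the console, and Kafka versions before 2.2 use --zookeeper instead of --bootstrap-server):

# create the employees_topic topic with 3 partitions
bin/kafka-topics.sh --create \
  --bootstrap-server 172.21.48.11:9092 \
  --topic employees_topic \
  --partitions 3 \
  --replication-factor 1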

Step 4: In conf/canal.properties, modify canal.destinations and add example3; change the service mode to kafka and configure the Kafka-related settings;

# Service mode
canal.serverMode = kafka
...
# Kafka address
canal.mq.servers = 172.21.48.11:9092
# When using a message queue, these two values must be true
canal.mq.flatMessage = true
canal.mq.flatMessage.onlyData = true
...
# Instance list; the conf directory must contain a directory with the same name for each instance
canal.destinations = example,example2,example3

2. Canal-Adapter component deployment (Option I only)

The main job of Canal-Adapter is to write data into ClickHouse through the JDBC interface; writes for multiple tables can be configured;

1) Download canal.adapter-1.1.4.tar.gz and extract it;

2) Add the ClickHouse driver jar and the httpclient jars to the lib directory: httpcore-4.4.13.jar, httpclient-4.3.3.jar, clickhouse-jdbc-0.2.4.jar;

3) Modify the conf/application.yml configuration file, adjusting the canalServerHost, srcDataSources, and canalAdapters settings;

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp
  canalServerHost: 127.0.0.1:11111 # service address of canal-server
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  # MySQL configuration: set the username, password, and database
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://172.21.48.35:3306
      username: root
      password: yourpasswordhere
  canalAdapters:
  - instance: example
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: rdb
        key: mysql1
        # ClickHouse configuration: set the username, password, and database
        properties:
          jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver
          jdbc.url: jdbc:clickhouse://172.21.48.18:8123
          jdbc.username: default
          jdbc.password:

4) Modify the conf/rdb/mytest_user.yml configuration file

dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: testdb
  mirrorDb: true

In the configuration above, because mirrorDb: true is enabled, the destination ClickHouse server must have a database with the same name.
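For example, with the source database testdb configured above, the same-named database has to exist on ClickHouse first. A minimal sketch (the ON CLUSTER clause is only needed on a cluster; default_cluster is the cluster name from the case later in this article):

-- run on ClickHouse so that the mirrored writes have a database of the same name to land in
CREATE DATABASE IF NOT EXISTS testdb ON CLUSTER default_cluster;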

Example 1: Source and destination database names differ, and source and destination table names differ

Modify the adapter's conf/rdb/mytest_user.yml configuration file, specifying the source and destination databases:

dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: source_database_name
  table: source_table
  targetTable: destination_database_name.destination_table
  targetColumns:
    id:
    name:
  commitBatch: 3000 # batch commit size

Example 2: Multiple source tables written to the same destination table

Configure multiple yml files in the conf/rdb directory, each specifying a different source table name (a sketch is shown below).
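A minimal sketch, assuming the two sharded source tables from the case later in this article (empdb_0.employees_20200801 and empdb_1.employees_20200802) are both mapped to one ClickHouse table testckdb.ck_employees; the yml file names are illustrative:

# conf/rdb/employees_20200801.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: empdb_0
  table: employees_20200801
  targetTable: testckdb.ck_employees
  targetPk:
    emp_no: emp_no
  mapAll: true
  commitBatch: 3000

# conf/rdb/employees_20200802.yml is identical except for:
#   database: empdb_1
#   table: employees_20200802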

Kafka Service configuration

I. Tune the producer parameters appropriately

Check the canal.properties file of Canal-Server; the important parameters are the producer-related canal.mq.* settings (retries, batchSize, lingerMs, bufferMemory, acks, compressionType, and so on). The values used in the actual case later in this article can serve as a reference.

II. Create the related topics

Create the topics according to the instance.properties configuration of each instance in Canal-Server; make sure the number of partitions matches canal.mq.partitionsNum;

The number of partitions should take the following factors into account:

  1. The data volume of the upstream MySQL. In principle, the more data is written, the more partitions should be allocated;

  2. The number of downstream ClickHouse instances. The total number of partitions of the topic should preferably be no less than the total number of downstream ClickHouse instances, so that each ClickHouse instance can be assigned at least one partition;

ClickHouse Service configuration

Create data tables according to the table schema of the upstream MySQL instances;

When Kafka is introduced, you also need to create an external table with Engine=Kafka and the related materialized view;

Suggestions:

  1. Use a distinct kafka_group_name for each Kafka table, to prevent them from interfering with each other;

  2. Set kafka_skip_broken_messages to a reasonable value; messages that cannot be parsed will then be skipped;

  3. Set kafka_num_consumers to a reasonable value; ideally the sum of this value across all ClickHouse instances should be no less than the number of partitions of the topic;

Create the related distributed table for queries;

Service startup

Start the related Canal component processes:

  1. canal-server: sh bin/startup.sh

  2. canal-adapter: sh bin/startup.sh

Insert data into MySQL and check the logs to confirm that everything works correctly;

If Kafka is used, you can observe the parsed binlog data with the kafka-console-consumer.sh script;

Check whether the data is written into the ClickHouse tables as expected (a verification sketch is shown below);
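A minimal verification sketch, assuming the broker, topic, and table names used in the actual case below (the ClickHouse host is the one from the adapter configuration above; adjust all addresses to your deployment):

# watch the JSON messages that Canal-Server writes to Kafka
bin/kafka-console-consumer.sh \
  --bootstrap-server 172.21.48.11:9092 \
  --topic employees_topic --from-beginning

# check that rows have arrived in ClickHouse
clickhouse-client --host 172.21.48.18 \
  --query "SELECT count() FROM testckdb.ck_employees_dis"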

Actual case

Requirement: Synchronize the empdb_0.employees_20200801 and empdb_1.employees_20200802 tables of a MySQL instance in real time

Solution: Use Option II

Environment and parameters:

MySQL address: 172.21.48.35:3306
CKafka address: 172.21.48.11:9092
Canal instance name: employees
Destination Kafka topic: employees_topic

1. Create the related tables in MySQL

# MySQL Table creation statement 
CREATE DATABASE `empdb_0`;
CREATE DATABASE `empdb_1`;
CREATE TABLE `empdb_0`.`employees_20200801` (
`emp_no` int(11) NOT NULL,
`birth_date` date NOT NULL,
`first_name` varchar(14) NOT NULL,
`last_name` varchar(16) NOT NULL,
`gender` enum('M','F') NOT NULL,
`hire_date` date NOT NULL,
PRIMARY KEY (`emp_no`)
);
CREATE TABLE `empdb_1`.`employees_20200802` (
`emp_no` int(11) NOT NULL,
`birth_date` date NOT NULL,
`first_name` varchar(14) NOT NULL,
`last_name` varchar(16) NOT NULL,
`gender` enum('M','F') NOT NULL,
`hire_date` date NOT NULL,
PRIMARY KEY (`emp_no`)
);

2. Configure Canal-Server

Step 1. Modify the conf/canal.properties file

canal.serverMode = kafka
...
canal.destinations = example,employees
...
canal.mq.servers = 172.21.48.11:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.flatMessage.onlyData = true
canal.mq.compressionType = none
canal.mq.acks = all
canal.mq.producerGroup = cdbproducer
canal.mq.accessChannel = local
...

Step 2. Create the new employees instance and modify the employees/instance.properties configuration

...
canal.instance.master.address=172.21.48.35:3306
...
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
...
canal.instance.filter.regex=empdb_.*\\.employees_.*
...
canal.mq.dynamicTopic=employees_topic:empdb_.*\\.employees_.*
canal.mq.partition=0
canal.mq.partitionsNum=3
canal.mq.partitionHash=empdb_.*\\.employees_.*:emp_no

3. Configure Kafka

4. Create a new topic employees_topic with 3 partitions (see the kafka-topics.sh sketch earlier in this article)

5. Create the ClickHouse tables

CREATE DATABASE testckdb ON CLUSTER default_cluster;
CREATE TABLE IF NOT EXISTS testckdb.ck_employees ON CLUSTER default_cluster (
`emp_no` Int32,
`birth_date` String,
`first_name` String,
`last_name` String,
`gender` String,
`hire_date` String
) ENGINE=MergeTree() ORDER BY (emp_no)
SETTINGS index_granularity = 8192;
CREATE TABLE IF NOT EXISTS testckdb.ck_employees_stream ON CLUSTER default_cluster (
`emp_no` Int32,
`birth_date` String,
`first_name` String,
`last_name` String,
`gender` String,
`hire_date` String
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = '172.21.48.11:9092',
kafka_topic_list = 'employees_topic',
kafka_group_name = 'employees_group',
kafka_format = 'JSONEachRow',
kafka_skip_broken_messages = 1024,
kafka_num_consumers = 1;
CREATE MATERIALIZED VIEW IF NOT EXISTS testckdb.ck_employees_mv ON CLUSTER default_cluster TO testckdb.ck_employees(
`emp_no` Int32,
`birth_date` String,
`first_name` String,
`last_name` String,
`gender` String,
`hire_date` String
) AS SELECT
`emp_no`,
`birth_date`,
`first_name`,
`last_name`,
`gender`,
`hire_date`
FROM
testckdb.ck_employees_stream;
CREATE TABLE IF NOT EXISTS testckdb.ck_employees_dis ON CLUSTER default_cluster AS testckdb.ck_employees
ENGINE=Distributed(default_cluster, testckdb, ck_employees);

6. Start the Canal-Server service

Insert data into the upstream MySQL instance and check whether Canal-Server parses it correctly and whether it is synchronized into ClickHouse. A quick end-to-end check is sketched below.
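A minimal end-to-end check, assuming the tables created above (the sample row values are illustrative):

-- on MySQL: insert a test row into one of the sharded tables
INSERT INTO empdb_0.employees_20200801
  (emp_no, birth_date, first_name, last_name, gender, hire_date)
VALUES
  (10001, '1990-01-01', 'Georgi', 'Facello', 'M', '2020-08-01');

-- on ClickHouse: after a short delay the row should appear in the distributed table
SELECT count() FROM testckdb.ck_employees_dis WHERE emp_no = 10001;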


Copyright notice
This article was created by [Tencent cloud database]. Please include a link to the original when republishing. Thank you.
