Canal: a tool for syncing MySQL incremental data, with the core concepts explained in detail

Hard working Lao Liu 2021-01-22 12:11:53


Lao Liu is a second-year student about to start job hunting. He blogs partly to summarize what he has learned about big data development, and partly in the hope of helping others who are teaching themselves, so that self-learners never have to beg for help. Since Lao Liu taught himself big data development, there are bound to be shortcomings in these posts; criticism and corrections are welcome so we can all improve together!

Background

Data sources in the big data field include business databases, event-tracking data from mobile clients, and log data generated by servers. Depending on what the downstream consumers need, we choose different collection tools. What Lao Liu covers today is Canal, a tool for syncing incremental data from MySQL. The outline of this article is as follows:

  1. What Canal is
  2. How MySQL master-slave replication works
  3. How Canal syncs data from MySQL
  4. The design of Canal's HA mechanism
  5. A brief summary of common data synchronization tools

Lao Liu hopes this article is enough to get you started with Canal, so you don't have to spend more time searching elsewhere.

How MySQL master-slave replication works

Since Canal is used to sync incremental data out of MySQL, Lao Liu first explains how MySQL master-slave replication works, and then moves on to the core concepts of Canal.

Lao Liu breaks the principle of MySQL master-slave replication down into the following steps:

  1. The master must first enable the binary log (binlog), which is used to record every event that modifies data in the database.
  2. The master writes its data changes into the binary log (binlog).
  3. The slave copies the master's binary log into its own local relay log. To do this, the slave starts a worker thread, the I/O thread, which opens an ordinary client connection to the master. The master then starts a binlog dump thread, which reads events from the master's binary log and sends them to the I/O thread, and the I/O thread saves them into the relay log on the slave.
  4. The slave starts a SQL thread, which reads events from the relay log and replays the data modifications on the slave, thereby keeping the slave's data up to date.
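As an aside not in the original post, you can look at the master's side of this yourself. The hedged sketch below uses plain JDBC to ask a master which binlog file and position it is currently writing; the host, port, user, and password are placeholder assumptions, and the MySQL Connector/J driver is assumed to be on the classpath. These coordinates are exactly what a slave's I/O thread, or Canal, starts dumping from.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ShowMasterStatus {
    public static void main(String[] args) throws Exception {
        // Connect to the master and ask which binlog file/position it is currently writing to
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/", "root", "root");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW MASTER STATUS")) {
            if (rs.next()) {
                // A slave's I/O thread (or Canal) resumes dumping from these coordinates
                System.out.println("binlog file    : " + rs.getString("File"));
                System.out.println("binlog position: " + rs.getLong("Position"));
            }
        }
    }
}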

That is all there is to MySQL master-slave replication. Having walked through this process, can you guess how Canal works?

Core Canal concepts

How Canal works

Canal works by emulating the MySQL slave interaction protocol: it disguises itself as a MySQL slave and sends a dump request to the MySQL master. When the master receives the dump request, it starts pushing binlog events to Canal, and Canal then parses the binlog objects.

What Canal is

Canal is pronounced [kəˈnæl] and means a waterway, conduit, or channel. Its main purpose is to sync incremental data (you can think of it as real-time data) out of MySQL, and it is an open-source project written in pure Java.

Canal architecture

A server represents one running Canal process and corresponds to one JVM. An instance corresponds to one data queue, and one canal server hosts 1..n instances. The sub-modules of an instance are:

  1. EventParser: data source access; it emulates the slave protocol to interact with the master and parses the protocol.
  2. EventSink: the connector between the Parser and the Store; it handles data filtering, processing, and distribution.
  3. EventStore: data storage.
  4. MetaManager: manages incremental subscription and consumption metadata.

That covers the basic concepts of Canal. Next, let's look at how Canal syncs incremental data from MySQL.

Syncing MySQL incremental data with Canal

Enable the MySQL binlog

The prerequisite for syncing MySQL incremental data with Canal is that the MySQL binlog is enabled. Alibaba Cloud MySQL databases have the binlog enabled by default, but if we install MySQL ourselves we need to enable the binlog manually.

First, find the MySQL configuration file:

/etc/my.cnf

server-id=1
log-bin=mysql-bin
binlog-format=ROW
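After editing my.cnf, restart MySQL. As a quick check that is not part of the original post, the hedged JDBC sketch below verifies that the binlog is on and in ROW format; the connection details are placeholder assumptions and Connector/J is assumed to be on the classpath.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckBinlog {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/", "root", "root");
             Statement stmt = conn.createStatement()) {
            // Expect log_bin = ON and binlog_format = ROW after the my.cnf change above
            for (String var : new String[]{"log_bin", "binlog_format"}) {
                try (ResultSet rs = stmt.executeQuery("SHOW VARIABLES LIKE '" + var + "'")) {
                    if (rs.next()) {
                        System.out.println(rs.getString("Variable_name") + " = " + rs.getString("Value"));
                    }
                }
            }
        }
    }
}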

Here is a bit of extra knowledge about binlog formats that Lao Liu wants to share.

The binlog has three formats: STATEMENT, ROW, and MIXED.

  1. ROW mode (the one usually used)

    The log records how each row of data was modified. It does not record the context of the SQL statement that was executed; it only records which rows were modified and what they look like after the change. Because only values are recorded, there are no problems with, for example, SQL involving multi-table joins.

    Advantage: it only has to record which rows were changed and what they became, so the log describes every row-level modification clearly and is very easy to understand.

    Disadvantage: in ROW mode, especially when a single statement changes many rows, every modified row is written to the log, which can generate a very large amount of log content.

  2. STATEMENT mode

    Every SQL statement that modifies data is recorded.

    Disadvantage: since it records the executed statements, then in order for those statements to also run correctly on the slave, it must record some context information for each statement as well, to guarantee that when the statements are executed on the slave they produce the same result as they did on the master.

    Even so, some functions still cannot be replicated correctly in certain versions, for example sleep(); and using the last_insert_id() function inside a stored procedure may leave the slave and the master with inconsistent ids, resulting in inconsistent data. ROW mode does not have this problem.

  3. MIXED mode

    A combination of the two: ordinary statements are logged in STATEMENT mode, and statements that cannot be replicated safely as statements fall back to ROW mode.

Real-time sync with Canal

  1. First, configure the environment in conf/example/instance.properties:
 ## mysql serverId
 canal.instance.mysql.slaveId = 1234
 #position info, change this to your own database information
 canal.instance.master.address = 127.0.0.1:3306
 canal.instance.master.journal.name =
 canal.instance.master.position =
 canal.instance.master.timestamp =
 #canal.instance.standby.address =
 #canal.instance.standby.journal.name =
 #canal.instance.standby.position =
 #canal.instance.standby.timestamp =
 #username/password, change this to your own database credentials
 canal.instance.dbUsername = canal
 canal.instance.dbPassword = canal
 canal.instance.defaultDatabaseName =
 canal.instance.connectionCharset = UTF-8
 #table regex
 canal.instance.filter.regex = .*\\..*

Here, canal.instance.connectionCharset is the database's character encoding expressed as a Java charset name, for example UTF-8, GBK, or ISO-8859-1.
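Similarly, canal.instance.filter.regex controls which tables are captured; per Canal's documented filter syntax it is a comma-separated list of Perl regular expressions matched against schema.table. A few illustrative values (the schema and table names below are made-up examples, not from the original post):

 # all tables in all schemas (the default used above)
 canal.instance.filter.regex = .*\\..*
 # every table under the test schema
 canal.instance.filter.regex = test\\..*
 # one specific table plus everything in another schema
 canal.instance.filter.regex = test\\.user,canal\\..*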

  2. Once the configuration is done, start Canal:
 sh bin/startup.sh
  To shut it down, use bin/stop.sh
  3. Watch the logs

    In general, use cat to check canal/canal.log and example/example.log.

  4. Start the client

    Write the business code in IDEA: when there is incremental data in the MySQL database, pull it over and print it to the IDEA console.

    Add the following dependency to the pom.xml file:

 <dependency>
   <groupId>com.alibaba.otter</groupId>
   <artifactId>canal.client</artifactId>
   <version>1.0.12</version>
 </dependency>

Add the client code:

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

public class Demo {
    public static void main(String[] args) {
        // Create a connection to the canal server (destination "example", empty username/password)
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("hadoop03", 11111), "example", "", "");
        connector.connect();
        // Subscribe and roll back to the last un-acked position
        connector.subscribe();
        connector.rollback();
        int batchSize = 1000;
        int emptyCount = 0;
        int totalEmptyCount = 100;
        // Keep pulling batches until 100 consecutive empty batches have been seen
        while (totalEmptyCount > emptyCount) {
            Message msg = connector.getWithoutAck(batchSize);
            long id = msg.getId();
            List<CanalEntry.Entry> entries = msg.getEntries();
            if (id == -1 || entries.size() == 0) {
                emptyCount++;
                System.out.println("emptyCount : " + emptyCount);
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                emptyCount = 0;
                printEntry(entries);
            }
            // Acknowledge the batch so canal can release it
            connector.ack(id);
        }
        connector.disconnect();
    }

    // batch -> entries -> rowchange -> rowdata -> columns
    private static void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // Skip transaction begin/end markers
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                    entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
                continue;
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(entry.getHeader().getLogfileName() + " __ " +
                    entry.getHeader().getSchemaName() + " __ " + eventType);
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : rowDatasList) {
                for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                    System.out.println(column.getName() + " - " +
                            column.getValue() + " - " +
                            column.getUpdated());
                }
            }
        }
    }
}
  5. Write data into MySQL; the client will print the incremental data to the console.
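For example (the table, values, and flags here are illustrative assumptions, not output captured for this article), inserting a row such as INSERT INTO test.user(id, name) VALUES (1, 'laoliu') on the master should make the client print something along these lines, matching the print statements in the code above:

 mysql-bin.000001 __ test __ INSERT
 id - 1 - true
 name - laoliu - true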

The design of Canal's HA mechanism

Many frameworks in the big data field have an HA mechanism. Canal's HA comes in two parts: both the canal server and the canal client have their own HA implementations:

  1. canal server: to reduce requests for the MySQL dump, the instances on different servers must ensure that only one is running at any given moment, while the others stay in standby.
  2. canal client: to guarantee ordering, only one canal client may perform get/ack/rollback operations against a given instance at a time; otherwise the order in which clients receive data cannot be guaranteed.

The whole HA control mechanism relies mainly on a few features of ZooKeeper; ZooKeeper itself is not covered here.

Canal Server:

  1. Whenever a canal server wants to start a canal instance, it first makes an attempt through ZooKeeper (it tries to create an EPHEMERAL node, and whichever server creates it successfully is allowed to start).
  2. After the ZooKeeper node is created successfully, the corresponding canal server starts the corresponding canal instance; the canal instances that did not succeed remain in standby.
  3. Once ZooKeeper detects that the node created by a canal server has disappeared, it immediately notifies the other canal servers to repeat step 1, and a new canal server is chosen to start the instance.
  4. Each time the canal client connects, it first asks ZooKeeper which canal server is currently running the canal instance, and then connects to it; if the connection fails, it retries (see the sketch after this list).
  5. The canal client works in a similar way to the canal server: it also uses ZooKeeper to grab an EPHEMERAL node for control.
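To make step 4 concrete, here is a minimal, hedged sketch of an HA-aware client. It is an illustration under assumptions rather than code from the original post: it assumes the ZooKeeper quorum used in the configuration below and an instance named example, and it relies on CanalConnectors.newClusterConnector, which resolves the currently active canal server through ZooKeeper instead of pointing at a fixed host.

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;

public class HaClientDemo {
    public static void main(String[] args) {
        // Ask ZooKeeper which canal server currently runs the "example" instance and connect to it;
        // if that server dies, reconnecting goes back through ZooKeeper to find the new owner.
        CanalConnector connector = CanalConnectors.newClusterConnector(
                "hadoop02:2181,hadoop03:2181,hadoop04:2181",  // zkServers
                "example",                                     // destination (instance name)
                "", "");                                       // username / password
        connector.connect();
        connector.subscribe();
        // ... from here on, the same getWithoutAck/printEntry/ack loop as in the Demo class above
        connector.disconnect();
    }
}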

Canal HA configuration, with the data synced to Kafka in real time:

  1. Modify the conf/canal.properties file:
 canal.zkServers = hadoop02:2181,hadoop03:2181,hadoop04:2181
 canal.serverMode = kafka
 canal.mq.servers = hadoop02:9092,hadoop03:9092,hadoop04:9092
  2. Configure conf/example/instance.properties:
  canal.instance.mysql.slaveId = 790  # the slaveId must be unique across the two canal servers
  canal.mq.topic = canal_log  # the Kafka topic the data is sent to (a consumer sketch follows)
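To check what actually lands in Kafka, here is a minimal consumer sketch. It is an illustration under assumptions, not part of the original post: it assumes the brokers and the canal_log topic configured above, uses the standard Kafka Java client, and the group id is arbitrary. Depending on the canal.mq.flatMessage setting, each record value is typically a JSON string describing the row change.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CanalKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop02:9092,hadoop03:9092,hadoop04:9092");
        props.put("group.id", "canal-demo");  // arbitrary consumer group
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("canal_log"));
            while (true) {
                // Each record value is one change event that canal wrote to the topic
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}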

A summary of data synchronization tools

That wraps up Canal. Now here is a brief summary of the common data collection tools in use today. There is no deep architectural detail here, just a summary to leave you with an impression.

Common data collection tools include DataX, Flume, Canal, Sqoop, Logstash, and so on.

DataX (for offline data)

DataX is Alibaba's open-source tool for offline synchronization between heterogeneous data sources; that is, for syncing data from a source end to a destination end. Because there are so many kinds of data sources at both ends, before DataX existed the end-to-end links formed a complex mesh, which was fragmented and made it impossible to abstract the core synchronization logic.

To solve the problem of syncing heterogeneous data sources, DataX turns the complex mesh of synchronization links into a star-shaped data link, with DataX acting as the intermediate transmission hub that connects the various data sources.

So when you need to connect a new data source, you only need to connect that data source to DataX, and it can then sync seamlessly with all the existing data sources.

As an offline data synchronization framework, DataX is built on a Framework + plugin architecture. Reading and writing data sources are abstracted into Reader/Writer plugins that plug into the overall synchronization framework.

  1. Reader: the data collection module; it reads data from the source and sends it to the Framework.
  2. Writer: the data writing module; it continuously takes data from the Framework and writes it to the destination.
  3. Framework: it connects the Reader and the Writer, serves as the data transmission channel between them, and handles buffering, concurrency, data conversion, and so on.

The core DataX architecture works as follows.

Core modules:

  1. A single data synchronization job in DataX is called a Job. When DataX receives a Job, it starts a process to carry out the entire job synchronization flow.
  2. After the DataX Job starts, it splits the Job into smaller Tasks (sub-tasks) according to the splitting strategy of the source, so that they can run concurrently.
  3. Once the Job has been split into Tasks, the DataX Job calls the Scheduler module, which regroups the Tasks according to the configured concurrency into TaskGroups. Each TaskGroup is responsible for running its assigned Tasks with a certain degree of concurrency; by default a single TaskGroup runs 5 Tasks concurrently.
  4. Every Task is started by its TaskGroup. Once started, a Task runs a fixed Reader->Channel->Writer pipeline to complete its part of the synchronization.
  5. While the job runs, the Job module monitors and waits for the TaskGroups to finish; only when all TaskGroups have completed does the Job exit successfully. Otherwise, it exits abnormally.

Flume (for real-time data)

Flume's main application scenario is syncing log data. It has three main components: Source, Channel, and Sink.

Flume's biggest advantage is that the official documentation provides a rich set of Sources, Channels, and Sinks; depending on the business requirement, we can find the relevant configuration on the official website. In addition, Flume also provides interfaces for customizing these components.

Logstash (for offline data)

Logstash is a pipeline with real-time data transmission capability. It is responsible for moving data from the input end of the pipeline to the output end, and it lets you add filters in the middle according to your own needs; Logstash provides many powerful filters to satisfy all kinds of application scenarios.

Logstash is written in JRuby, uses a simple message-based architecture, and runs on the JVM. A unit of data flowing through the pipeline is called an event, and the pipeline is divided into the inputs stage, the filters stage, and the outputs stage.

Sqoop (for offline data)

Sqoop is a tool for transferring data between Hadoop and relational databases. It can import data from a relational database such as MySQL into Hadoop's HDFS, and export data from the Hadoop file system back into a relational database. Under the hood Sqoop still runs MapReduce, so be sure to watch out for data skew when using it.

Summary

This article mainly covered the core knowledge of the Canal tool plus a comparison of data collection tools. For the other tools it only touched on concepts and use cases, with the aim of leaving you with an impression. Lao Liu dares to promise that reading this article is basically equivalent to getting started; the rest is practice.

All right. , Sync mysql Tools for incremental data Canal That's all , Although the current level may not be as good as you guys , But Liu will try to be better , Let you learn by yourself and never ask for help !

If you have any related questions, contact the official WeChat account: Hard-working Lao Liu. Since you've read this far, please like, follow, and show some support!

Copyright notice
This article was written by [Hard working Lao Liu]. Please include a link to the original when reposting. Thanks.
https://javamana.com/2021/01/20210122113109008x.html
