Canal: a tool for synchronizing MySQL incremental data, with a detailed explanation of its core knowledge

itread01 2021-01-22 16:19:20
canal synchronous mysql incremental data


Lao Liu is a sophomore who is about to start looking for a job. He blogs partly to summarize what he has learned about big data development, and partly in the hope of helping his peers, so that they can teach themselves without having to ask others for help. Because Lao Liu is a self-taught big data developer, there are bound to be some shortcomings in these posts. Criticism and corrections are welcome. Let's make progress together!

Background

In the big data field, data sources include business database data, tracking (event) data from mobile clients, and log data generated by servers. Depending on downstream requirements, we can use different collection tools to gather the data. Today Lao Liu introduces Canal, a tool for synchronizing MySQL incremental data. The outline of this article is as follows:

  1. The concept of Canal
  2. The principle of MySQL primary/standby replication
  3. How Canal synchronizes data from MySQL
  4. Canal's HA mechanism design
  5. A brief summary of various data synchronization solutions

Lao Liu hopes this one article is enough to get everyone started with Canal, so that you don't have to spend more time studying it.

The principle of MySQL primary/standby replication

Because Canal is used to synchronize MySQL incremental data, Lao Liu will first explain the principle of MySQL primary/standby replication and then move on to the core knowledge of Canal.

Following the replication diagram, Lao Liu divides MySQL primary/standby replication into the following steps:

  1. The master server must first enable the binary log (binlog), which records every event that modifies database data.
  2. The master server writes data changes to its binlog.
  3. The slave server copies the master's binary log into its local relay log. To do this, the slave first starts a worker thread, the I/O thread. The I/O thread opens an ordinary client connection to the master, and the master then starts a special binlog dump thread. The binlog dump thread reads the events in the master's binary log and sends them to the I/O thread, which stores them in the relay log on the slave.
  4. The slave starts its SQL thread, which reads the events from the relay log and replays the data changes locally, bringing the slave's data up to date.

That is the end of the MySQL primary/standby replication principle. Having gone through the process, can you guess how Canal works?
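The four steps above can be sketched as a small in-memory model. This is only a toy illustration, not MySQL code: the class ReplicationSketch, the Event type, and the method names are all made up for this example, and real replication runs the I/O thread and SQL thread concurrently over a network connection.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of primary/standby replication (hypothetical names, not MySQL internals).
class ReplicationSketch {
    // A simplified binlog event: "set key to value".
    static final class Event {
        final String key;
        final String value;
        Event(String key, String value) { this.key = key; this.value = value; }
    }

    final List<Event> masterBinlog = new ArrayList<>();          // steps 1-2: binlog on the master
    final List<Event> relayLog = new ArrayList<>();              // step 3: relay log on the slave
    final Map<String, String> masterData = new LinkedHashMap<>();
    final Map<String, String> slaveData = new LinkedHashMap<>();
    private int applied = 0;                                     // how far the SQL thread has replayed

    // The master applies a change and records it in its binlog.
    void writeOnMaster(String key, String value) {
        masterData.put(key, value);
        masterBinlog.add(new Event(key, value));
    }

    // I/O thread: copy binlog events that are not yet in the relay log.
    void ioThreadCopy() {
        for (int i = relayLog.size(); i < masterBinlog.size(); i++) {
            relayLog.add(masterBinlog.get(i));
        }
    }

    // SQL thread: replay relay-log events against the slave's data.
    void sqlThreadApply() {
        for (; applied < relayLog.size(); applied++) {
            Event e = relayLog.get(applied);
            slaveData.put(e.key, e.value);
        }
    }

    public static void main(String[] args) {
        ReplicationSketch r = new ReplicationSketch();
        r.writeOnMaster("user:1", "Lao Liu");
        r.writeOnMaster("user:2", "Canal");
        r.ioThreadCopy();
        r.sqlThreadApply();
        System.out.println("slave matches master: " + r.slaveData.equals(r.masterData));
    }
}
```

Canal takes over the role of the I/O thread in this picture: instead of writing the events to a relay log, it parses them and hands them to its clients.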

Canal core knowledge points

How Canal works

Canal works by emulating the MySQL slave interaction protocol: it disguises itself as a MySQL slave and sends a dump request to the MySQL master. After receiving the dump request, the MySQL master starts pushing the binlog to Canal. Finally, Canal parses the binlog events.

The concept of Canal

Canal, pronounced [kəˈnæl] in American English, means waterway, conduit, or channel. Its main purpose is to synchronize incremental data from MySQL (which can be understood as real-time data). It is an open-source project written in pure Java.

Canal architecture

A server represents one running Canal instance and corresponds to one JVM. An instance corresponds to one data queue, and one Canal server hosts 1..n instances. The submodules of an instance are:

  1. EventParser: data source access; it emulates the slave protocol to interact with the master and parses the protocol.
  2. EventSink: the connector between Parser and Store; it filters, processes, and distributes the data.
  3. EventStore: data storage.
  4. MetaManager: manager of incremental subscription and consumption information.

That covers the basics of Canal. Next, Lao Liu will explain how Canal synchronizes MySQL incremental data.
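To make the division of labor between the four submodules more concrete, here is a minimal sketch of how data might flow through one instance. It is an assumption-laden toy: the class InstanceSketch, the "schema.table|TYPE" input format, and every method name are invented for illustration and do not correspond to Canal's real classes.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one Canal instance (hypothetical names, not Canal's real API).
class InstanceSketch {
    final List<String> store = new ArrayList<>(); // EventStore: keeps events in arrival order
    int ackedPosition = 0;                        // MetaManager: how far the consumer has consumed

    // EventParser: turn a raw "schema.table|TYPE" line into its two parts (null if malformed).
    static String[] parse(String raw) {
        String[] parts = raw.split("\\|");
        return parts.length == 2 ? parts : null;
    }

    // EventSink: filter parsed events and hand the survivors to the store.
    void sink(String[] event, String wantedSchema) {
        if (event != null && event[0].startsWith(wantedSchema + ".")) {
            store.add(event[0] + " " + event[1]);
        }
    }

    // A consumer reads everything after the recorded position, then the position moves forward.
    List<String> consume() {
        List<String> batch = new ArrayList<>(store.subList(ackedPosition, store.size()));
        ackedPosition = store.size();
        return batch;
    }

    public static void main(String[] args) {
        InstanceSketch instance = new InstanceSketch();
        String[] rawEvents = {"shop.orders|INSERT", "mysql.user|UPDATE", "shop.items|DELETE"};
        for (String raw : rawEvents) {
            instance.sink(parse(raw), "shop");
        }
        System.out.println(instance.consume()); // only the events from the "shop" schema
    }
}
```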

Synchronizing MySQL incremental data with Canal

Enabling the MySQL binlog

The prerequisite for synchronizing MySQL incremental data with Canal is that the MySQL binlog is enabled. Alibaba Cloud's MySQL databases have the binlog enabled by default, but if you installed MySQL yourself, you need to enable the binlog manually.

First find the MySQL configuration file and add the following settings (they belong in the [mysqld] section):

etc/my.cnf

server-id=1
log-bin=mysql-bin
binlog-format=ROW

Here is a knowledge point about binlog formats that Lao Liu wants to share.

The binlog has three formats: STATEMENT, ROW, and MIXED.

  1. ROW mode (the one usually used)

    The log records how each row of data was modified. It does not record the context of the SQL statement that was executed; it only records which rows were modified and what they were changed to. Only values are stored, so complications such as multi-table joins in SQL do not arise.

    Advantages: because it only needs to record which rows were modified and what they were changed to, the log captures the details of every row change very clearly and is easy to understand.

    Disadvantages: in ROW mode every executed statement is logged as changes to individual rows, so a statement that touches many rows (bulk inserts in particular) can generate a large amount of log content.

  2. STATEMENT mode

    Every SQL statement that modifies data is recorded.

    Disadvantages: because the log records the executed statements, it must also record context information for each statement, so that the statement produces the same result on the slave as it did on the master.

    Even so, some functions are problematic. For example, the sleep() function cannot be replicated correctly in some versions, and using the LAST_INSERT_ID() function in a stored procedure may produce different ids on the slave and the master, which leads to inconsistent data. ROW mode does not have this problem.

  3. MIXED mode

    A combination of the two modes above.
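The difference in log volume between STATEMENT and ROW can be illustrated with a small sketch. It is a simplified model, not MySQL's actual log format: the class BinlogFormatSketch, the in-memory "table" (a map from id to balance), and the log line layout are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy comparison of statement-based and row-based logging (hypothetical format).
class BinlogFormatSketch {
    // STATEMENT style: one log entry that holds the SQL text itself.
    static List<String> logAsStatement(String sql) {
        List<String> log = new ArrayList<>();
        log.add(sql);
        return log;
    }

    // ROW style: apply "balance = balance + delta" and log one entry per changed row.
    static List<String> logAsRows(Map<Integer, Integer> table, int delta) {
        List<String> log = new ArrayList<>();
        for (Map.Entry<Integer, Integer> row : table.entrySet()) {
            int before = row.getValue();
            int after = before + delta;
            row.setValue(after);
            log.add("id=" + row.getKey() + " before=" + before + " after=" + after);
        }
        return log;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> account = new LinkedHashMap<>();
        account.put(1, 10);
        account.put(2, 20);
        account.put(3, 30);

        List<String> statementLog = logAsStatement("UPDATE account SET balance = balance + 5");
        List<String> rowLog = logAsRows(account, 5);

        System.out.println("STATEMENT entries: " + statementLog.size());
        System.out.println("ROW entries: " + rowLog.size());
        rowLog.forEach(System.out::println);
    }
}
```

One statement produces one entry in the statement-style log but one entry per affected row in the row-style log, which is why ROW mode can grow large. In exchange, each row entry already contains the concrete values, which is what makes it convenient for a tool like Canal to parse.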

Real-time synchronization with Canal

  1. First configure the environment in conf/example/instance.properties:
 ## mysql serverId
 canal.instance.mysql.slaveId = 1234
 # position info; change this to your own database information
 canal.instance.master.address = 127.0.0.1:3306
 canal.instance.master.journal.name =
 canal.instance.master.position =
 canal.instance.master.timestamp =
 #canal.instance.standby.address =
 #canal.instance.standby.journal.name =
 #canal.instance.standby.position =
 #canal.instance.standby.timestamp =
 # username/password; change these to your own database credentials
 canal.instance.dbUsername = canal
 canal.instance.dbPassword = canal
 canal.instance.defaultDatabaseName =
 canal.instance.connectionCharset = UTF-8
 #table regex
 canal.instance.filter.regex = .*\\..*

Here, canal.instance.connectionCharset is the database encoding expressed as the corresponding Java charset name, such as UTF-8, GBK, or ISO-8859-1.
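The table filter canal.instance.filter.regex in the configuration above is a regular expression matched against names of the form schema.table, and several expressions can be separated by commas. The sketch below shows how such a filter could be evaluated in plain Java. It assumes full-match semantics and is not Canal's own filter implementation; the class FilterRegexSketch and the method accepts are hypothetical.

```java
import java.util.regex.Pattern;

// Sketch of evaluating a comma-separated table filter (assumed full-match semantics).
class FilterRegexSketch {
    // Returns true if "schema.table" fully matches any of the comma-separated patterns.
    static boolean accepts(String filter, String schema, String table) {
        String fullName = schema + "." + table;
        for (String regex : filter.split(",")) {
            if (Pattern.matches(regex.trim(), fullName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // In Java source the backslash is doubled, just as it is in the properties file.
        String allTables = ".*\\..*";   // every table in every schema
        String shopOnly = "shop\\..*";  // every table in the hypothetical schema "shop"

        System.out.println(accepts(allTables, "shop", "orders"));
        System.out.println(accepts(shopOnly, "shop", "orders"));
        System.out.println(accepts(shopOnly, "mysql", "user"));
    }
}
```

The escaped dot matters: an unescaped "." would match any character, so the pattern would no longer be anchored on the separator between the schema name and the table name.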

  2. Once configured, start Canal:
 sh bin/startup.sh
  To shut it down, use bin/stop.sh
  3. Check the logs

    Typically use cat to view canal/canal.log and example/example.log.

  4. Start the client

    Write the business code in IDEA: whenever there is incremental data in the MySQL database, the client pulls it over and prints it to the IDEA console.

    Add the following to pom.xml:

 <dependency>
   <groupId>com.alibaba.otter</groupId>
   <artifactId>canal.client</artifactId>
   <version>1.0.12</version>
 </dependency>

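Add the client code:

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

public class Demo {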
 public static void main(String[] args) {
     // Establish a connection
     CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop03", 11111),
             "example", "", "");
     connector.connect();
     // Subscribe to
     connector.subscribe();
     connector.rollback();
     int batchSize = 1000;
     int emptyCount = 0;
     int totalEmptyCount = 100;
     while (totalEmptyCount > emptyCount) {
         Message msg = connector.getWithoutAck(batchSize);
         long id = msg.getId();
         List<CanalEntry.Entry> entries = msg.getEntries();
         if(id == -1 || entries.size() == 0){
             emptyCount++;
             System.out.println("emptyCount : " + emptyCount);
             try {
                 Thread.sleep(3000);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }else{
             emptyCount = 0;
             printEntry(entries);
         }
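         // Confirm the batch so that the server can move its position forward
         connector.ack(id);
     }
     // Release the connection once the loop gives up after too many empty batches
     connector.disconnect();
 }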
 // batch -> entries -> rowchange - rowdata -> cols
 private static void printEntry(List<CanalEntry.Entry> entries) {
     for (CanalEntry.Entry entry : entries){
         if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                 entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){
             continue;
         }
         CanalEntry.RowChange rowChange;
         try {
             rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
         } catch (InvalidProtocolBufferException e) {
             // Skip entries that cannot be parsed instead of dereferencing null below
             e.printStackTrace();
             continue;
         }
         CanalEntry.EventType eventType = rowChange.getEventType();
         System.out.println(entry.getHeader().getLogfileName()+" __ " +
                 entry.getHeader().getSchemaName() + " __ " + eventType);
         List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
         for(CanalEntry.RowData rowData : rowDatasList){
             for(CanalEntry.Column column: rowData.getAfterColumnsList()){
                 System.out.println(column.getName() + " - " +
                         column.getValue() + " - " +
                         column.getUpdated());
             }
         }
     }
 }
}
  5. Write some data into MySQL, and the client will print the incremental data to the console.
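The client code above relies on three calls: getWithoutAck, ack, and rollback. One way to think about them is as two cursors over the event stream, one for what has been handed out and one for what has been confirmed. The sketch below models that idea in memory. It is a simplification under stated assumptions: the class AckCursorSketch is hypothetical, and unlike the real client it confirms everything handed out so far instead of acknowledging a specific batch id.

```java
import java.util.ArrayList;
import java.util.List;

// Toy two-cursor model of get / ack / rollback (not the Canal client itself).
class AckCursorSketch {
    private final List<String> events = new ArrayList<>();
    private int getCursor = 0; // index of the next event to hand out
    private int ackCursor = 0; // everything before this index is confirmed

    void publish(String event) {
        events.add(event);
    }

    // Like getWithoutAck: hand out up to batchSize events without confirming them.
    List<String> getWithoutAck(int batchSize) {
        int end = Math.min(events.size(), getCursor + batchSize);
        List<String> batch = new ArrayList<>(events.subList(getCursor, end));
        getCursor = end;
        return batch;
    }

    // Like ack: confirm everything that has been handed out so far.
    void ack() {
        ackCursor = getCursor;
    }

    // Like rollback: forget unconfirmed hand-outs so that they are delivered again.
    void rollback() {
        getCursor = ackCursor;
    }

    public static void main(String[] args) {
        AckCursorSketch stream = new AckCursorSketch();
        stream.publish("a");
        stream.publish("b");
        stream.publish("c");

        System.out.println(stream.getWithoutAck(2)); // first two events
        stream.rollback();                           // processing failed, ask for them again
        System.out.println(stream.getWithoutAck(2)); // the same two events
        stream.ack();                                // processing succeeded
        System.out.println(stream.getWithoutAck(2)); // the remaining event
    }
}
```

This also explains why the demo calls rollback() right after subscribe(): any batch that was fetched but never acknowledged by a previous run is delivered again.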

Canal's HA mechanism design

Many frameworks in the big data field have an HA mechanism. Canal's HA is divided into two parts, and both the Canal server and the Canal client have a corresponding HA implementation:

  1. canal server: to reduce the number of dump requests sent to MySQL, only one of the instances on different servers may be running at any given time; the others stay in standby.
  2. canal client: to guarantee ordering, only one canal client at a time may perform get/ack/rollback operations on an instance; otherwise the order in which the client receives data cannot be guaranteed.

The whole HA control mechanism relies mainly on a few features of ZooKeeper, which Lao Liu will not cover here.

Canal server:

  1. Whenever a canal server wants to start a canal instance, it first makes a start attempt through ZooKeeper (it creates an EPHEMERAL node, and whoever creates it successfully is allowed to start).
  2. After the ZooKeeper node is created successfully, that canal server starts the corresponding canal instance; the canal instances that did not succeed stay in standby.
  3. Once ZooKeeper detects that the node created by a canal server has disappeared, it immediately notifies the other canal servers to repeat step 1, so that a new canal server is chosen to start the instance.
  4. Every time a canal client connects, it first asks ZooKeeper which server is currently running the canal instance and then connects to it. If the connection becomes unavailable, it tries to connect again.
  5. The canal client works in a similar way to the canal server: it also uses ZooKeeper and competes for an EPHEMERAL node.
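The election described in the steps above can be sketched without a real ZooKeeper. The class below is an in-memory stand-in, assuming only the two properties the text relies on: creating a node succeeds for exactly one caller, and the node disappears when its owner's session ends. The class name, method names, and the node path used here are all hypothetical and are not the ZooKeeper API.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for ephemeral-node election (not the ZooKeeper client API).
class EphemeralLockSketch {
    // path -> owner; plays the role of ZooKeeper's EPHEMERAL nodes.
    private final Map<String, String> nodes = new HashMap<>();

    // Step 1: try to create the node; only the first caller succeeds.
    boolean tryCreate(String path, String owner) {
        return nodes.putIfAbsent(path, owner) == null;
    }

    // Step 3: when the owner's session ends, its ephemeral nodes vanish.
    void sessionClosed(String owner) {
        nodes.values().removeIf(owner::equals);
    }

    // Step 4: a client asks which server currently runs the instance.
    String runningServer(String path) {
        return nodes.get(path);
    }

    public static void main(String[] args) {
        EphemeralLockSketch zk = new EphemeralLockSketch();
        String path = "/example/running"; // hypothetical node path

        System.out.println("server-A starts: " + zk.tryCreate(path, "server-A"));
        System.out.println("server-B starts: " + zk.tryCreate(path, "server-B")); // stays in standby
        System.out.println("client connects to: " + zk.runningServer(path));

        zk.sessionClosed("server-A"); // server-A crashes, its node disappears
        System.out.println("server-B retries: " + zk.tryCreate(path, "server-B"));
        System.out.println("client reconnects to: " + zk.runningServer(path));
    }
}
```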

Next comes the Canal HA configuration, with the data synchronized to Kafka.

  1. Modify the conf/canal.properties file
 canal.zkServers = hadoop02:2181,hadoop03:2181,hadoop04:2181
 canal.serverMode = kafka
 canal.mq.servers = hadoop02:9092,hadoop03:9092,hadoop04:9092
  2. Configure conf/example/instance.properties
  # the slaveId must be unique across the two canal servers
  canal.instance.mysql.slaveId = 790
  # the Kafka topic that the data is sent to
  canal.mq.topic = canal_log

Summary of data synchronization solutions

Now that Canal is covered, Lao Liu will give a brief summary of the commonly used data collection tools. No architecture knowledge is involved; it is just a quick overview to leave an impression.

Common data collection tools include DataX, Flume, Canal, Sqoop, and Logstash.

DataX (processing offline data)

DataX is an offline synchronization tool for heterogeneous data sources, open-sourced by Alibaba. Offline synchronization of heterogeneous data sources means synchronizing data from a source to a destination. Because there are many types of data sources at each end, before DataX the end-to-end links formed a complex mesh that was highly fragmented, which made it impossible to abstract the core synchronization logic.

To solve the problem of heterogeneous data source synchronization, DataX turns the complex mesh of synchronization links into a star-shaped data link, with DataX acting as the intermediate carrier that connects the various data sources.

As a result, when a new data source needs to be connected, it only has to be connected to DataX, and it can then synchronize seamlessly with the existing data sources.

As an offline data synchronization framework, DataX is built on a Framework + plugin architecture. Reading from and writing to data sources are abstracted into Reader and Writer plugins that are plugged into the overall synchronization framework.

  1. Reader: the data collection module. It is responsible for collecting data from the source and sending it to the Framework.
  2. Writer: the data writing module. It is responsible for continuously fetching data from the Framework and writing it to the destination.
  3. Framework: it connects the Reader and the Writer, serves as the data transmission channel between them, and handles buffering, concurrency, data conversion, and so on.
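The Reader / Framework / Writer split can be sketched with a bounded queue standing in for the Framework's channel. This is a minimal illustration under stated assumptions, not DataX's plugin API: the interfaces Reader and Writer, the class ReaderWriterSketch, and the END marker are all invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy Reader -> channel -> Writer pipeline (hypothetical interfaces, not DataX's API).
class ReaderWriterSketch {
    interface Reader { void read(BlockingQueue<String> channel) throws InterruptedException; }
    interface Writer { void write(BlockingQueue<String> channel) throws InterruptedException; }

    static final String END = "__END__"; // hypothetical end-of-data marker

    // Framework: connects a Reader and a Writer through a small bounded buffer.
    static void run(Reader reader, Writer writer) throws InterruptedException {
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                reader.read(channel);
                channel.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        writer.write(channel); // the writer runs on the calling thread
        producer.join();
    }

    // Copies records from a source list to a new target list through the framework.
    static List<String> copy(List<String> source) {
        List<String> target = new ArrayList<>();
        try {
            run(
                channel -> { for (String record : source) channel.put(record); },
                channel -> {
                    for (String r = channel.take(); !r.equals(END); r = channel.take()) {
                        target.add(r);
                    }
                });
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("copy was interrupted", e);
        }
        return target;
    }

    public static void main(String[] args) {
        System.out.println(copy(Arrays.asList("row-1", "row-2", "row-3", "row-4")));
    }
}
```

Because neither side knows anything about the other, swapping the source or the destination only means supplying a different Reader or Writer, which is the point of the star-shaped design.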

The core architecture of DataX is shown in the figure below:

Introduction to the core modules:

  1. A single data synchronization job in DataX is called a Job. When DataX receives a Job, it starts a process to carry out the whole synchronization.
  2. After the DataX Job starts, it splits the Job into multiple small Tasks (subtasks) according to the splitting strategy of the particular source, so that they can be executed concurrently.
  3. After the Job has been split into Tasks, the DataX Job calls the Scheduler module, which regroups the Tasks according to the configured concurrency and assembles them into TaskGroups. Each TaskGroup is responsible for running all of the Tasks assigned to it with a certain degree of concurrency. By default, a single TaskGroup runs 5 Tasks concurrently.
  4. Every Task is started by its TaskGroup. Once started, a Task always launches a Reader->Channel->Writer chain to carry out the synchronization.
  5. While the DataX job is running, the Job monitors and waits for the TaskGroups to finish. When all TaskGroups have completed, the Job exits successfully; otherwise it exits abnormally.
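The arithmetic behind step 3 can be worked through with a short sketch. It assumes a simple rule: the number of TaskGroups is the configured concurrency divided by the per-group concurrency (rounded up), and Tasks are dealt out round-robin. DataX's real scheduler is more involved, and the class TaskGroupSketch and its parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Worked example of splitting Tasks into TaskGroups (simplified rule, not DataX's scheduler).
class TaskGroupSketch {
    // Splits task ids 0..taskCount-1 into groups; each group runs up to perGroup tasks at once.
    static List<List<Integer>> group(int taskCount, int concurrency, int perGroup) {
        int groupCount = Math.max(1, (concurrency + perGroup - 1) / perGroup); // ceiling division
        List<List<Integer>> groups = new ArrayList<>();
        for (int g = 0; g < groupCount; g++) {
            groups.add(new ArrayList<>());
        }
        for (int task = 0; task < taskCount; task++) {
            groups.get(task % groupCount).add(task); // round-robin assignment
        }
        return groups;
    }

    public static void main(String[] args) {
        // 100 Tasks, a configured concurrency of 20, and the default of 5 per TaskGroup.
        List<List<Integer>> groups = group(100, 20, 5);
        System.out.println("TaskGroups: " + groups.size());
        for (int g = 0; g < groups.size(); g++) {
            System.out.println("group " + g + " holds " + groups.get(g).size() + " tasks");
        }
    }
}
```

With these numbers the Job ends up with 20 / 5 = 4 TaskGroups, each responsible for 25 Tasks, of which 5 run at any one time.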

Flume (processing real-time data)

Flume is mainly used to synchronize log data. It consists of three main components: Source, Channel, and Sink.

Flume's biggest advantage is that the official website provides a rich set of Sources, Channels, and Sinks; depending on business needs, we can look up the relevant configuration there. In addition, Flume provides interfaces for customizing these components.

Logstash (processing offline data)

Logstash is a pipeline with real-time data transmission capability. It is responsible for carrying data from the input end of the pipeline to the output end. The pipeline also lets you add filters in the middle according to your own needs, and Logstash provides many powerful filters to cover a variety of application scenarios.

Logstash is written in JRuby, uses a simple message-based architecture, and runs on the JVM. The data flowing through the pipeline is called an event, and processing is divided into the inputs stage, the filters stage, and the outputs stage.

Sqoop (processing offline data)

Sqoop is a tool for transferring data between Hadoop and relational databases. It is used to import data from a relational database such as MySQL into Hadoop's HDFS, and to export data from the Hadoop file system to a relational database. Under the hood Sqoop is still MapReduce, so watch out for data skew when using it.

Summary

This article mainly covers the core knowledge points of Canal and a comparison of data collection tools. For the collection tools, only the concepts and use cases are discussed, with the aim of leaving you with an impression. Lao Liu is confident that after reading this article you have basically got started; the rest is practice.

All right, that is all for Canal, the tool for synchronizing MySQL incremental data. Although Lao Liu's current level may not be as good as yours, he will keep working to improve, so that you can teach yourself without ever having to ask for help!

If you have related questions, contact the official account: Hard working old Liu. If you have read this far, please like and follow to show your support!

Copyright notice
This article was created by [itread01]. Please include a link to the original when reposting. Thank you.
https://javamana.com/2021/01/20210122161608081l.html
