不做大哥好多年 不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • Langchain
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核

逍遥子

不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • Langchain
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核
  • MySQL

  • Redis

  • Elasticsearch

    • 01.ES安装
    • 02.ES介绍
    • 03.倒排索引 ✅
    • 04.ES原理 ✅
    • 05.ES集群原理 ✅
    • 06.ES集群部署
    • 07.ES优化
    • 21.ES基本使用
    • 22.中文分词检索
    • 23.python使用ES
    • 24.ES复杂类型
    • 100.制作一些数据
    • 101.ES同步
      • 01.MySQL数据同步到ES
        • 1.1 同步类型
        • 1.2 数据同步实现方式
      • 02.canal实现方案
        • 2.1 canal是什么
        • 2.2 canel原理
      • 03.搭建canal
        • 3.1 安装MySQL与基本配置
        • 3.1.1 安装MySQL
        • 3.1.2 创建MySQL用户
        • 3.1.3 配置文件my.cnf
        • 3.1.4 确定执行状态
        • 3.2 安装canal
        • 3.2.1 下载并解压 canal
        • 3.2.2 配置修改
        • 3.2.3 准备启动
        • 3.2.4 查看日志
        • 3.2.5 关闭
      • 04.canel客户端
        • 4.1 canal-python 简介
        • 4.2 应用场景
        • 4.3 工作原理
        • 4.4 工作流程
        • 4.5 快速启动
      • 05.总结
  • Kafka

  • Etcd

  • MongoDB

  • TiDB

  • RabbitMQ

  • 数据库
  • Elasticsearch
xiaonaiqiang
2021-03-29
目录

101.ES同步

# 01.MySQL数据同步到ES

# 1.1 同步类型

  • MySQL数据同步到ES中分为两种,分别是全量同步和增量同步。
  • 全量同步表示第一次建立好ES索引之后,将MySQL中所有数据一次性导入到ES中。
  • 增量同步表示MySQL中产生新的数据,这些新的数据包括三种情况
    • 新插入MySQL中的数据
    • 更新老的数据
    • 删除的数据
    • 这些数据的变动与新增都要同步到ES中。

# 1.2 数据同步实现方式

业界有一些开源方案,开源中间件来实现

  • 方法1:canal (opens new window)

    • 基于MySQL的binlog日志订阅:binlog日志是MySQL用来记录数据实时的变化。
    • 这里主要的是binlog同步组件,目前实现的有国内的阿里巴巴开发的canal
  • 方法2:go-MySQL-elasticsearch

    • 就是这样一个项目,它可以从 MySQL 的数据表中读取指定数据表的数据,发送到 ElasticSearch 之中。
    • 它会使用 MySQLdump 命令处理现有存量数据,并借助 binlog 的方式跟踪增量数据
    • 从而保证 Elasticsearch 的数据和 MySQL 数据库中的数据保持同步。

# 02.canal实现方案

# 2.1 canal是什么

  • 我们都知道一个系统最重要的是数据,数据是保存在数据库里。
  • 但是很多时候不单止要保存在数据库中,还要同步保存到Elastic Search、HBase、Redis等等
  • 阿里开源的框架Canal,他可以很方便地同步数据库的增量数据到其他的存储应用

# 2.2 canel原理

  • canal的工作原理就是把自己伪装成MySQL slave
    • 1) 模拟MySQL slave的交互协议向MySQL Mater发送 dump协议
    • 2) MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal
    • 3) 然后canal解析binary log,再发送到存储目的地,比如MySQL,Kafka,Elastic Search等等

# 03.搭建canal

  • canal配置使用 (opens new window)

# 3.1 安装MySQL与基本配置

# 3.1.1 安装MySQL

  • 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

  • 我的Linux服务器安装的MySQL服务器是5.7版本。

  • MySQL的安装这里就不演示了,比较简单,网上也有很多教程。

# 3.1.2 创建MySQL用户

# 创建用户 用户名:canal 密码:Canal@123456
create user 'canal'@'%' identified by 'Canal@123456';
# 授权 *.*表示所有库
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'Canal@123456';
FLUSH PRIVILEGES;
1
2
3
4
5

# 3.1.3 配置文件my.cnf

[MySQLd]
log-bin=MySQL-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=129 #配置MySQL replaction需要定义,不能和canal的slaveId重复
1
2
3
4

# 3.1.4 确定执行状态

  • 重启MySQL
  • 确定打开了打开binlog模式
MySQL> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin     | ON   |
+---------------+-------+
1
2
3
4
5
6
MySQL> show binary logs;     # 查看binlog日志文件列表
MySQL> show master status;    # 查看当前正在写入的binlog文件
1
2

# 3.2 安装canal

# 3.2.1 下载并解压 canal

  • 进入官网

去官网下载页面进行下载:https://github.com/alibaba/canal/releases (opens new window)

  • 下载并解压
[root@localhost ~]# cd /usr/local/src/
[root@localhost src]# wget https://github.com/alibaba/canal/releases/download/canal-1.0.19/canal.deployer-1.0.19.tar.gz

[root@localhost src]# mkdir /usr/local/canal
[root@localhost src]# tar zxvf canal.deployer-1.0.19.tar.gz -C /usr/local/canal/
1
2
3
4
5
  • 可以看到如下结构
[root@localhost src]# cd ..
[root@localhost local]# cd /usr/local/canal/
[root@localhost canal]# ls
bin  conf  lib logs
1
2
3
4

# 3.2.2 配置修改

  • 修改conf/example/instance.properties
[root@localhost canal]# vim conf/example/instance.properties
# 第一:MySQL serverId,改成唯一的
canal.instance.MySQL.slaveId = 129

# 第二:position info,需要改成自己的数据库信息
canal.instance.master.address=127.0.0.1:3306         # 数据库地址
canal.instance.master.journal.name=MySQL-bin.000001    # binlog日志名称
canal.instance.master.position=154                # MySQL主库链接时起始的binlog偏移量
canal.instance.master.timestamp=                 # MySQL主库链接时起始的binlog的时间戳

# 第三:username/password,需要改成自己的数据库信息
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.defaultDatabaseName=test
canal.instance.connectionCharset=UTF-8

# 第四:table regex
canal.instance.filter.regex=.*\\..*
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  • 修改 conf/canal.properties 文件
    • 将 canal.ip 改成canal所在机器的ip地址,避免无谓的问题
canal.id= 128
canal.ip=192.168.134.128
canal.port= 11111
1
2
3

# 3.2.3 准备启动

[root@localhost canal]# ./bin/startup.sh
1
  • linux启动完成后,会在bin目录下生成canal.pid,stop.sh会读取canal.pid进行进程关闭;
  • startup.sh默认读取系统环境变量中的which java获得JAVA执行路径,需要设置PATH=$JAVA_HOME/bin环境变量。

# 3.2.4 查看日志

[root@localhost canal]# vim logs/canal/canal.log
[root@localhost canal]# vim logs/example/example.log
1
2

# 3.2.5 关闭

[root@localhost canal]# ./bin/stop.sh 
localhost.localdomain: stopping canal 6237 ... 
1
2

# 04.canel客户端

  • canal-python客户端 (opens new window)

# 4.1 canal-python 简介

  • canal-python 是阿里巴巴开源项目 Canal (opens new window)是阿里巴巴MySQL数据库binlog的增量订阅&消费组件 的 python 客户端。

  • 为 python 开发者提供一个更友好的使用 Canal 的方式。

  • Canal 是MySQL数据库binlog的增量订阅&消费组件。

  • 基于日志增量订阅&消费支持的业务:

    • 1)数据库镜像
    • 2)数据库实时备份
    • 3)多级索引 (卖家和买家各自分库索引)
    • 4)search build
    • 5)业务cache刷新
    • 6)价格变化等重要业务消息

# 4.2 应用场景

  • 1.代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。

  • 2.根据数据库的变更实时更新搜索引擎,比如电商场景下商品信息发生变更,实时同步到商品搜索引擎 Elasticsearch、solr等

  • 3.根据数据库的变更实时更新缓存,比如电商场景下商品价格、库存发生变更实时同步到Redis

  • 4.数据库异地备份、数据同步

  • 5.根据数据库变更触发某种业务,比如电商场景下,创建订单超过xx时间未支付被自动取消,我们获取到这条订单数据的状态变更即可向用户推送消息。

  • 6.将数据库变更整理成自己的数据格式发送到kafka等消息队列,供消息队列的消费者进行消费。

# 4.3 工作原理

  • canal-python 是 Canal 的 python 客户端
  • 它与 Canal 是采用的Socket来进行通信的,传输协议是TCP
  • 交互协议采用的是 Google Protocol Buffer 3.0

# 4.4 工作流程

  • 1.Canal连接到MySQL数据库,模拟slave

  • 2.canal-python 与 Canal 建立连接

  • 2.数据库发生变更写入到binlog

  • 5.Canal向数据库发送dump请求,获取binlog并解析

  • 4.canal-python 向 Canal 请求数据库变更

  • 4.Canal 发送解析后的数据给canal-python

  • 5.canal-python收到数据,消费成功,发送回执。(可选)

  • 6.Canal记录消费位置。

# 4.5 快速启动

  • 安装Canal

    • Canal 的安装以及配置使用请查看 https://github.com/alibaba/canal/wiki/QuickStart
  • 环境要求

    • python >= 3
  • 构建canal python客户端

pip install canal-python
1
  • 建立与Canal的连接
import time

from canal.client import Client
from canal.protocol import EntryProtocol_pb2
from canal.protocol import CanalProtocol_pb2

client = Client()
client.connect(host='127.0.0.1', port=11111)
client.check_valid(username=b'', password=b'')
client.subscribe(client_id=b'1001', destination=b'example', filter=b'.*\\..*')

while True:
    message = client.get(100)
    entries = message['entries']
    for entry in entries:
        entry_type = entry.entryType
        if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN, EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
            continue
        row_change = EntryProtocol_pb2.RowChange()
        row_change.MergeFromString(entry.storeValue)
        event_type = row_change.eventType
        header = entry.header
        database = header.schemaName
        table = header.tableName
        event_type = header.eventType
        for row in row_change.rowDatas:
            format_data = dict()
            if event_type == EntryProtocol_pb2.EventType.DELETE:
                for column in row.beforeColumns:
                    format_data = {
                        column.name: column.value
                    }
            elif event_type == EntryProtocol_pb2.EventType.INSERT:
                for column in row.afterColumns:
                    format_data = {
                        column.name: column.value
                    }
            else:
                format_data['before'] = format_data['after'] = dict()
                for column in row.beforeColumns:
                    format_data['before'][column.name] = column.value
                for column in row.afterColumns:
                    format_data['after'][column.name] = column.value
            data = dict(
                db=database,
                table=table,
                event_type=event_type,
                data=format_data,
            )
            print(data)
    time.sleep(1)

client.disconnect()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

# 05.总结

  • canal的好处在于对业务代码没有侵入,因为是基于监听binlog日志去进行同步数据的。

  • 实时性也能做到准实时,其实是很多企业一种比较常见的数据同步的方案。

  • 通过上面的学习之后,我们应该都明白canal是什么,它的原理,还有用法

  • 实际上这仅仅只是入门,因为实际项目中我们不是这样玩的...

  • 实际项目我们是配置MQ模式,配合RocketMQ或者Kafka

  • canal会把数据发送到MQ的topic中,然后通过消息队列的消费者进行处理

  • Canal的部署也是支持集群的,需要配合ZooKeeper进行集群管理

  • Canal还有一个简单的Web管理界面

上次更新: 2024/10/15 16:27:13
100.制作一些数据
01.Kafka安装

← 100.制作一些数据 01.Kafka安装→

最近更新
01
05.快递Agent智能体
06-04
02
200.AI Agent核心概念
06-04
03
105.Agent智能体梳理
06-04
更多文章>
Theme by Vdoing | Copyright © 2019-2025 逍遥子 技术博客 京ICP备2021005373号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式