本篇主要探讨MySQL数据同步的各类常见技术方案及优劣势对比分析,从而更加深层次的理解方案,进而在后续的实际业务中,更好的选择方案。
1 CDC概念
CDC即Change Data Capture
,变更数据捕获,即当数据发生变更时,能够实时或准实时的捕获到数据的变化,以MySQL为例,产生数据变更的操作有insert
,update
,delete
。CDC技术就时在数据变更时,能够以安全、可靠的方式同步给其他服务、存储,如mongodb、es、kafka、redis、clickhouse等。
2 CDC原理分类
目前一些常用的组件有alibaba canal ,apache flink ,go-mysql-transfer 等。CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种:
2.1 基于查询的 CDC
离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据; 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更; 不保障实时性,基于离线调度存在天然的延迟。
2.2 基于日志的 CDC
实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源; 保障数据一致性,因为 binlog 文件包含了所有历史变更明细; 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。
3 开源方案对比
flink cdc Debezium Canal Sqoop Kettle Oracle Goldengate Go-mysql-transfer
CDC机制 日志 日志 日志 查询 查询 日志 日志 增量同步 ✅ ✅ ✅ ✅ ❌ ✅ ✅ 全量同步 ✅ ✅ ❌ ✅ ✅ ✅ ✅ 断点续传 ✅ ✅ ✅ ❌ ❌ ✅ ✅ 全量 + 增量 ✅ ✅ ❌ ✅ ❌ ✅ ✅ 架构 分布式 单机 单机 分布式 分布式 分布式 单机 Transformation ⭐️⭐️⭐️⭐️⭐️ ⭐️⭐️ ⭐️⭐️ ⭐️⭐️ ⭐️ ⭐️ ⭐️⭐️⭐️⭐️ 生态 ⭐️⭐️⭐️⭐️⭐️ ⭐️⭐️⭐️ ⭐️⭐️⭐️ ⭐️⭐️ ⭐️⭐️ ⭐️⭐️⭐️ ⭐️⭐️
如上图所示,需要根据实际业务场景,决定使用哪一种开源方案。
4 使用场景
cdc,顾名思义,就是数据变更捕获,其本质是实时获取MySQL数据变更(增删改),进而同步其他服务或者业务方。因此其使用场景主要分为:
数据分发:将一个数据源的数据分发给多个下游业务系统,常用于业务解耦、微服务系统。 数据采集:面向数据仓库、数据湖的ETL数据集成,消除数据孤岛,便于后续的分析。 数据同步:常用于数据备份、容灾等。
5 MySQL配置
5.1 开启MySQL的binlog
[ mysqld]
default-storage-engine= INNODB
server-id = 100 (` 唯一` )
port = 3306
log-bin= mysql-bin ( ` 开启` )
binlog_format = ROW (` 注意要设置为行模式` )
开启之后,在MySQL的数据目录(/usr/local/mysql-8.0.32-macos13-arm64/data
),就会生成相应的binlog文件
-rw-r----- 1 _mysql _mysql 1867 6 12 00:03 mysql-bin.000001
-rw-r----- 1 _mysql _mysql 5740 6 18 20 :55 mysql-bin.000002
-rw-r----- 1 _mysql _mysql 38 6 12 00:03 mysql-bin.index
5.2 创建canal同步账户及权限设置
mysql> CREATE USER canal IDENTIFIED BY 'canal' ;
mysql> GRANT SELECT, SHOW VIEW, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal' @'%' ;
mysql> FLUSH PRIVILEGES;
6 Canal配置
6.1 canal同步kafka原理
原理等同于MySQL的主从复制,具体流程:
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议 MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal ) canal 解析 binary log 对象(原始为 byte 流)
6.2 canal安装与配置
具体配置请参考文章 https://www.cnblogs.com/Clera-tea/p/16517424.html
6.2.1 配置文件
/canal/conf/canal.properties
6.2.2 同步kafka配置
canal.serverMode = kafka
kafka.bootstrap.servers = 127.0 .0.1:9092 ( 本机kafka服务)
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
6.2.3 binlog过滤设置
canal.instance.filter.druid.ddl = false(注意这里true 改成 false)
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
6.2.4 同步destinations设置
canal.destinations = example,mytopic(多个逗号分隔)
6.2.5 每个topic都有各自的实例配置
路径/conf/topicname/instance.properties 设置监听mysql地址
canal.instance.master.address = 127.0 .0.1:3306
配置mysql账户
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.connectionCharset = UTF-8
配置canal同步到kafka topic信息
canal.mq.topic = mytopic
6.2.6 kafka数据接收
1 mysql
2 zkServer start
3 kafka-server-start /opt/homebrew/etc/kafka/server.properties
4 canal/bin/startup.sh
kafka 消费者收到的消息如下
{
"data" :[
{
"id" : "22" ,
"url" : "1" ,
"source" : "d" ,
"status" : "1" ,
"created_at" : "2023-06-29 00:10:31" ,
"updated_at" : "2023-06-29 00:10:31"
}
] ,
"database" : "finance" ,
"es" :1687968631000,
"id" :2,
"isDdl" :false,
"mysqlType" :{
"id" : "int unsigned" ,
"url" : "varchar(2048)" ,
"source" : "varchar(32)" ,
"status" : "tinyint" ,
"created_at" : "datetime" ,
"updated_at" : "datetime"
} ,
"old" :null,
"pkNames" :[
"id"
] ,
"sql" : "" ,
"sqlType" :{
"id" :4,
"url" :12,
"source" :12,
"status" :-6,
"created_at" :93,
"updated_at" :93
} ,
"table" : "f_collect" ,
"ts" :1687968631537,
"type" : "INSERT"
}
{
"data" :[
{
"id" : "22" ,
"url" : "1" ,
"source" : "d" ,
"status" : "100" ,
"created_at" : "2023-06-29 00:10:31" ,
"updated_at" : "2023-06-29 00:31:39"
}
] ,
"database" : "finance" ,
"es" :1687969899000,
"id" :3,
"isDdl" :false,
"mysqlType" :{
"id" : "int unsigned" ,
"url" : "varchar(2048)" ,
"source" : "varchar(32)" ,
"status" : "tinyint" ,
"created_at" : "datetime" ,
"updated_at" : "datetime"
} ,
"old" :[
{
"status" : "1" ,
"updated_at" : "2023-06-29 00:10:31"
}
] ,
"pkNames" :[
"id"
] ,
"sql" : "" ,
"sqlType" :{
"id" :4,
"url" :12,
"source" :12,
"status" :-6,
"created_at" :93,
"updated_at" :93
} ,
"table" : "f_collect" ,
"ts" :1687969899293,
"type" : "UPDATE"
}
{
"data" :[
{
"id" : "22" ,
"url" : "1" ,
"source" : "d" ,
"status" : "100" ,
"created_at" : "2023-06-29 00:10:31" ,
"updated_at" : "2023-06-29 00:31:39"
}
] ,
"database" : "finance" ,
"es" :1687969946000,
"id" :4,
"isDdl" :false,
"mysqlType" :{
"id" : "int unsigned" ,
"url" : "varchar(2048)" ,
"source" : "varchar(32)" ,
"status" : "tinyint" ,
"created_at" : "datetime" ,
"updated_at" : "datetime"
} ,
"old" :null,
"pkNames" :[
"id"
] ,
"sql" : "" ,
"sqlType" :{
"id" :4,
"url" :12,
"source" :12,
"status" :-6,
"created_at" :93,
"updated_at" :93
} ,
"table" : "f_collect" ,
"ts" :1687969946443,
"type" : "DELETE"
}
7 go-mysql-transfer配置
7.1 基本说明
项目github地址: go-mysql-transfer
简单,不依赖其它组件,一键部署 集成多种接收端,如:Redis、MongoDB、Elasticsearch、RocketMQ、Kafka、RabbitMQ、HTTP API等,无需编写客户端,开箱即用 内置丰富的数据解析、消息生成规则、模板语法 支持Lua脚本扩展,可处理复杂逻辑 集成Prometheus客户端,支持监控告警 集成Web Admin监控页面 支持高可用集群部署 数据同步失败重试 支持全量数据初始化
7.2 原理
将自己伪装为MySQL的Slave
监听binlog
,获取binlog的变更数据 根据规则或者lua脚本
解析数据,生成指定格式的消息 将生成的消息批量发送给接收端
7.3 安装
1 、依赖Golang 1.14 及以上版本
2 、设置' GO111MODULE=on '
3 、拉取源码 ' git clone https://github.com/wj596/go-mysql-transfer.git '
4 、进入目录,执行 ' go build ' 编译
7.4 全量数据同步
./go-mysql-transfer -stock
7.5 配置文件app.yaml
都能看懂,不做详细说明,主要配置项
1 . mysql
2 . target (kafka)
3 . kafka配置
4 . rule
4.1 数据库,表,字段
4.2 lua_file_path: lua/sync.lua 可以只配置基本的数据格式,也可以配置lua脚本来调整数据格式
4.3 kafka topic
addr: 127.0 .0.1:3306
user:
pass:
charset : utf8
slave_id: 1001
flavor: mysql
enable_web_admin: true
web_admin_port: 8060
target: kafka
kafka_addrs: 127.0 .0.1:9092
rule:
-
schema: test
table: score
column_underscore_to_camel: false
include_columns: ID,name,age,sex
lua_file_path: lua/sync.lua
value_encoder: json
redis_structure: string
kafka_topic: test
7.6 项目启动
1 . 启动zk(zkServer.sh)
2 . 启动kafka (kafka-server-start.sh server.properties)
3 . 启动go-mysql-transfer ( ./go-mysql-transfer)
4 . 启动kafka消费者(kafka-console-consumer --bootstrap-server 127.0 .0.1:9092 --topic test)
5 . 编写简单的lua脚本,实现数据同步
6 . 验证数据同步
go-mysql-transfer/lua/sync.lua脚本内容
local json = require( "json" ) -- 加载json模块
local ops = require( "mqOps" ) --加载mq操作模块
local os = require( "os" ) --加载os模块
local row = ops.rawRow( ) --当前数据库的一行数据,
local action = ops.rawAction( ) --当前数据库事件,包括:insert、updare、delete
local id = row[ "id" ] --获取ID列的值
local name = row[ "name" ]
local age = row[ "age" ]
local sex = row[ "sex" ]
local result = { }
local data = { }
result[ "timestamp" ] = os.time( )
result[ "action" ] = action
data[ 'id' ] = id
data[ 'name' ] = name
data[ 'age' ] = age
data[ 'sex' ] = sex
result[ "object" ] = data
local val = json.encode( result) -- 将result转为json
ops.SEND( "test" , val) -- 发送消息,参数1:topic( string类型) ,参数2:消息内容
启动go-mysql-transfer mysql更新数据
kafka收到的消息
常见问题汇总
The Cluster ID i0yMUA_eRHuBS60eM1ph9w doesn’t match stored clusterId Some(aH https://blog.csdn.net/m0_59252007/article/details/119533700
参考文档
1 https://www.kancloud.cn/wj596/go-mysql-transfer/2116628 2 https://www.cnblogs.com/Clera-tea/p/16517424.html