canal 是阿里知名的开源项目,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 。
这篇文章,我们手把手向同学们展示使用 canal 将 MySQL 增量数据同步到 ES 。
文章插图
1 集群模式
文章插图
图中 server 对应一个 canal 运行实例 ,对应一个 JVM。
server 中包含 1..n 个 instance ,我们可以将 instance 理解为配置任务 。
instance 包含如下模块 :
- eventParser数据源接入,模拟 slave 协议和 master 进行交互,协议解析
- eventSinkParser 和 Store 链接器,进行数据过滤,加工,分发的工作
- eventStore数据存储
- metaManager增量订阅 & 消费信息管理器
实战中我们经常会使用 MQ 模式。因为 MQ 模式的优势在于解耦 ,canal server 将数据变更信息发送到消息队列 kafka 或者 RocketMQ ,消费者消费消息,顺序执行相关逻辑即可 。
顺序消费:2 MySQL配置1、对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费 。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求 。
文章插图
[mysqld]log-bin=mysql-bin # 开启 binlogbinlog-format=ROW # 选择 ROW 模式server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步 。2、授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant。
CREATE USER canal IDENTIFIED BY 'canal';GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;FLUSH PRIVILEGES;
3、创建数据库商品表 t_product 。CREATE TABLE `t_product` ( `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT, `name` VARCHAR ( 255 ) COLLATE utf8mb4_bin NOT NULL, `price` DECIMAL ( 10, 2 ) NOT NULL, `status` TINYINT ( 4 ) NOT NULL, `create_time` datetime NOT NULL, `update_time` datetime NOT NULL,PRIMARY KEY ( `id` ) ) ENGINE = INNODB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin
3 Elasticsearch配置使用 Kibana 创建商品索引 。PUT /t_product{"settings": {"number_of_shards": 2,"number_of_replicas": 1},"mAppings": {"properties": {"id": {"type":"keyword"},"name": {"type":"text"},"price": {"type":"double"},"status": {"type":"integer"},"createTime": {"type": "date","format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},"updateTime": {"type": "date","format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"}}}}
执行完成,如图所示 :文章插图
4 RocketMQ 配置创建主题:product-syn-topic ,canal 会将 Binlog 的变化数据发送到该主题 。
文章插图
文章插图
5 canal 配置我们选取 canal 版本 1.1.6 ,进入 conf 目录 。
1、配置 canal.properties
#集群模式 zk地址canal.zkServers = localhost:2181#本质是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQcanal.serverMode = rocketMQ#instance 列表canal.destinations = product-syn#conf root dircanal.conf.dir = ../conf#全局的spring配置方式的组件文件 生产环境,集群化部署canal.instance.global.spring.xml = classpath:spring/default-instance.xml######以下部分是默认值 展示出来 # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)canal.mq.canalBatchSize = 50# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时canal.mq.canalGetTimeout = 100# 是否为 flat json格式对象canal.mq.flatMessage = true
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- Golang 中的 IO 包详解:常用的可导出函数详解
- 秋天野钓青鱼,4个爆护技巧详解,让你大鱼连竿上
- 详解API接口如何安全的传输数据
- 容器技术架构、网络和生态详解
- TVB前当家小生自曝不拍戏原因!51岁未婚未育,直言难与人同步
- 什么是小跑铅钓法?小跑铅钓法详解
- Redis干货 | 五种常用类型之Hash哈希存储类型详解
- MySQL算术运算符使用详解
- MySQL逻辑运算符使用详解
- 水草造景教程 水草造景教程详解