canal+Kafka实现mysql与redis数据同步

前言
上篇文章简单介绍canal概念,本文结合常见的缓存业务去讲解canal使用 。在实际开发过程中,通常都会把数据往redis缓存中保存一份,做下简单的查询优化 。如果这时候数据库数据发生变更操作,就不得不在业务代码中写一段同步更新redis的代码,但是这种 数据同步的代码和业务代码糅合在一起 看起来不是很优雅,而且还会出现数据不一致问题 。那能不能把这部分同步代码从中抽离出来,形成独立模块呢?答案是肯定的,下面通过canal结合Kafka来实现MySQL与redis之间的数据同步 。
架构设计
【canal+Kafka实现mysql与redis数据同步】canal是一个伪装成slave订阅mysql的binlog,实现数据同步的中间件 。上一篇文章 canal入门 中简单介绍了使用方式,即tcp模式;其实canal也是支持直接发送到MQ中,比如:Kafka、RocketMQ、RabbitMQ 。本文采用Kafka讲解,实现mysql与redis之间的数据同步 。
通过上述结构设计图可以很清晰的知道用到的组件:MySQL、Canal、Kafka、ZooKeeper、Redis 。

canal+Kafka实现mysql与redis数据同步

文章插图
 
Kafka&Zookeeper搭建
首先在官网下载Kafka:
canal+Kafka实现mysql与redis数据同步

文章插图
 
下载后解压文件夹,可以看到以下几个文件:
canal+Kafka实现mysql与redis数据同步

文章插图
 
Kafka内部自带了zookeeper,所以暂不需要去下载搭建zookeeper集群,本文就使用Kafka自带zookeeper来实现 。
canal+Kafka实现mysql与redis数据同步

文章插图
 
通过上述zookeeper启动命令以及Kafka启动命令把服务启动,可以通过以下简单实现下是否成功:
# 命令常见一个canaltopic 队列kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic canaltopicCanal搭建
canal搭建具体可以参考上文,这里只讲解具体的参数配置:
找到/conf目录下的canal.properties配置文件:
# tcp, kafka, RocketMQ 这里选择kafka模式canal.serverMode = kafka# 解析器的线程数,打开此配置,不打开则会出现阻塞或者不进行解析的情况canal.instance.parser.parallelThreadSize = 16# 配置MQ的服务地址,这里配置的是kafka对应的地址和端口canal.mq.servers = 127.0.0.1:9092# 配置instance,在conf目录下要有example同名的目录,可以配置多个canal.destinations = example然后配置instance,找到
/conf/example/instance.properties配置文件:
## mysql serverId , v1.0.26+ will autoGen(自动生成,不需配置)# canal.instance.mysql.slaveId=0# position infocanal.instance.master.address=127.0.0.1:3306# 在Mysql执行 SHOW MASTER STATUS;查看当前数据库的binlogcanal.instance.master.journal.name=mysql-bin.000006canal.instance.master.position=4596# 账号密码canal.instance.dbUsername=canalcanal.instance.dbPassword=Canal@****canal.instance.connectionCharset = UTF-8#MQ队列名称canal.mq.topic=canaltopic#单队列模式的分区下标canal.mq.partition=0经过上述配置后,就可以启动canal了 。
测试
环境搭建完成后,就可以编写代码进行测试 。
1、引入pom依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>2、封装Redis工具类
在Application.yml文件增加以下配置:
spring:redis:host: 127.0.0.1port: 6379database: 0password: 123456封装一个操作Redis的工具类:
@Componentpublic class RedisClient {/*** 获取redis模版*/@Resourceprivate StringRedisTemplate stringRedisTemplate;/*** 设置redis的key-value*/public void setString(String key, String value) {setString(key, value, null);}/*** 设置redis的key-value,带过期时间*/public void setString(String key, String value, Long timeOut) {stringRedisTemplate.opsForValue().set(key, value);if (timeOut != null) {stringRedisTemplate.expire(key, timeOut, TimeUnit.SECONDS);}}/*** 获取redis中key对应的值*/public String getString(String key) {return stringRedisTemplate.opsForValue().get(key);}/*** 删除redis中key对应的值*/public Boolean deleteKey(String key) {return stringRedisTemplate.delete(key);}}3、创建MQ消费者进行同步
在application.yml配置文件加上kafka的配置信息:spring:kafka:# Kafka服务地址bootstrap-servers: 127.0.0.1:9092consumer:# 指定一个默认的组名group-id: consumer-group1#序列化反序列化key-deserializer: org.Apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringDeserializervalue-serializer: org.apache.kafka.common.serialization.StringDeserializer# 批量抓取batch-size: 65536# 缓存容量buffer-memory: 524288


推荐阅读