网站收录查询主要由哪几个网站,邢台做网站名列前茅,怎么编辑网站内容,接做网站单子CanalKafka实现MySQL与Redis数据同步#xff08;二#xff09;
创建MQ消费者进行同步
在application.yml配置文件加上kafka的配置信息#xff1a;
spring:kafka:# Kafka服务地址bootstrap-servers: 127.0.0.1:9092consumer:# 指定一个默认的组名group-id: consumer-group…CanalKafka实现MySQL与Redis数据同步二
创建MQ消费者进行同步
在application.yml配置文件加上kafka的配置信息
spring:kafka:# Kafka服务地址bootstrap-servers: 127.0.0.1:9092consumer:# 指定一个默认的组名group-id: consumer-group1#序列化反序列化key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringDeserializervalue-serializer: org.apache.kafka.common.serialization.StringDeserializer# 批量抓取batch-size: 65536# 缓存容量buffer-memory: 524288根据上面Kafka消费命令那里我们知道了json数据的结构可以创建一个CanalBean对象进行接收
public class CanalBean {//数据private ListTbCommodityInfo data;//数据库名称private String database;private long es;//递增从1开始private int id;//是否是DDL语句private boolean isDdl;//表结构的字段类型private MysqlType mysqlType;//UPDATE语句旧数据private String old;//主键名称private ListString pkNames;//sql语句private String sql;private SqlType sqlType;//表名private String table;private long ts;//(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等private String type;//getter、setter方法
}
public class MysqlType {private String id;private String commodity_name;private String commodity_price;private String number;private String description;//getter、setter方法
}
public class SqlType {private int id;private int commodity_name;private int commodity_price;private int number;private int description;
}最后就可以创建一个消费者CanalConsumer进行消费
Component
public class CanalConsumer {//日志记录private static Logger log LoggerFactory.getLogger(CanalConsumer.class);//redis操作工具类Resourceprivate RedisClient redisClient;//监听的队列名称为canaltopicKafkaListener(topics canaltopic)public void receive(ConsumerRecord?, ? consumer) {String value (String) consumer.value();log.info(topic名称:{},key:{},分区位置:{},下标:{},value:{}, consumer.topic(), consumer.key(),consumer.partition(), consumer.offset(), value);//转换为javaBeanCanalBean canalBean JSONObject.parseObject(value, CanalBean.class);//获取是否是DDL语句boolean isDdl canalBean.getIsDdl();//获取类型String type canalBean.getType();//不是DDL语句if (!isDdl) {ListTbCommodityInfo tbCommodityInfos canalBean.getData();//过期时间long TIME_OUT 600L;if (INSERT.equals(type)) {//新增语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id tbCommodityInfo.getId();//新增到redis中,过期时间是10分钟redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);}} else if (UPDATE.equals(type)) {//更新语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id tbCommodityInfo.getId();//更新到redis中,过期时间是10分钟redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);}} else {//删除语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id tbCommodityInfo.getId();//从redis中删除redisClient.deleteKey(id);}}}}
}测试MySQL与Redis同步
mysql对应的表结构如下
CREATE TABLE tb_commodity_info (id varchar(32) NOT NULL,commodity_name varchar(512) DEFAULT NULL COMMENT 商品名称,commodity_price varchar(36) DEFAULT 0 COMMENT 商品价格,number int(10) DEFAULT 0 COMMENT 商品数量,description varchar(2048) DEFAULT COMMENT 商品描述,PRIMARY KEY (id)
) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COMMENT商品信息表;首先在MySQL创建表。然后启动项目接着新增一条数据
INSERT INTO canaldb.tb_commodity_info (id, commodity_name, commodity_price, number, description) VALUES (3e71a81fd80711eaaed600163e046cc3, 叉包, 3.99, 3, 大叉包老喜欢);tb_commodity_info表查到新增的数据 Redis也查到了对应的数据证明同步成功 如果更新呢试一下Update语句
UPDATE canaldb.tb_commodity_info SET commodity_name青菜包,description便宜的青菜包 WHERE id3e71a81fd80711eaaed600163e046cc3;没有问题
总结
canal的缺点
canal只能同步增量数据。不是实时同步是准实时同步。存在一些bug不过社区活跃度较高对于提出的bug能及时修复。MQ顺序性问题。 网的回答大家参考一下 尽管有一些缺点毕竟没有一样技术(产品)是完美的合适最重要。