Canal1.1.4中怎么使用RocketMQ将MySQL同步到Redis
发表于:2024-10-01 作者:千家信息网编辑
千家信息网最后更新 2024年10月01日,今天就跟大家聊聊有关Canal1.1.4中怎么使用RocketMQ将MySQL同步到Redis,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。一
千家信息网最后更新 2024年10月01日Canal1.1.4中怎么使用RocketMQ将MySQL同步到Redis
今天就跟大家聊聊有关Canal1.1.4中怎么使用RocketMQ将MySQL同步到Redis,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
一、Canal使用RocketMQ同步MySQL
Canal结合RocketMQ同步MySQL
二、 同步数据到Redis
2.1 安装Redis
略
2.2 Redis配置
略
2.3 SpringBoot配置
2.3.1 引入依赖
com.alibaba.otter canal.client 1.1.4 org.apache.rocketmq rocketmq-spring-boot-starter 2.0.2 javax.persistence persistence-api
2.3.2 通用代码
SQLType.java
import lombok.AccessLevel;import lombok.NoArgsConstructor;/** * Canal监听SQL类型 * * @author Yu * @date 2019/09/08 00:18 **/@NoArgsConstructor(access = AccessLevel.PRIVATE)public class SQLType { /**插入*/ public static final String INSERT = "INSERT"; /**更新*/ public static final String UPDATE = "UPDATE"; /**删除*/ public static final String DELETE = "DELETE";}
User.java
import lombok.Data;import javax.persistence.Id;import java.io.Serializable;/** * UserPo对象 * * @author Yu * @date 2019/09/08 14:13 **/@Datapublic class User implements Serializable { private static final long serialVersionUID = -6845801275112259322L; @Id private Integer uid; private String username; private String password; private String sex;}
CanalSynService.java
import com.alibaba.otter.canal.protocol.FlatMessage;import java.util.Collection;/** * Canal同步服务 * * @author Yu * @date 2019/09/08 00:00 **/public interface CanalSynService{ /** * 处理数据 * * @param flatMessage CanalMQ数据 */ void process(FlatMessage flatMessage); /** * DDL语句处理 * * @param flatMessage CanalMQ数据 */ void ddl(FlatMessage flatMessage); /** * 插入 * * @param list 新增数据 */ void insert(Collection list); /** * 更新 * * @param list 更新数据 */ void update(Collection list); /** * 删除 * * @param list 删除数据 */ void delete(Collection list);}
AbstractCanalMQ2RedisService.java
import com.alibaba.otter.canal.protocol.FlatMessage;import com.google.common.collect.Sets;import com.taco.springcloud.canal.constant.SQLType;import com.taco.springcloud.core.component.ApplicationContextHolder;import com.taco.springcloud.core.exception.BizException;import com.taco.springcloud.core.exception.constants.BaseApiCodeEnum;import com.taco.springcloud.core.utils.JsonUtil;import com.taco.springcloud.redis.utils.RedisUtils;import lombok.extern.slf4j.Slf4j;import org.springframework.data.redis.connection.RedisConnection;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.serializer.RedisSerializer;import org.springframework.util.ReflectionUtils;import javax.annotation.Resource;import javax.persistence.Id;import java.lang.reflect.Field;import java.lang.reflect.ParameterizedType;import java.util.*;/** * 抽象CanalMQ通用处理服务 * * @author Yu * @date 2019/09/08 00:05 **/@Slf4jpublic abstract class AbstractCanalMQ2RedisServiceimplements CanalSynService { @Resource private RedisTemplate redisTemplate; @Resource private RedisUtils redisUtils; private Class cache; /** * 获取Model名称 * * @return Model名称 */ protected abstract String getModelName(); @Override public void process(FlatMessage flatMessage) { if(flatMessage.getIsDdl()) { ddl(flatMessage); return; } Set data = getData(flatMessage); if(SQLType.INSERT.equals(flatMessage.getType())) { insert(data); } if(SQLType.UPDATE.equals(flatMessage.getType())) { update(data); } if(SQLType.DELETE.equals(flatMessage.getType())) { delete(data); } } @Override public void ddl(FlatMessage flatMessage) { //TODO : DDL需要同步,删库清空,更新字段处理 } @Override public void insert(Collection list) { insertOrUpdate(list); } @Override public void update(Collection list) { insertOrUpdate(list); } private void insertOrUpdate(Collection list) { redisTemplate.executePipelined( (RedisConnection redisConnection) -> { for (T data : list) { String key = getWrapRedisKey(data); RedisSerializer keySerializer = redisTemplate.getKeySerializer(); RedisSerializer valueSerializer = redisTemplate.getValueSerializer(); redisConnection.set(keySerializer.serialize(key), valueSerializer.serialize(data)); } return null; }); } @Override public void delete(Collection list) { Set keys = Sets.newHashSetWithExpectedSize(list.size()); for (T data : list) { keys.add(getWrapRedisKey(data)); } //Set keys = list.stream().map(this::getWrapRedisKey).collect(Collectors.toSet()); redisUtils.delAll(keys); } /** * 封装redis的key * * @param t 原对象 * @return key */ protected String getWrapRedisKey(T t) { return new StringBuilder() .append(ApplicationContextHolder.getApplicationName()) .append(":") .append(getModelName()) .append(":") .append(getIdValue(t)) .toString(); } /** * 获取类泛型 * * @return 泛型Class */ protected Class getTypeArguement() { if(cache == null) { cache = (Class ) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]; } return cache; } /** * 获取Object标有@Id注解的字段值 * * @param t 对象 * @return id值 */ protected Object getIdValue(T t) { Field fieldOfId = getIdField(); ReflectionUtils.makeAccessible(fieldOfId); return ReflectionUtils.getField(fieldOfId, t); } /** * 获取Class标有@Id注解的字段名称 * * @return id字段名称 */ protected Field getIdField() { Class clz = getTypeArguement(); Field[] fields = clz.getDeclaredFields(); for (Field field : fields) { Id annotation = field.getAnnotation(Id.class); if (annotation != null) { return field; } } log.error("PO类未设置@Id注解"); throw new BizException(BaseApiCodeEnum.FAIL); } /** * 转换Canal的FlatMessage中data成泛型对象 * * @param flatMessage Canal发送MQ信息 * @return 泛型对象集合 */ protected Set getData(FlatMessage flatMessage) { List
TestUsersConsumer.java
import com.alibaba.otter.canal.protocol.FlatMessage;import com.taco.springcloud.canal.model.User;import com.taco.springcloud.canal.service.AbstractCanalMQ2RedisService;import lombok.Getter;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;@Slf4j@Service@RocketMQMessageListener(topic = "test_users", consumerGroup = "users")public class TestUsersConsumer extends AbstractCanalMQ2RedisServiceimplements RocketMQListener { @Getter private String modelName = "user"; @Override public void onMessage(FlatMessage s) { process(s); }}
看完上述内容,你们对Canal1.1.4中怎么使用RocketMQ将MySQL同步到Redis有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。
同步
数据
对象
名称
字段
处理
更新
内容
注解
服务
配置
代码
信息
更多
知识
篇文章
类型
行业
语句
资讯
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发申请专利合作合同
通过服务器连打印机
视频软件开发计划
如何网络安全定级
崇明区数据软件开发质量保证
数据库应用技术阶段
网络安全靠大家手抄报大全
银河麒麟系统软件开发
长沙网络安全规划
oarcel数据库建表建库
网络安全赛项目
数据库oem
我的世界服务器速通挑战
河南森牧网络技术服务怎么样
国家网络安全保护办公室
域文件服务器搭建
品质网络技术咨询怎么样
希望之村如何开进服务器
路通网络技术有限公司南京
数据库项目管理师培训
管控网络安全 国安
南宁软件开发软件公司
数据库如何用for循环
眉山展厅多媒体软件开发公司
质量过硬的数据库防火墙
l数据库销售管理系统
网络安全白皮书ppt
阿里云服务器网络经常断开
网络安全法 检察机关
棋牌麻将软件开发公司