千家信息网

Canal1.1.4中怎么使用RocketMQ将MySQL同步到Redis

发表于:2024-10-01 作者:千家信息网编辑
千家信息网最后更新 2024年10月01日,今天就跟大家聊聊有关Canal1.1.4中怎么使用RocketMQ将MySQL同步到Redis,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。一
千家信息网最后更新 2024年10月01日Canal1.1.4中怎么使用RocketMQ将MySQL同步到Redis

今天就跟大家聊聊有关Canal1.1.4中怎么使用RocketMQ将MySQL同步到Redis,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

一、Canal使用RocketMQ同步MySQL

Canal结合RocketMQ同步MySQL

二、 同步数据到Redis

2.1 安装Redis

2.2 Redis配置

2.3 SpringBoot配置

2.3.1 引入依赖
    com.alibaba.otter    canal.client    1.1.4    org.apache.rocketmq    rocketmq-spring-boot-starter    2.0.2    javax.persistence    persistence-api
2.3.2 通用代码

SQLType.java

import lombok.AccessLevel;import lombok.NoArgsConstructor;/** * Canal监听SQL类型 * * @author Yu * @date 2019/09/08 00:18 **/@NoArgsConstructor(access = AccessLevel.PRIVATE)public class SQLType {    /**插入*/    public static final String INSERT = "INSERT";    /**更新*/    public static final String UPDATE = "UPDATE";    /**删除*/    public static final String DELETE = "DELETE";}

User.java

import lombok.Data;import javax.persistence.Id;import java.io.Serializable;/** * UserPo对象 * * @author Yu * @date 2019/09/08 14:13 **/@Datapublic class User implements Serializable {    private static final long serialVersionUID = -6845801275112259322L;    @Id    private Integer uid;    private String username;    private String password;    private String sex;}

CanalSynService.java

import com.alibaba.otter.canal.protocol.FlatMessage;import java.util.Collection;/** * Canal同步服务 * * @author Yu * @date 2019/09/08 00:00 **/public interface CanalSynService {    /**     * 处理数据     *     * @param flatMessage CanalMQ数据     */    void process(FlatMessage flatMessage);    /**     * DDL语句处理     *     * @param flatMessage CanalMQ数据     */    void ddl(FlatMessage flatMessage);    /**     * 插入     *     * @param list 新增数据     */    void insert(Collection list);    /**     * 更新     *     * @param list 更新数据     */    void update(Collection list);    /**     * 删除     *     * @param list 删除数据     */    void delete(Collection list);}

AbstractCanalMQ2RedisService.java

import com.alibaba.otter.canal.protocol.FlatMessage;import com.google.common.collect.Sets;import com.taco.springcloud.canal.constant.SQLType;import com.taco.springcloud.core.component.ApplicationContextHolder;import com.taco.springcloud.core.exception.BizException;import com.taco.springcloud.core.exception.constants.BaseApiCodeEnum;import com.taco.springcloud.core.utils.JsonUtil;import com.taco.springcloud.redis.utils.RedisUtils;import lombok.extern.slf4j.Slf4j;import org.springframework.data.redis.connection.RedisConnection;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.serializer.RedisSerializer;import org.springframework.util.ReflectionUtils;import javax.annotation.Resource;import javax.persistence.Id;import java.lang.reflect.Field;import java.lang.reflect.ParameterizedType;import java.util.*;/** * 抽象CanalMQ通用处理服务 * * @author Yu * @date 2019/09/08 00:05 **/@Slf4jpublic abstract class AbstractCanalMQ2RedisService implements CanalSynService {    @Resource    private RedisTemplate redisTemplate;    @Resource    private RedisUtils redisUtils;    private Class cache;    /**     * 获取Model名称     *     * @return Model名称     */    protected abstract String getModelName();    @Override    public void process(FlatMessage flatMessage) {        if(flatMessage.getIsDdl()) {            ddl(flatMessage);            return;        }        Set data = getData(flatMessage);        if(SQLType.INSERT.equals(flatMessage.getType())) {            insert(data);        }        if(SQLType.UPDATE.equals(flatMessage.getType())) {            update(data);        }        if(SQLType.DELETE.equals(flatMessage.getType())) {            delete(data);        }    }    @Override    public void ddl(FlatMessage flatMessage) {        //TODO : DDL需要同步,删库清空,更新字段处理    }    @Override    public void insert(Collection list) {        insertOrUpdate(list);    }    @Override    public void update(Collection list) {        insertOrUpdate(list);    }    private void insertOrUpdate(Collection list) {        redisTemplate.executePipelined( (RedisConnection redisConnection) -> {            for (T data : list) {                String key = getWrapRedisKey(data);                RedisSerializer keySerializer = redisTemplate.getKeySerializer();                RedisSerializer valueSerializer = redisTemplate.getValueSerializer();                redisConnection.set(keySerializer.serialize(key), valueSerializer.serialize(data));            }            return null;        });    }    @Override    public void delete(Collection list) {        Set keys = Sets.newHashSetWithExpectedSize(list.size());        for (T data : list) {            keys.add(getWrapRedisKey(data));        }        //Set keys = list.stream().map(this::getWrapRedisKey).collect(Collectors.toSet());        redisUtils.delAll(keys);    }    /**     * 封装redis的key     *     * @param t 原对象     * @return  key     */    protected String getWrapRedisKey(T t) {        return new StringBuilder()                        .append(ApplicationContextHolder.getApplicationName())                        .append(":")                        .append(getModelName())                        .append(":")                        .append(getIdValue(t))                        .toString();    }    /**     * 获取类泛型     *     * @return 泛型Class     */    protected Class getTypeArguement() {        if(cache == null) {            cache = (Class) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0];        }        return cache;    }    /**     * 获取Object标有@Id注解的字段值     *     * @param t 对象     * @return  id值     */    protected Object getIdValue(T t) {        Field fieldOfId = getIdField();        ReflectionUtils.makeAccessible(fieldOfId);        return ReflectionUtils.getField(fieldOfId, t);    }    /**     * 获取Class标有@Id注解的字段名称     *     * @return id字段名称     */    protected Field getIdField() {        Class clz = getTypeArguement();        Field[] fields = clz.getDeclaredFields();        for (Field field : fields) {            Id annotation = field.getAnnotation(Id.class);            if (annotation != null) {                return field;            }        }        log.error("PO类未设置@Id注解");        throw new BizException(BaseApiCodeEnum.FAIL);    }    /**     * 转换Canal的FlatMessage中data成泛型对象     *     * @param flatMessage   Canal发送MQ信息     * @return              泛型对象集合     */    protected Set getData(FlatMessage flatMessage) {        List> sourceData = flatMessage.getData();        Set targetData = Sets.newHashSetWithExpectedSize(sourceData.size());        for (Map map : sourceData) {            T t = JsonUtil.mapConvertPojo(map, getTypeArguement());            targetData.add(t);        }        return targetData;    }}

TestUsersConsumer.java

import com.alibaba.otter.canal.protocol.FlatMessage;import com.taco.springcloud.canal.model.User;import com.taco.springcloud.canal.service.AbstractCanalMQ2RedisService;import lombok.Getter;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;@Slf4j@Service@RocketMQMessageListener(topic = "test_users", consumerGroup = "users")public class TestUsersConsumer extends AbstractCanalMQ2RedisService implements RocketMQListener {    @Getter    private String modelName = "user";    @Override    public void onMessage(FlatMessage s) {        process(s);    }}

看完上述内容,你们对Canal1.1.4中怎么使用RocketMQ将MySQL同步到Redis有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。

0