千家信息网

如何实现基于Jedis+ZK的分布式序列号生成器

发表于:2024-12-13 作者:千家信息网编辑
千家信息网最后更新 2024年12月13日,本篇内容主要讲解"如何实现基于Jedis+ZK的分布式序列号生成器",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"如何实现基于Jedis+ZK的分布式序列号
千家信息网最后更新 2024年12月13日如何实现基于Jedis+ZK的分布式序列号生成器

本篇内容主要讲解"如何实现基于Jedis+ZK的分布式序列号生成器",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"如何实现基于Jedis+ZK的分布式序列号生成器"吧!

部分源码参考Jedis实现分布式锁博客:

package com.xxx.arch.seq.utlis;import com.xxx.arch.seq.client.redis.RedisSEQ;import lombok.extern.slf4j.Slf4j;/** * arch-seq 唯一code 获取客户端 * * @author jdkleo */@Slf4jpublic class SEQUtil {    /**     * 生成默认KEY的UUID规则: 日期yyMMdd 6位 + 分布式seqID 10位,总共6 + 10 = 16位     *     * @param     * @return     */    public static long getSEQ() {        return RedisSEQ.getSEQ();    }    /**     * 生成默认KEY连续的UUID,共total个     *     * @param total - 连续多少个     * @return     */    public static long[] getSEQ(long total) {        long value = RedisSEQ.getSEQ(total);        return getValueArray(value, (int) total);    }    /**     * 生成指定KEY的UUID规则: 日期yyMMdd 6位 + 分布式seqID 10位,总共6 + 10 = 16位     *     * @param seqName     * @return     */    public static long getSEQ(String seqName) {        return RedisSEQ.getSEQ(seqName, 1);    }    /**     * 生成指定KEY连续的UUID,共total个     *     * @param seqName     * @param total     * @return     */    public static long[] getSEQ(String seqName, long total) {        long value = RedisSEQ.getSEQ(seqName, total);        return getValueArray(value, (int) total);    }    private static long[] getValueArray(long value, int total) {        int n = total;        long[] ret = new long[n];        do {            ret[n - 1] = value--;        } while (--n > 0);        return ret;    }}
package com.xxx.arch.seq.client.redis;import com.xxx.arch.seq.client.tool.StreamCloseAble;import lombok.extern.slf4j.Slf4j;import java.util.Random;import java.util.concurrent.atomic.AtomicInteger;/** * Redis版本SEQ(有序SEQ) * * @author zhangyang * @createDate 2019-01-22 * @since 2.x */@Slf4jpublic class RedisSEQ extends StreamCloseAble {    //默认的REDIS SEQ初始化状态器KEY    private static final String _DEFAULT_SEQ_INIT_KEY = "ARCH_SEQ_REDIS_SEQ_INIT";    //默认的REDIS SEQ初始化状态器VAL    private static final String _DEFAULT_SEQ_INIT_PENDING = "pending";    private static final String _DEFAULT_SEQ_INIT_READY = "ready";    //SEQ初始化容器状态    private static volatile boolean _DEFAULT_SEQ_INIT_STATUS;    //默认REDIS SEQ序列号的名称    private static final String _DEFAULT_SEQ_NAME = "ARCH_SEQ_REDIS_SEQ";    //本地模式自增ID槽    private final static AtomicInteger _LOCAL_INCR = new AtomicInteger(0);    static {        JedisConfig.JedisConn jedisConn = null;        try {            jedisConn = JedisConfig.getInstance().getConn();            //if REDIS宕机或第一次:创建初始化状态成功后,初始化redis keys(该方法可以恢复上次redis宕机数据)            if (jedisConn.setnx(_DEFAULT_SEQ_INIT_KEY, _DEFAULT_SEQ_INIT_PENDING) == 1) {//抢到REDIS初始化锁,并将其标记为pending状态                try {                    RedisSEQTimer.getInstance().removeNotUsedKeys();                    RedisSEQTimer.getInstance().initRedisKeys();//初始化REDIS,从ZK上读取初始数据                    jedisConn.set(_DEFAULT_SEQ_INIT_KEY, _DEFAULT_SEQ_INIT_READY);//初始化完成,标记为ready状态                } catch (Exception e) {                    log.error(e.getMessage(), e);                    //初始化arch.seq REDIS数据异常,有可能是ZK相关问题,也有可能是REDIS问题,请排查                    log.error("Initialization of arch.seq REDIS data exceptions, may be ZK-related problems, may also be REDIS problems, please check redis key:{}", _DEFAULT_SEQ_INIT_KEY);                    jedisConn.del(_DEFAULT_SEQ_INIT_KEY);                }            }            //else{...} 没抢到REDIS初始化锁的话:不作任何处理        } catch (Exception e) {            log.error(e.getMessage(), e);            log.error("Initialization of arch.seq REDIS data exceptions, may be arch.seq's configuration is not ready");        } finally {            close(jedisConn);        }    }    public static Long getSEQ() {        return getSEQ(_DEFAULT_SEQ_NAME, 1);    }    public static Long getSEQ(long total) {        return getSEQ(_DEFAULT_SEQ_NAME, total);    }    public static Long getSEQ(String seqName, long total) {        Long result = null;        JedisConfig.JedisConn jedisConn = null;        try {            //获取redis连接            jedisConn = JedisConfig.getInstance().getConn();            //获得REDIS初始化状态不成功            if (!tryInitReady(jedisConn)) {                //arch.seq By REDIS版本不能正常初始化,请检查REDIS服务。                throw new RuntimeException("arch.seq By REDIS version cannot be initialized properly. Please check the REDIS service.");            }            //开启分布式锁            //if (jedisConn.tryLock(seqName, 1000, 2000)) {            try {                String day = RedisSEQTimer.getInstance().getDayFormat();                String incrVal = String.format("0d", getIncrVal(jedisConn, day, seqName, total));                result = Long.parseLong(day + incrVal);            } catch (Exception e) {                e.printStackTrace();                log.warn("try lock failed,the arch.seq tool will be retry after sleep some times.");                Thread.sleep(randTime());                result = getSEQ(seqName, total);            }        } catch (Throwable e) {            log.error(e.getMessage(), e);            //redis生成失败,返回本地ID:15位纳秒+1位自然数轮询            //在获取【自增序列号:{},序列号分布式锁:{}】时发生了异常,系统返回了本地生成的自增序列号,不影响系统使用,但请管理员尽快协查!            log.error("An exception occurred while acquiring self-incremental sequence number '{}', " +                    "sequence number distributed lock '{}',The system returns the locally generated self-incremental " +                    "sequence number, which does not affect the use of the system, but the administrator should check " +                    "it as soon as possible.", seqName, seqName + "_LOCK");            result = xUUID();        } finally {            //切记,一定要释放分布式锁(注:释放锁的同时jedisConn会自动释放connection,无需再次CLOSE)            if (jedisConn != null) {                //jedisConn.unLock(seqName);                jedisConn.close();            }            if (log.isDebugEnabled()) {                log.debug(seqName + ":" + result + ", trace:\n" + getStackTrace());            }        }        return result;        //arch.seq发生了不可预测的异常,请联系架构部处理!        //throw new RuntimeException("arch.seq发生了不可预测的异常,请联系架构部处理!");    }    private static String getStackTrace() {        StringBuilder result = new StringBuilder();        StackTraceElement[] element = Thread.currentThread().getStackTrace();        for (int i = 0; i < element.length; i++) {            result.append("\t").append(element[i]).append("\n");        }        return result.toString();    }    private static long randTime() {        return new Random().nextInt(50) + 50;    }    private static boolean tryInitReady(JedisConfig.JedisConn jedisConn) throws InterruptedException {        int times = 0;        for (; times < 3; times++) {            if (getSEQInitReady(jedisConn)) {                break;            }            Thread.sleep(100);        }        return times < 3;    }    /**     * 获得SEQ初始化状态     *     * @param jedisConn     * @return     */    private static boolean getSEQInitReady(JedisConfig.JedisConn jedisConn) {        if (!_DEFAULT_SEQ_INIT_STATUS) {            synchronized (RedisSEQ.class) {                if (!_DEFAULT_SEQ_INIT_STATUS) {                    _DEFAULT_SEQ_INIT_STATUS = _DEFAULT_SEQ_INIT_READY.equals(jedisConn.get(_DEFAULT_SEQ_INIT_KEY));                }            }        }        return _DEFAULT_SEQ_INIT_STATUS;    }    /**     * 获得REDIS自增序列号最新值,并同步更新到ZK备份数据节点守护线程中     *     * @param jedisConn     * @param day     * @param seqName     * @param total     * @return     */    private static Long getIncrVal(JedisConfig.JedisConn jedisConn, String day, String seqName, long total) {        String key = seqName + "_" + day;        Long incrVal = total > 1 ? jedisConn.incr(key, total) : jedisConn.incr(key);        if (incrVal > 9999999999L) {            throw new RuntimeException("Exceed the maximum value,sequence:" + incrVal);        }        //塞到要更新的ZK队列中        RedisSEQTimer.getInstance().push(key, incrVal);        return incrVal;    }    /**     * 单机模式生成UUID     *     * @return     */    private static Long xUUID() {        int rand = _LOCAL_INCR.incrementAndGet() % 10;        String result = System.nanoTime() + "" + rand;        return Long.parseLong(result);    }}
package com.xxx.arch.seq.client.redis;import com.xxx.arch.seq.client.tool.StreamCloseAble;import com.xxx.arch.seq.client.tool.ZkClient;import com.xxx.arch.seq.client.zk.ZkClientUtil;import org.apache.commons.lang3.time.DateUtils;import java.text.SimpleDateFormat;import java.util.*;import java.util.concurrent.ConcurrentHashMap;public class RedisSEQTimer extends StreamCloseAble {    public static final String DAY_FORMAT_PATTERN = "yyMMdd";    public static volatile RedisSEQTimer redisSEQTimer;    private final ConcurrentHashMap REDIS_INCR_MAP = new ConcurrentHashMap<>();    private final ZkClient _ZK_CLIENT = ZkClientUtil.getZkClient();    private final String _DEFAULT_ZK_NAMESPACE = "/ARCH_SEQ_REDIS";    //zk节点最大值每次递增数    private long _REDIS_MAXVALUE_INIT = 10_000L;    private Timer _TIMER = new Timer(true);    //是否处于清理状态    private volatile boolean _CLEAN_STATUS;    //清理key    private static final String _REMOVE_KEY = "ARCH_SEQ_REMOVE_KEY";    private RedisSEQTimer() {        super();        //启动zk巡查服务        _TIMER.schedule(new TimerTask() {            @Override            public void run() {                checkAndConfigure();            }        }, new Date(), 1 * 60 * 1000);        //每天定时清理垃圾数据        _TIMER.schedule(new TimerTask() {            @Override            public void run() {                removeNotUsedKeys();            }        }, getFirstTime(), 24 * 60 * 60 * 1000);    }    public static RedisSEQTimer getInstance() {        if (redisSEQTimer == null) {            synchronized (RedisSEQTimer.class) {                if (redisSEQTimer == null) {                    redisSEQTimer = new RedisSEQTimer();                }            }        }        return redisSEQTimer;    }    /**     * 定期更新ZK节点     */    private synchronized void checkAndConfigure() {        if (_CLEAN_STATUS) {            return;        }        if (REDIS_INCR_MAP.isEmpty()) {            return;        }        String endDay = "_" + getDayFormat();        List notTodayKeys = new ArrayList<>();        Set> entrySet = REDIS_INCR_MAP.entrySet();        for (Map.Entry entry : entrySet) {            //不是今天的key不作处理            if (!entry.getKey().endsWith(endDay)) {                notTodayKeys.add(entry.getKey());                return;            }            //将最新的值写到zk节点上 节点格式:/KEY_yyMMdd            String zkNode = _DEFAULT_ZK_NAMESPACE + "/" + entry.getKey();            if (_ZK_CLIENT.exists(zkNode)) {                _ZK_CLIENT.writeData(zkNode, entry.getValue());            } else {                try {                    _ZK_CLIENT.createPersistent(zkNode, entry.getValue());                } catch (RuntimeException e) {                    //not to write log ,it's will be retry in next time.                }            }        }        ;        if (!notTodayKeys.isEmpty()) {            for (String key : notTodayKeys) {                REDIS_INCR_MAP.remove(key);            }        }    }    /**     * 删除不再使用的KEY(包含redis和zk节点)     */    public synchronized void removeNotUsedKeys() {        if (!_ZK_CLIENT.exists(_DEFAULT_ZK_NAMESPACE)) {            return;        }        _CLEAN_STATUS = true;        JedisConfig.JedisConn jedisConn = null;        String requestId = UUID.randomUUID().toString();        boolean tryLock = false;        try {            List list = _ZK_CLIENT.getChildren(_DEFAULT_ZK_NAMESPACE);            //保留两天。考虑到多个机器的时间可能不一致,如果在刚过零点删除了昨天的sequence,另一台机器可能还需要使用它,则会出现id重复            Date now = new Date();            Date yesterday = DateUtils.addDays(now, -1);            List keepDays = Arrays.asList(getDayFormat(now), getDayFormat(yesterday));            if (list != null && !list.isEmpty()) {                jedisConn = JedisConfig.getInstance().getConn();                if (tryLock = jedisConn.tryLock(_REMOVE_KEY, requestId, 2000)) {                    JedisConfig.JedisConn finalJedisConn = jedisConn;                    for (String node : list) {                        String dayPart = node.substring(node.length() - DAY_FORMAT_PATTERN.length());                        if (!keepDays.contains(dayPart)) {                            REDIS_INCR_MAP.remove(node);                            finalJedisConn.del(node);                            removeZkNode(node);                        }                    }                }            }        } finally {            _CLEAN_STATUS = false;            if (jedisConn != null) {                if (tryLock) {                    jedisConn.unLock(_REMOVE_KEY, requestId);                }                jedisConn.close();            }        }    }    /**     * 移除ZK节点     *     * @param node     */    private void removeZkNode(String node) {        String path = _DEFAULT_ZK_NAMESPACE + "/" + node;        if (_ZK_CLIENT.exists(path)) {            try {                _ZK_CLIENT.delete(path);            } catch (Exception e) {            }        }    }    /**     * 获得每天定时任务的执行时间     *     * @return     */    private Date getFirstTime() {        Calendar calendar = Calendar.getInstance();        calendar.set(Calendar.HOUR_OF_DAY, 24); // 24点  可以更改时间        calendar.set(Calendar.MINUTE, getRandNum(6, 0)); // 0-5分钟 随机        calendar.set(Calendar.SECOND, getRandNum(60, 0));// 0-59秒  随机        return calendar.getTime();    }    /**     * 获得区间随机整数     *     * @param exclude - 最大数,exclude     * @param from    - 最小数,include     * @return     */    private int getRandNum(int exclude, int from) {        return new Random().nextInt(exclude) + from;    }    /**     * 将某天的KEY塞到相应队列     *     * @param key - 业务KEY key_yyMMdd     * @param val - 值     * @return 是否成功     */    public synchronized void push(String key, Long val) {        REDIS_INCR_MAP.put(key, val);    }    public String getDayFormat() {        return getDayFormat(new Date());    }    public String getDayFormat(Date date) {        return new SimpleDateFormat(DAY_FORMAT_PATTERN).format(date);    }    /**     * 初始化redis keys     */    public void initRedisKeys() {        if (!_ZK_CLIENT.exists(_DEFAULT_ZK_NAMESPACE)) {            return;        }        List list = _ZK_CLIENT.getChildren(_DEFAULT_ZK_NAMESPACE);        if (list != null && !list.isEmpty()) {            Long zkVal;            JedisConfig.JedisConn jedisConn = null;            for (int i = 0; i < list.size(); i++) {                zkVal = _ZK_CLIENT.readData(_DEFAULT_ZK_NAMESPACE + "/" + list.get(i));                if (zkVal != null) {                    String requestId = UUID.randomUUID().toString();                    boolean tryLock = false;                    try {                        jedisConn = JedisConfig.getInstance().getConn();                        //获得锁才更新,没获得锁就放弃更新                        if (tryLock = jedisConn.tryLock(list.get(i), requestId, 2000)) {                            jedisConn.set(list.get(i), String.valueOf(zkVal + _REDIS_MAXVALUE_INIT));                        }                    } finally {                        if (jedisConn != null) {                            if (tryLock) {                                jedisConn.unLock(list.get(i), requestId);                            }                            jedisConn.close();                        }                    }                }            }        }    }}
package com.xxx.arch.seq.client.tool;import lombok.extern.slf4j.Slf4j;import org.apache.curator.RetryPolicy;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import java.util.Collections;import java.util.List;@Slf4jpublic class ZkClient {    private CuratorFramework client;    public ZkClient(String serverList, int connectionTimeoutMs, int sessionTimeout) {        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);        client = CuratorFrameworkFactory.builder()                .connectString(serverList)                .connectionTimeoutMs(connectionTimeoutMs)                .sessionTimeoutMs(sessionTimeout)                .retryPolicy(retryPolicy)                .build();        client.start();    }    public boolean exists(String path) {        try {            return client.checkExists().forPath(path) != null;        } catch (Exception e) {            return false;        }    }    public void writeData(String path, Long value) {        try {            client.setData().forPath(path, value.toString().getBytes());        } catch (Exception e) {            log.error(e.getMessage(), e);        }    }    public void createPersistent(String zkNode, Long value) {        try {            client.create().forPath(zkNode, value.toString().getBytes());        } catch (Exception e) {            log.error(e.getMessage(), e);        }    }    public List getChildren(String path) {        try {            return client.getChildren().forPath(path);        } catch (Exception e) {            log.error(e.getMessage(), e);        }        return Collections.emptyList();    }    public Long readData(String path) {        try {            byte[] data = client.getData().forPath(path);            return Long.parseLong(new String(data));        } catch (Exception e) {            log.error(e.getMessage(), e);        }        return null;    }    public void delete(String path) {        try {            client.delete().forPath(path);        } catch (Exception e) {            log.error(e.getMessage(), e);        }    }}
package com.xxx.arch.seq.client.zk;import com.xxx.arch.seq.client.tool.ZkClient;import com.xxx.arch.seq.constant.Constants;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class ZkClientUtil {    private static final Logger logger = LoggerFactory.getLogger(ZkClientUtil.class);    private static volatile ZkClient zkClient = null;    public static ZkClient getZkClient() {        if (zkClient == null) {            synchronized (ZkClientUtil.class) {                if (zkClient == null) {                    initZkClient();                }            }        }        return zkClient;    }    private static void initZkClient() {        try {            String serverList = Constants.ARCH_SEQ_ZOOKEEPER_CONNECT_STRING;            if (logger.isInfoEnabled()) {                logger.info("zk cluster[" + serverList + "]");            }            if (serverList == null || serverList.trim().isEmpty()) {                throw new RuntimeException("no \"arch.seq.zk-cluster.serverList\" config.used");            } else {                zkClient = new ZkClient(serverList, 15000, 60000);            }        } catch (Exception e) {            logger.error(e.getMessage(), e);        }    }}
package com.xxx.arch.seq.client.tool;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.Closeable;import java.io.IOException;/** * Created by zhangyang on 2016/5/31. */public class StreamCloseAble {    private static Logger logger = LoggerFactory.getLogger(StreamCloseAble.class);    /**     * 关闭输入输出流     *     * @param closeAbles     */    public static void close(Closeable... closeAbles) {        if (closeAbles == null || closeAbles.length <= 0) {            return;        }        for (Closeable closeAble : closeAbles) {            if (closeAble != null) {                try {                    closeAble.close();                } catch (IOException e) {                    logger.error(e.getMessage(), e);                }            }        }    }}

到此,相信大家对"如何实现基于Jedis+ZK的分布式序列号生成器"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0