如何实现基于Jedis+ZK的分布式序列号生成器
发表于:2024-12-13 作者:千家信息网编辑
千家信息网最后更新 2024年12月13日,本篇内容主要讲解"如何实现基于Jedis+ZK的分布式序列号生成器",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"如何实现基于Jedis+ZK的分布式序列号
千家信息网最后更新 2024年12月13日如何实现基于Jedis+ZK的分布式序列号生成器
本篇内容主要讲解"如何实现基于Jedis+ZK的分布式序列号生成器",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"如何实现基于Jedis+ZK的分布式序列号生成器"吧!
部分源码参考Jedis实现分布式锁博客:
package com.xxx.arch.seq.utlis;import com.xxx.arch.seq.client.redis.RedisSEQ;import lombok.extern.slf4j.Slf4j;/** * arch-seq 唯一code 获取客户端 * * @author jdkleo */@Slf4jpublic class SEQUtil { /** * 生成默认KEY的UUID规则: 日期yyMMdd 6位 + 分布式seqID 10位,总共6 + 10 = 16位 * * @param * @return */ public static long getSEQ() { return RedisSEQ.getSEQ(); } /** * 生成默认KEY连续的UUID,共total个 * * @param total - 连续多少个 * @return */ public static long[] getSEQ(long total) { long value = RedisSEQ.getSEQ(total); return getValueArray(value, (int) total); } /** * 生成指定KEY的UUID规则: 日期yyMMdd 6位 + 分布式seqID 10位,总共6 + 10 = 16位 * * @param seqName * @return */ public static long getSEQ(String seqName) { return RedisSEQ.getSEQ(seqName, 1); } /** * 生成指定KEY连续的UUID,共total个 * * @param seqName * @param total * @return */ public static long[] getSEQ(String seqName, long total) { long value = RedisSEQ.getSEQ(seqName, total); return getValueArray(value, (int) total); } private static long[] getValueArray(long value, int total) { int n = total; long[] ret = new long[n]; do { ret[n - 1] = value--; } while (--n > 0); return ret; }}
package com.xxx.arch.seq.client.redis;import com.xxx.arch.seq.client.tool.StreamCloseAble;import lombok.extern.slf4j.Slf4j;import java.util.Random;import java.util.concurrent.atomic.AtomicInteger;/** * Redis版本SEQ(有序SEQ) * * @author zhangyang * @createDate 2019-01-22 * @since 2.x */@Slf4jpublic class RedisSEQ extends StreamCloseAble { //默认的REDIS SEQ初始化状态器KEY private static final String _DEFAULT_SEQ_INIT_KEY = "ARCH_SEQ_REDIS_SEQ_INIT"; //默认的REDIS SEQ初始化状态器VAL private static final String _DEFAULT_SEQ_INIT_PENDING = "pending"; private static final String _DEFAULT_SEQ_INIT_READY = "ready"; //SEQ初始化容器状态 private static volatile boolean _DEFAULT_SEQ_INIT_STATUS; //默认REDIS SEQ序列号的名称 private static final String _DEFAULT_SEQ_NAME = "ARCH_SEQ_REDIS_SEQ"; //本地模式自增ID槽 private final static AtomicInteger _LOCAL_INCR = new AtomicInteger(0); static { JedisConfig.JedisConn jedisConn = null; try { jedisConn = JedisConfig.getInstance().getConn(); //if REDIS宕机或第一次:创建初始化状态成功后,初始化redis keys(该方法可以恢复上次redis宕机数据) if (jedisConn.setnx(_DEFAULT_SEQ_INIT_KEY, _DEFAULT_SEQ_INIT_PENDING) == 1) {//抢到REDIS初始化锁,并将其标记为pending状态 try { RedisSEQTimer.getInstance().removeNotUsedKeys(); RedisSEQTimer.getInstance().initRedisKeys();//初始化REDIS,从ZK上读取初始数据 jedisConn.set(_DEFAULT_SEQ_INIT_KEY, _DEFAULT_SEQ_INIT_READY);//初始化完成,标记为ready状态 } catch (Exception e) { log.error(e.getMessage(), e); //初始化arch.seq REDIS数据异常,有可能是ZK相关问题,也有可能是REDIS问题,请排查 log.error("Initialization of arch.seq REDIS data exceptions, may be ZK-related problems, may also be REDIS problems, please check redis key:{}", _DEFAULT_SEQ_INIT_KEY); jedisConn.del(_DEFAULT_SEQ_INIT_KEY); } } //else{...} 没抢到REDIS初始化锁的话:不作任何处理 } catch (Exception e) { log.error(e.getMessage(), e); log.error("Initialization of arch.seq REDIS data exceptions, may be arch.seq's configuration is not ready"); } finally { close(jedisConn); } } public static Long getSEQ() { return getSEQ(_DEFAULT_SEQ_NAME, 1); } public static Long getSEQ(long total) { return getSEQ(_DEFAULT_SEQ_NAME, total); } public static Long getSEQ(String seqName, long total) { Long result = null; JedisConfig.JedisConn jedisConn = null; try { //获取redis连接 jedisConn = JedisConfig.getInstance().getConn(); //获得REDIS初始化状态不成功 if (!tryInitReady(jedisConn)) { //arch.seq By REDIS版本不能正常初始化,请检查REDIS服务。 throw new RuntimeException("arch.seq By REDIS version cannot be initialized properly. Please check the REDIS service."); } //开启分布式锁 //if (jedisConn.tryLock(seqName, 1000, 2000)) { try { String day = RedisSEQTimer.getInstance().getDayFormat(); String incrVal = String.format("0d", getIncrVal(jedisConn, day, seqName, total)); result = Long.parseLong(day + incrVal); } catch (Exception e) { e.printStackTrace(); log.warn("try lock failed,the arch.seq tool will be retry after sleep some times."); Thread.sleep(randTime()); result = getSEQ(seqName, total); } } catch (Throwable e) { log.error(e.getMessage(), e); //redis生成失败,返回本地ID:15位纳秒+1位自然数轮询 //在获取【自增序列号:{},序列号分布式锁:{}】时发生了异常,系统返回了本地生成的自增序列号,不影响系统使用,但请管理员尽快协查! log.error("An exception occurred while acquiring self-incremental sequence number '{}', " + "sequence number distributed lock '{}',The system returns the locally generated self-incremental " + "sequence number, which does not affect the use of the system, but the administrator should check " + "it as soon as possible.", seqName, seqName + "_LOCK"); result = xUUID(); } finally { //切记,一定要释放分布式锁(注:释放锁的同时jedisConn会自动释放connection,无需再次CLOSE) if (jedisConn != null) { //jedisConn.unLock(seqName); jedisConn.close(); } if (log.isDebugEnabled()) { log.debug(seqName + ":" + result + ", trace:\n" + getStackTrace()); } } return result; //arch.seq发生了不可预测的异常,请联系架构部处理! //throw new RuntimeException("arch.seq发生了不可预测的异常,请联系架构部处理!"); } private static String getStackTrace() { StringBuilder result = new StringBuilder(); StackTraceElement[] element = Thread.currentThread().getStackTrace(); for (int i = 0; i < element.length; i++) { result.append("\t").append(element[i]).append("\n"); } return result.toString(); } private static long randTime() { return new Random().nextInt(50) + 50; } private static boolean tryInitReady(JedisConfig.JedisConn jedisConn) throws InterruptedException { int times = 0; for (; times < 3; times++) { if (getSEQInitReady(jedisConn)) { break; } Thread.sleep(100); } return times < 3; } /** * 获得SEQ初始化状态 * * @param jedisConn * @return */ private static boolean getSEQInitReady(JedisConfig.JedisConn jedisConn) { if (!_DEFAULT_SEQ_INIT_STATUS) { synchronized (RedisSEQ.class) { if (!_DEFAULT_SEQ_INIT_STATUS) { _DEFAULT_SEQ_INIT_STATUS = _DEFAULT_SEQ_INIT_READY.equals(jedisConn.get(_DEFAULT_SEQ_INIT_KEY)); } } } return _DEFAULT_SEQ_INIT_STATUS; } /** * 获得REDIS自增序列号最新值,并同步更新到ZK备份数据节点守护线程中 * * @param jedisConn * @param day * @param seqName * @param total * @return */ private static Long getIncrVal(JedisConfig.JedisConn jedisConn, String day, String seqName, long total) { String key = seqName + "_" + day; Long incrVal = total > 1 ? jedisConn.incr(key, total) : jedisConn.incr(key); if (incrVal > 9999999999L) { throw new RuntimeException("Exceed the maximum value,sequence:" + incrVal); } //塞到要更新的ZK队列中 RedisSEQTimer.getInstance().push(key, incrVal); return incrVal; } /** * 单机模式生成UUID * * @return */ private static Long xUUID() { int rand = _LOCAL_INCR.incrementAndGet() % 10; String result = System.nanoTime() + "" + rand; return Long.parseLong(result); }}
package com.xxx.arch.seq.client.redis;import com.xxx.arch.seq.client.tool.StreamCloseAble;import com.xxx.arch.seq.client.tool.ZkClient;import com.xxx.arch.seq.client.zk.ZkClientUtil;import org.apache.commons.lang3.time.DateUtils;import java.text.SimpleDateFormat;import java.util.*;import java.util.concurrent.ConcurrentHashMap;public class RedisSEQTimer extends StreamCloseAble { public static final String DAY_FORMAT_PATTERN = "yyMMdd"; public static volatile RedisSEQTimer redisSEQTimer; private final ConcurrentHashMapREDIS_INCR_MAP = new ConcurrentHashMap<>(); private final ZkClient _ZK_CLIENT = ZkClientUtil.getZkClient(); private final String _DEFAULT_ZK_NAMESPACE = "/ARCH_SEQ_REDIS"; //zk节点最大值每次递增数 private long _REDIS_MAXVALUE_INIT = 10_000L; private Timer _TIMER = new Timer(true); //是否处于清理状态 private volatile boolean _CLEAN_STATUS; //清理key private static final String _REMOVE_KEY = "ARCH_SEQ_REMOVE_KEY"; private RedisSEQTimer() { super(); //启动zk巡查服务 _TIMER.schedule(new TimerTask() { @Override public void run() { checkAndConfigure(); } }, new Date(), 1 * 60 * 1000); //每天定时清理垃圾数据 _TIMER.schedule(new TimerTask() { @Override public void run() { removeNotUsedKeys(); } }, getFirstTime(), 24 * 60 * 60 * 1000); } public static RedisSEQTimer getInstance() { if (redisSEQTimer == null) { synchronized (RedisSEQTimer.class) { if (redisSEQTimer == null) { redisSEQTimer = new RedisSEQTimer(); } } } return redisSEQTimer; } /** * 定期更新ZK节点 */ private synchronized void checkAndConfigure() { if (_CLEAN_STATUS) { return; } if (REDIS_INCR_MAP.isEmpty()) { return; } String endDay = "_" + getDayFormat(); List notTodayKeys = new ArrayList<>(); Set > entrySet = REDIS_INCR_MAP.entrySet(); for (Map.Entry entry : entrySet) { //不是今天的key不作处理 if (!entry.getKey().endsWith(endDay)) { notTodayKeys.add(entry.getKey()); return; } //将最新的值写到zk节点上 节点格式: /KEY_yyMMdd String zkNode = _DEFAULT_ZK_NAMESPACE + "/" + entry.getKey(); if (_ZK_CLIENT.exists(zkNode)) { _ZK_CLIENT.writeData(zkNode, entry.getValue()); } else { try { _ZK_CLIENT.createPersistent(zkNode, entry.getValue()); } catch (RuntimeException e) { //not to write log ,it's will be retry in next time. } } } ; if (!notTodayKeys.isEmpty()) { for (String key : notTodayKeys) { REDIS_INCR_MAP.remove(key); } } } /** * 删除不再使用的KEY(包含redis和zk节点) */ public synchronized void removeNotUsedKeys() { if (!_ZK_CLIENT.exists(_DEFAULT_ZK_NAMESPACE)) { return; } _CLEAN_STATUS = true; JedisConfig.JedisConn jedisConn = null; String requestId = UUID.randomUUID().toString(); boolean tryLock = false; try { List list = _ZK_CLIENT.getChildren(_DEFAULT_ZK_NAMESPACE); //保留两天。考虑到多个机器的时间可能不一致,如果在刚过零点删除了昨天的sequence,另一台机器可能还需要使用它,则会出现id重复 Date now = new Date(); Date yesterday = DateUtils.addDays(now, -1); List keepDays = Arrays.asList(getDayFormat(now), getDayFormat(yesterday)); if (list != null && !list.isEmpty()) { jedisConn = JedisConfig.getInstance().getConn(); if (tryLock = jedisConn.tryLock(_REMOVE_KEY, requestId, 2000)) { JedisConfig.JedisConn finalJedisConn = jedisConn; for (String node : list) { String dayPart = node.substring(node.length() - DAY_FORMAT_PATTERN.length()); if (!keepDays.contains(dayPart)) { REDIS_INCR_MAP.remove(node); finalJedisConn.del(node); removeZkNode(node); } } } } } finally { _CLEAN_STATUS = false; if (jedisConn != null) { if (tryLock) { jedisConn.unLock(_REMOVE_KEY, requestId); } jedisConn.close(); } } } /** * 移除ZK节点 * * @param node */ private void removeZkNode(String node) { String path = _DEFAULT_ZK_NAMESPACE + "/" + node; if (_ZK_CLIENT.exists(path)) { try { _ZK_CLIENT.delete(path); } catch (Exception e) { } } } /** * 获得每天定时任务的执行时间 * * @return */ private Date getFirstTime() { Calendar calendar = Calendar.getInstance(); calendar.set(Calendar.HOUR_OF_DAY, 24); // 24点 可以更改时间 calendar.set(Calendar.MINUTE, getRandNum(6, 0)); // 0-5分钟 随机 calendar.set(Calendar.SECOND, getRandNum(60, 0));// 0-59秒 随机 return calendar.getTime(); } /** * 获得区间随机整数 * * @param exclude - 最大数,exclude * @param from - 最小数,include * @return */ private int getRandNum(int exclude, int from) { return new Random().nextInt(exclude) + from; } /** * 将某天的KEY塞到相应队列 * * @param key - 业务KEY key_yyMMdd * @param val - 值 * @return 是否成功 */ public synchronized void push(String key, Long val) { REDIS_INCR_MAP.put(key, val); } public String getDayFormat() { return getDayFormat(new Date()); } public String getDayFormat(Date date) { return new SimpleDateFormat(DAY_FORMAT_PATTERN).format(date); } /** * 初始化redis keys */ public void initRedisKeys() { if (!_ZK_CLIENT.exists(_DEFAULT_ZK_NAMESPACE)) { return; } List list = _ZK_CLIENT.getChildren(_DEFAULT_ZK_NAMESPACE); if (list != null && !list.isEmpty()) { Long zkVal; JedisConfig.JedisConn jedisConn = null; for (int i = 0; i < list.size(); i++) { zkVal = _ZK_CLIENT.readData(_DEFAULT_ZK_NAMESPACE + "/" + list.get(i)); if (zkVal != null) { String requestId = UUID.randomUUID().toString(); boolean tryLock = false; try { jedisConn = JedisConfig.getInstance().getConn(); //获得锁才更新,没获得锁就放弃更新 if (tryLock = jedisConn.tryLock(list.get(i), requestId, 2000)) { jedisConn.set(list.get(i), String.valueOf(zkVal + _REDIS_MAXVALUE_INIT)); } } finally { if (jedisConn != null) { if (tryLock) { jedisConn.unLock(list.get(i), requestId); } jedisConn.close(); } } } } } }}
package com.xxx.arch.seq.client.tool;import lombok.extern.slf4j.Slf4j;import org.apache.curator.RetryPolicy;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import java.util.Collections;import java.util.List;@Slf4jpublic class ZkClient { private CuratorFramework client; public ZkClient(String serverList, int connectionTimeoutMs, int sessionTimeout) { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.builder() .connectString(serverList) .connectionTimeoutMs(connectionTimeoutMs) .sessionTimeoutMs(sessionTimeout) .retryPolicy(retryPolicy) .build(); client.start(); } public boolean exists(String path) { try { return client.checkExists().forPath(path) != null; } catch (Exception e) { return false; } } public void writeData(String path, Long value) { try { client.setData().forPath(path, value.toString().getBytes()); } catch (Exception e) { log.error(e.getMessage(), e); } } public void createPersistent(String zkNode, Long value) { try { client.create().forPath(zkNode, value.toString().getBytes()); } catch (Exception e) { log.error(e.getMessage(), e); } } public ListgetChildren(String path) { try { return client.getChildren().forPath(path); } catch (Exception e) { log.error(e.getMessage(), e); } return Collections.emptyList(); } public Long readData(String path) { try { byte[] data = client.getData().forPath(path); return Long.parseLong(new String(data)); } catch (Exception e) { log.error(e.getMessage(), e); } return null; } public void delete(String path) { try { client.delete().forPath(path); } catch (Exception e) { log.error(e.getMessage(), e); } }}
package com.xxx.arch.seq.client.zk;import com.xxx.arch.seq.client.tool.ZkClient;import com.xxx.arch.seq.constant.Constants;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class ZkClientUtil { private static final Logger logger = LoggerFactory.getLogger(ZkClientUtil.class); private static volatile ZkClient zkClient = null; public static ZkClient getZkClient() { if (zkClient == null) { synchronized (ZkClientUtil.class) { if (zkClient == null) { initZkClient(); } } } return zkClient; } private static void initZkClient() { try { String serverList = Constants.ARCH_SEQ_ZOOKEEPER_CONNECT_STRING; if (logger.isInfoEnabled()) { logger.info("zk cluster[" + serverList + "]"); } if (serverList == null || serverList.trim().isEmpty()) { throw new RuntimeException("no \"arch.seq.zk-cluster.serverList\" config.used"); } else { zkClient = new ZkClient(serverList, 15000, 60000); } } catch (Exception e) { logger.error(e.getMessage(), e); } }}
package com.xxx.arch.seq.client.tool;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.Closeable;import java.io.IOException;/** * Created by zhangyang on 2016/5/31. */public class StreamCloseAble { private static Logger logger = LoggerFactory.getLogger(StreamCloseAble.class); /** * 关闭输入输出流 * * @param closeAbles */ public static void close(Closeable... closeAbles) { if (closeAbles == null || closeAbles.length <= 0) { return; } for (Closeable closeAble : closeAbles) { if (closeAble != null) { try { closeAble.close(); } catch (IOException e) { logger.error(e.getMessage(), e); } } } }}
到此,相信大家对"如何实现基于Jedis+ZK的分布式序列号生成器"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
生成
分布式
状态
序列
序列号
节点
数据
更新
生成器
成功
时间
处理
最大
内容
方法
日期
机器
架构
标记
模式
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
中国哲学社会科学期刊数据库官网
语音服务器异常怎么解决
英信服务器是什么原因
以太网连接服务器设置
软件开发项目 增值税税率
计算机网络技术初步了解
洛阳直销软件开发价格
极无双服务器
软件开发的方法有
带源码软件开发报价
寄生虫3D数据库
数据库的文件夹怎么找
成都网络安全工程师入行门槛低
2018医院网络安全
荣成软件开发报价
数据库和数据库系统的简称
使用代理服务器会被发现吗
软件开发中的工程模型
服务器怎么将自动改为手动
我的世界服务器整合包
使用ip连接实验室服务器
上海华庆互联网络科技
苏州云服务器咨询客服
长沙财务软件开发服务费
搭建公司文件共享服务器
苏州网络安全审计系统咨询费用
网络技术中as是什么
绍兴手机软件开发客户至上
网络安全法 错误的有
普陀区正规数据库系统销售商家