ZooKeeper同步框架怎么实现
发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,本篇内容主要讲解"ZooKeeper同步框架怎么实现",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"ZooKeeper同步框架怎么实现"吧!首先,定义一个同
千家信息网最后更新 2025年02月03日ZooKeeper同步框架怎么实现
本篇内容主要讲解"ZooKeeper同步框架怎么实现",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"ZooKeeper同步框架怎么实现"吧!
首先,定义一个同步接口,它有一个execute方法,主要负责同步任务的实现。
Path参数是任务节点(用户),只有相同的节点才会同步工作。想象一下,去银行取钱,如果每个人都有一个专属的柜台,那效率是明显的。
SynchronousProcessor参数用来处理具体的业务。
Synchronous.java
package org.bigmouth.nvwa.zookeeper.concurrent; /** * 同步,支持分布式 * * @author Allen Hu * 2015-4-17 */public interface Synchronous { /** * 同步执行,根据path标识来区分同步工作。不同的path将不会同步进行。 * * @param处理结果类型 * @param path 任务节点 * e.g. "/project/synchronous/0000001" * @param processor 业务处理器 * @return 处理结果 */T execute(String path, SynchronousProcessorprocessor);}
MutexLockSynchronous.java
Synchronous的实现类,基于普通排它锁的方式实现。
package org.bigmouth.nvwa.zookeeper.concurrent; import org.apache.curator.framework.recipes.locks.InterProcessLock;import org.apache.curator.framework.recipes.locks.InterProcessMutex;import org.apache.zookeeper.common.PathUtils;import org.bigmouth.nvwa.zookeeper.ZkClientHolder; /** * 基于普通排他锁的方式实现同步 * * @author Allen Hu * 2015-4-17 */public class MutexLockSynchronous implements Synchronous { private final ZkClientHolder zkClientHolder; public MutexLockSynchronous(ZkClientHolder zkClientHolder) { this.zkClientHolder = zkClientHolder; } @Override publicT execute(String path, SynchronousProcessorprocessor) { PathUtils.validatePath(path); InterProcessLock lock = new InterProcessMutex(zkClientHolder.get(), path); try { lock.acquire(); if (null != processor) return processor.process(); } catch (Exception e) { if (null != processor) processor.exceptionCaught(e); } finally { try { lock.release(); } catch (Exception e) { } } return null; }}
SynchronousProcessor.java
任务处理器接口,实现它来完成具体的业务工作
package org.bigmouth.nvwa.zookeeper.concurrent; /** * 同步业务处理器 * * @author Allen Hu * 2015-4-17 */public interface SynchronousProcessor{ /** * 处理具体的业务 * * @return */ T process(); /** * 异常捕获 * * @param throwable */ void exceptionCaught(Throwable throwable);}
ZkClientHolder.java
当然少不了这个了,继承的父类可以不需要了解,就是定义了两个抽象方法:doInit和doDestroy方法。
package org.bigmouth.nvwa.zookeeper; import org.apache.commons.lang.StringUtils;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import org.bigmouth.nvwa.utils.BaseLifeCycleSupport;import org.slf4j.Logger;import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; /** * ZooKeeper client holder * * @author Allen Hu * 2015-4-16 */public class ZkClientHolder extends BaseLifeCycleSupport { private static final Logger LOGGER = LoggerFactory.getLogger(ZkClientHolder.class); public static final int MAX_RETRIES = 3; public static final int BASE_SLEEP_TIMEMS = 3000; private CuratorFramework zkClient; private final String connectString; private final int sessionTimeout; public ZkClientHolder(String connectString, int sessionTimeout) { Preconditions.checkArgument(StringUtils.isNotBlank(connectString), "connectString cannot be blank"); Preconditions.checkArgument(sessionTimeout >= 10000, "sessionTimeout must be greater than 10000"); this.connectString = connectString; this.sessionTimeout = sessionTimeout; } public CuratorFramework get() { return zkClient; } @Override protected void doInit() { zkClient = CuratorFrameworkFactory.builder() .sessionTimeoutMs(sessionTimeout) .connectString(connectString) .retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIMEMS, MAX_RETRIES)) .build(); zkClient.start(); if (LOGGER.isInfoEnabled()) { LOGGER.info("Connected to ZooKepper server: {}", connectString); } } @Override protected void doDestroy() { if (null != zkClient) zkClient.close(); }}
最后来个测试类,模拟多个用户多线程处理任务的过程,我们达到了相同用户间同步的目的。
package org.bigmouth.nvwa.zookeeper.concurrent; import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors; import org.apache.curator.utils.ZKPaths;import org.bigmouth.nvwa.zookeeper.ZkClientHolder;import org.slf4j.Logger;import org.slf4j.LoggerFactory; /** * * @author Allen Hu * 2015-4-17 */public class ConcurrentTest { private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentTest.class); private ZkClientHolder zkClientHolder = new ZkClientHolder("172.16.3.24:2181", 60000); private Synchronous synchronous; public ConcurrentTest() { zkClientHolder.init(); synchronous = new MutexLockSynchronous(zkClientHolder); } public class Service implements Runnable { private final String id; private final long sleepInMillis; public Service(String id, long sleepInMillis) { this.id = id; this.sleepInMillis = sleepInMillis; } @Override public void run() { synchronous.execute(ZKPaths.makePath("/nvwa/zookeeper/concurrent", id), new SynchronousProcessor() { @Override public String process() { LOGGER.info(id + " star...!"); try { Thread.sleep(sleepInMillis); } catch (InterruptedException e) { e.printStackTrace(); } LOGGER.info(id + " has execution!"); return id; } @Override public void exceptionCaught(Throwable throwable) { throwable.printStackTrace(); } }); } } static ExecutorService executor = Executors.newCachedThreadPool(); public static void main(String[] args) { ConcurrentTest ct = new ConcurrentTest(); executor.submit(ct.new Service("1", 5000)); // 1号 处理5秒 executor.submit(ct.new Service("1", 2000)); // 1号 处理2秒 executor.submit(ct.new Service("2", 5000)); // 2号 处理5秒 executor.submit(ct.new Service("3", 10000)); // 3号 处理10秒 executor.submit(ct.new Service("3", 500)); // 3号 处理0.5秒 }}
输出结果,1、2、3任务并行,而相同的任务串行。如:第二个1号等第一个1号执行完才开始。
到此,相信大家对"ZooKeeper同步框架怎么实现"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
处理
同步
任务
业务
方法
框架
相同
处理器
用户
结果
节点
工作
普通
内容
参数
接口
方式
学习
不同
实用
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
常州直销软件开发客户至上
重庆上位机软件开发
月星互联网科技金融
崇明区互联网视频系统服务器
数据库创建语句是什么
gta在线模式服务器当前不可用
衡水软件开发讲师
我的世界呆呆空岛服务器
dmz区访问内网数据库
计算机网络技术证件
怎么查看服务器里面东西
金砖服务器客服
运城网络技术服务公司
阿里巴巴国际站服务器怎么开
海南特色软件开发价格表格
一个服务器可以挂两个路由器吗
大连创瑞网络技术有限公司
联想服务器ts440主板
怎么买云服务器
河北土地资产管理软件开发
软件开发用户手册注意事项
软件开发的职业资格
网络安全工程师的性格特点
dmz区访问内网数据库
数据库 查询 字段 裁剪
网络安全防护论文
浦东新区软件开发学习
中央电视台网络安全教育直播
我的世界生存服务器1.18.1
网络技术在学科中的应用