千家信息网

Curator的使用

发表于:2024-09-30 作者:千家信息网编辑
千家信息网最后更新 2024年09月30日,Curator为了更好的实现Java操作zookeeper服务器,后来出现了Curator框架,非常的强大,目前已经是Apache的顶级项目,里面提供了更多丰富的操作,例如session超时重连、主从
千家信息网最后更新 2024年09月30日Curator的使用


Curator



为了更好的实现Java操作zookeeper服务器,后来出现了Curator框架,非常的强大,目前已经是Apache的顶级项目,里面提供了更多丰富的操作,例如session超时重连、主从选举、分布式计数器、分布式锁等等适用于各种复杂的zookeeper场景的API封装




1 Curator框架使用(一)

Curator框架中使用链式编程风格,易读性更强,使用工厂方法创建连接对象。

1.使用CuratorFrameworkFactory的两个静态工厂方法(参数不同)来实现

1.1 connectString:连接串

1.2 retryPolicy:重试连接策略。有四种实现,分别是:ExponentialBackoffRetry、RetryNTimes、RetryOneTimes、RetryUntilElapsed

1.3sessionTimeoutMs:会话超时时间,默认为60000ms

1.4connectionTimeoutMs连接超时时间,默认为15000ms

注意对于retryPolicy策略通过一个接口来让用户自定义实现




2 Curator框架使用(二)

2.1创建连接

/** 重试策略: 初始时间为1s, 重试10次 */

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);

/** 通过工厂创建连接 */

CuratorFramework cf = CuratorFrameworkFactory.builder()

.connectString(ZK_ADDR)

.sessionTimeoutMs(SESSION_TIMEOUT)

.retryPolicy(retryPolicy)

.build();

/** 开启连接 */

cf.start();



2.2 新增节点

/**

* 新增节点:指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容

* 1.creatingParentsIfNeeded() 递归创建父目录

* 2.withMode() 节点类型(持久|临时)

* 3.forPath() 指定路径

*/

cf.create()

.creatingParentsIfNeeded()

.withMode(CreateMode.PERSISTENT)

.forPath("/super/c1", "c1内容".getBytes());



2.3 删除节点

/**

* 删除节点

* 1.deletingChildrenIfNeeded() 递归删除

* 2.guaranteed() 确保节点被删除

* 3. withVersion(int version) //特定版本号

*/

cf.delete().deletingChildrenIfNeeded().forPath("/super");



2.4 读取和修改数据

/**

* 读取和修改数据 : getData()和setData()

*/

cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1", "c1内容".getBytes());

cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2", "c2内容".getBytes());

/** 读取节点内容 */

String c2_data = new String(cf.getData().forPath("/super/c2"));

System.out.println("c2_data-->"+c2_data);

/** 修改节点内容 */

cf.setData().forPath("/super/c2", "修改c2的内容".getBytes());

String update_c2_data = new String(cf.getData().forPath("/super/c2"));

System.out.println("update_c2_data-->"+update_c2_data);



2.5 绑定回调函数

ExecutorService pool = Executors.newCachedThreadPool();

cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)

.inBackground(new BackgroundCallback() {


@Override

public void proce***esult(CuratorFramework cf, CuratorEvent event)

throws Exception {

System.out.println("code-->" + event.getResultCode());

System.out.println("type-->" + event.getType());

System.out.println("线程为-->" + Thread.currentThread().getName());

}

}, pool).forPath("/super/c3", "c2的内容".getBytes());

System.out.println("主线程-->" + Thread.currentThread().getName());

Thread.sleep(Integer.MAX_VALUE);



2.6 读取子节点和判断节点是否存在

/**

* 读取子节点的方法: getChildren()

* 判断节点是否存在: checkExists()

*/

List list = cf.getChildren().forPath("/super");

for (String p: list) {

System.out.println(p);

}

//如果为null标识不存在

Stat stat = cf.checkExists().forPath("/super/c4");

System.out.println(stat);



3 Curator框架使用(三)

如果要使用类似Wather的监听功能Curator必须依赖一个jar包,Maven依赖

org.apache.curator

curator-recipes

2.4.2

有了这个依赖包,使用NodeCache的方式去客户端实例中注册一个监听缓存,然后实现对应的监听方法即可,这里主要有两种监听方式

NodeCacheListener:监听节点的新增、修改操作

PathChildrenCacheListener:监听子节点的新增、修改、删除操作



4 Curator使用场景

4.1 分布式锁

在分布式场景中,为了保证数据的一致性,经常在程序运行的某一个点需要进行同步操作(java提供了synchronized或者Reentrantlock实现)比如看一个小示例,这个示例出现分布式不同步的问题

比如:之前是在高并发下访问一个程序,现在则是在高并发下访问多个服务器节点(分布式)

使用Curator基于zookeeper的特性提供的分布式锁来处理分布式场景的数据一致性,zookeeper本身的分布式是有写问题的,之前实现的时候遇到过,这里强烈推荐使用Curator分布式锁

public class Lock2 {/** zk地址 */private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";/** session超时时间 */private static final int SESSION_TIMEOUT = 5000; //MSstatic int count = 10;public static CuratorFramework createCuratorFramework(){CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(ZK_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();return cf;}public static void main(String[] args) throws Exception {final CountDownLatch countDown = new CountDownLatch(1);for (int i =0; i < 10; i++) {new Thread(new Runnable() {@Overridepublic void run() {CuratorFramework cf = createCuratorFramework();cf.start();//锁对象 client 锁节点final InterProcessMutex lock = new InterProcessMutex(cf, "/super");try {countDown.await();lock.acquire(); //获得锁number();Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();} finally {try {lock.release();//释放锁} catch (Exception e) {e.printStackTrace();}}}},"t" + i).start();;}Thread.sleep(2000);countDown.countDown();} public static void number() {count--;System.out.println(Thread.currentThread().getName() + "-->" + count);}}




4.2 分布式计数器功能

一说到分布式计数器,可能脑海里想到AtomicInteger(原子累加)这种经典方式,如果针对一个JVM的场景当然没问题,但是现在是在分布式场景下,就需要利用Curator框架的DistributedAtomicInteger了

public class CuratorAtomicInteger {/** zk地址 */private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";/** session超时时间 */private static final int SESSION_TIMEOUT = 5000; //MSpublic static void main(String[] args) throws Exception {CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(ZK_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();cf.start();//使用DistributedAtomicIntegerDistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(cf, "/superM", new RetryNTimes(3, 1000));//atomicInteger.increment();atomicInteger.add(1);AtomicValue atomicValue = atomicInteger.get();System.out.println("atomicValue.succeeded()-->" + atomicValue.succeeded());System.out.println("atomicValue.postValue()-->" + atomicValue.postValue());System.out.println("atomicValue.preValue()-->" + atomicValue.preValue());}}





4.3 Barrier

4.3.1 DistributedDoubleBarrier

分布式Barrier 类DistributedDoubleBarrier: 它会阻塞所有节点上的等待进程,直到某一个被满足, 然后所有的节点同时开始,中间谁先运行完毕,谁后运行完毕不关心,但是最终一定是一块退出运行的

public class CuratorBarrier {/** zk地址 */private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";/** session超时时间 */private static final int SESSION_TIMEOUT = 5000; //MSpublic static void main(String[] args) throws Exception{for (int i =0; i < 5; i++) {new Thread(new Runnable() {@Overridepublic void run() {try {/** 实例化5个客户端对象 */CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(ZK_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();cf.start();DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/superBarrier", 5);Thread.sleep(1000 * (new Random()).nextInt(3));System.out.println(Thread.currentThread().getName() + " 已准备好!");barrier.enter();System.out.println("同时开始运行...");Thread.sleep(1000 * (new Random()).nextInt(3));System.out.println("运行完毕...");barrier.leave();System.out.println("同时退出运行...");} catch (Exception e) {e.printStackTrace();}}},"t" + i).start();;}}}





4.3.2 DistributedBarrier

分布式Barrier 类DistributedBarrier: 它会阻塞所有节点上的等待进程(所有节点进入待执行状态),直到"某一个人吹哨"说开始执行, 然后所有的节点同时开始

public class CuratorBarrier2 {/** zk地址 */private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";/** session超时时间 */private static final int SESSION_TIMEOUT = 5000; //MSstatic DistributedBarrier barrier = null;public static void main(String[] args) throws Exception{for (int i =0; i < 5; i++) {new Thread(new Runnable() {@Overridepublic void run() {try {/** 实例化5个客户端对象 */CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(ZK_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();cf.start();barrier = new DistributedBarrier(cf, "/superBarrier");System.out.println(Thread.currentThread().getName() + " 设置barrier");barrier.setBarrier(); //设置barrier.waitOnBarrier(); //等待System.out.println("开始执行程序...");} catch (Exception e) {e.printStackTrace();}}},"t" + i).start();;}Thread.sleep(5000);barrier.removeBarrier(); //释放}}





5 Curator重试策略

Curator内部实现的几种重试策略:

1.ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加.

2.RetryNTimes:指定最大重试次数的重试策略

3.RetryOneTime:仅重试一次

4.RetryUntilElapsed:一直重试直到达到规定的时间



5.1 ExponentialBackoffRetry

ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)

ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

参数说明

1.baseSleepTimeMs 初始sleep时间

2.maxRetries 最大重试次数

3.maxSleepMs 最大重试时间




5.2 RetryNTimes

RetryNTimes(int n, int sleepMsBetweenRetries)


参数说明

1.n 最大重试次数

2.sleepMsBetweenRetries 每次重试的间隔时间



5.3 RetryOneTime

RetryOneTime(int sleepMsBetweenRetry)

参数说明

1.sleepMsBetweenRetry为重试间隔的时间

5.4 RetryUntilElapsed

RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)

参数说明

1.maxElapsedTimeMs 最大重试时间

2.sleepMsBetweenRetries 每次重试的间隔时间













节点 分布式 时间 内容 运行 场景 框架 策略 监听 最大 参数 数据 同时 地址 方法 次数 实例 客户 客户端 对象 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 山西忻州首选dns服务器云主机 数据库并发有必要用多线程吗 上海佳慧网络技术有限公司 了解软件开发的各种模式 学习软件开发先学什么 dba数据库管理教程 中广破天一剑服务器 服务器被入侵数据被删除 三星服务器内存条台式能用吗 药学文献全文检索数据库 5g手机网络安全问题 广州华商技工学软件开发 系统自带的文件服务器优缺点 脚本中连接数据库 一般数据的来源主要有 数据库 计算机网络技术大一学啥 服务器之间数据传输加密 服务器以太网卡波动大 iphone连接服务器消失 网络安全 领土安全 网络安全事件应急处置演练总结 如何更新服务器吃鸡 dba数据库管理教程 三星服务器内存条台式能用吗 德州热电厂自动化控制软件开发 贵州软件开发厂家现货 网络安全 消费无忧 数据库查询结果 计算机网络技术类工作 客户定制软件开发需求调研表
0