Curator的使用
Curator
为了更好的实现Java操作zookeeper服务器,后来出现了Curator框架,非常的强大,目前已经是Apache的顶级项目,里面提供了更多丰富的操作,例如session超时重连、主从选举、分布式计数器、分布式锁等等适用于各种复杂的zookeeper场景的API封装 |
1 Curator框架使用(一)
Curator框架中使用链式编程风格,易读性更强,使用工厂方法创建连接对象。 1.使用CuratorFrameworkFactory的两个静态工厂方法(参数不同)来实现 1.1 connectString:连接串 1.2 retryPolicy:重试连接策略。有四种实现,分别是:ExponentialBackoffRetry、RetryNTimes、RetryOneTimes、RetryUntilElapsed 1.3sessionTimeoutMs:会话超时时间,默认为60000ms 1.4connectionTimeoutMs连接超时时间,默认为15000ms 注意对于retryPolicy策略通过一个接口来让用户自定义实现 |
2 Curator框架使用(二)
2.1创建连接
/** 重试策略: 初始时间为1s, 重试10次 */ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); /** 通过工厂创建连接 */ CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(ZK_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(retryPolicy) .build(); /** 开启连接 */ cf.start(); |
2.2 新增节点
/** * 新增节点:指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容 * 1.creatingParentsIfNeeded() 递归创建父目录 * 2.withMode() 节点类型(持久|临时) * 3.forPath() 指定路径 */ cf.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath("/super/c1", "c1内容".getBytes()); |
2.3 删除节点
/** * 删除节点 * 1.deletingChildrenIfNeeded() 递归删除 * 2.guaranteed() 确保节点被删除 * 3. withVersion(int version) //特定版本号 */ cf.delete().deletingChildrenIfNeeded().forPath("/super"); |
2.4 读取和修改数据
/** * 读取和修改数据 : getData()和setData() */ cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1", "c1内容".getBytes()); cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2", "c2内容".getBytes()); /** 读取节点内容 */ String c2_data = new String(cf.getData().forPath("/super/c2")); System.out.println("c2_data-->"+c2_data); /** 修改节点内容 */ cf.setData().forPath("/super/c2", "修改c2的内容".getBytes()); String update_c2_data = new String(cf.getData().forPath("/super/c2")); System.out.println("update_c2_data-->"+update_c2_data); |
2.5 绑定回调函数
ExecutorService pool = Executors.newCachedThreadPool(); cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .inBackground(new BackgroundCallback() { @Override public void proce***esult(CuratorFramework cf, CuratorEvent event) throws Exception { System.out.println("code-->" + event.getResultCode()); System.out.println("type-->" + event.getType()); System.out.println("线程为-->" + Thread.currentThread().getName()); } }, pool).forPath("/super/c3", "c2的内容".getBytes()); System.out.println("主线程-->" + Thread.currentThread().getName()); Thread.sleep(Integer.MAX_VALUE); |
2.6 读取子节点和判断节点是否存在
/** * 读取子节点的方法: getChildren() * 判断节点是否存在: checkExists() */ List for (String p: list) { System.out.println(p); } //如果为null标识不存在 Stat stat = cf.checkExists().forPath("/super/c4"); System.out.println(stat); |
3 Curator框架使用(三)
如果要使用类似Wather的监听功能Curator必须依赖一个jar包,Maven依赖 有了这个依赖包,使用NodeCache的方式去客户端实例中注册一个监听缓存,然后实现对应的监听方法即可,这里主要有两种监听方式 NodeCacheListener:监听节点的新增、修改操作 PathChildrenCacheListener:监听子节点的新增、修改、删除操作 |
4 Curator使用场景
4.1 分布式锁
在分布式场景中,为了保证数据的一致性,经常在程序运行的某一个点需要进行同步操作(java提供了synchronized或者Reentrantlock实现)比如看一个小示例,这个示例出现分布式不同步的问题 比如:之前是在高并发下访问一个程序,现在则是在高并发下访问多个服务器节点(分布式) 使用Curator基于zookeeper的特性提供的分布式锁来处理分布式场景的数据一致性,zookeeper本身的分布式是有写问题的,之前实现的时候遇到过,这里强烈推荐使用Curator分布式锁 public class Lock2 {/** zk地址 */private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";/** session超时时间 */private static final int SESSION_TIMEOUT = 5000; //MSstatic int count = 10;public static CuratorFramework createCuratorFramework(){CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(ZK_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();return cf;}public static void main(String[] args) throws Exception {final CountDownLatch countDown = new CountDownLatch(1);for (int i =0; i < 10; i++) {new Thread(new Runnable() {@Overridepublic void run() {CuratorFramework cf = createCuratorFramework();cf.start();//锁对象 client 锁节点final InterProcessMutex lock = new InterProcessMutex(cf, "/super");try {countDown.await();lock.acquire(); //获得锁number();Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();} finally {try {lock.release();//释放锁} catch (Exception e) {e.printStackTrace();}}}},"t" + i).start();;}Thread.sleep(2000);countDown.countDown();} public static void number() {count--;System.out.println(Thread.currentThread().getName() + "-->" + count);}} |
4.2 分布式计数器功能
一说到分布式计数器,可能脑海里想到AtomicInteger(原子累加)这种经典方式,如果针对一个JVM的场景当然没问题,但是现在是在分布式场景下,就需要利用Curator框架的DistributedAtomicInteger了 public class CuratorAtomicInteger {/** zk地址 */private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";/** session超时时间 */private static final int SESSION_TIMEOUT = 5000; //MSpublic static void main(String[] args) throws Exception {CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(ZK_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();cf.start();//使用DistributedAtomicIntegerDistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(cf, "/superM", new RetryNTimes(3, 1000));//atomicInteger.increment();atomicInteger.add(1);AtomicValue |
4.3 Barrier
4.3.1 DistributedDoubleBarrier
分布式Barrier 类DistributedDoubleBarrier: 它会阻塞所有节点上的等待进程,直到某一个被满足, 然后所有的节点同时开始,中间谁先运行完毕,谁后运行完毕不关心,但是最终一定是一块退出运行的 public class CuratorBarrier {/** zk地址 */private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";/** session超时时间 */private static final int SESSION_TIMEOUT = 5000; //MSpublic static void main(String[] args) throws Exception{for (int i =0; i < 5; i++) {new Thread(new Runnable() {@Overridepublic void run() {try {/** 实例化5个客户端对象 */CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(ZK_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();cf.start();DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/superBarrier", 5);Thread.sleep(1000 * (new Random()).nextInt(3));System.out.println(Thread.currentThread().getName() + " 已准备好!");barrier.enter();System.out.println("同时开始运行...");Thread.sleep(1000 * (new Random()).nextInt(3));System.out.println("运行完毕...");barrier.leave();System.out.println("同时退出运行...");} catch (Exception e) {e.printStackTrace();}}},"t" + i).start();;}}} |
4.3.2 DistributedBarrier
分布式Barrier 类DistributedBarrier: 它会阻塞所有节点上的等待进程(所有节点进入待执行状态),直到"某一个人吹哨"说开始执行, 然后所有的节点同时开始 public class CuratorBarrier2 {/** zk地址 */private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";/** session超时时间 */private static final int SESSION_TIMEOUT = 5000; //MSstatic DistributedBarrier barrier = null;public static void main(String[] args) throws Exception{for (int i =0; i < 5; i++) {new Thread(new Runnable() {@Overridepublic void run() {try {/** 实例化5个客户端对象 */CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(ZK_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();cf.start();barrier = new DistributedBarrier(cf, "/superBarrier");System.out.println(Thread.currentThread().getName() + " 设置barrier");barrier.setBarrier(); //设置barrier.waitOnBarrier(); //等待System.out.println("开始执行程序...");} catch (Exception e) {e.printStackTrace();}}},"t" + i).start();;}Thread.sleep(5000);barrier.removeBarrier(); //释放}} |
5 Curator重试策略
Curator内部实现的几种重试策略: 1.ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加. 2.RetryNTimes:指定最大重试次数的重试策略 3.RetryOneTime:仅重试一次 4.RetryUntilElapsed:一直重试直到达到规定的时间 |
5.1 ExponentialBackoffRetry
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) 参数说明 1.baseSleepTimeMs 初始sleep时间 2.maxRetries 最大重试次数 3.maxSleepMs 最大重试时间 |
5.2 RetryNTimes
RetryNTimes(int n, int sleepMsBetweenRetries) 参数说明 1.n 最大重试次数 2.sleepMsBetweenRetries 每次重试的间隔时间 |
5.3 RetryOneTime
RetryOneTime(int sleepMsBetweenRetry) 参数说明 1.sleepMsBetweenRetry为重试间隔的时间 |
5.4 RetryUntilElapsed
RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries) 参数说明 1.maxElapsedTimeMs 最大重试时间 2.sleepMsBetweenRetries 每次重试的间隔时间 |