千家信息网

ZooKeeper同步框架怎么实现

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,本篇内容主要讲解"ZooKeeper同步框架怎么实现",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"ZooKeeper同步框架怎么实现"吧!首先,定义一个同
千家信息网最后更新 2025年02月03日ZooKeeper同步框架怎么实现

本篇内容主要讲解"ZooKeeper同步框架怎么实现",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"ZooKeeper同步框架怎么实现"吧!

首先,定义一个同步接口,它有一个execute方法,主要负责同步任务的实现。

Path参数是任务节点(用户),只有相同的节点才会同步工作。想象一下,去银行取钱,如果每个人都有一个专属的柜台,那效率是明显的。

SynchronousProcessor参数用来处理具体的业务。

Synchronous.java

package org.bigmouth.nvwa.zookeeper.concurrent;  /** * 同步,支持分布式 *  * @author Allen Hu  * 2015-4-17 */public interface Synchronous {     /**     * 同步执行,根据path标识来区分同步工作。不同的path将不会同步进行。     *      * @param处理结果类型     * @param path 任务节点     * e.g. "/project/synchronous/0000001"     * @param processor 业务处理器     * @return 处理结果     */T execute(String path, SynchronousProcessorprocessor);}

MutexLockSynchronous.java

Synchronous的实现类,基于普通排它锁的方式实现。

package org.bigmouth.nvwa.zookeeper.concurrent; import org.apache.curator.framework.recipes.locks.InterProcessLock;import org.apache.curator.framework.recipes.locks.InterProcessMutex;import org.apache.zookeeper.common.PathUtils;import org.bigmouth.nvwa.zookeeper.ZkClientHolder;  /** * 基于普通排他锁的方式实现同步 *  * @author Allen Hu  * 2015-4-17 */public class MutexLockSynchronous implements Synchronous {     private final ZkClientHolder zkClientHolder;         public MutexLockSynchronous(ZkClientHolder zkClientHolder) {        this.zkClientHolder = zkClientHolder;    }     @Override    publicT execute(String path, SynchronousProcessorprocessor) {        PathUtils.validatePath(path);        InterProcessLock lock = new InterProcessMutex(zkClientHolder.get(), path);        try {            lock.acquire();            if (null != processor)                return processor.process();        }        catch (Exception e) {            if (null != processor)                processor.exceptionCaught(e);        }        finally {            try {                lock.release();            }            catch (Exception e) {            }        }        return null;    }}




SynchronousProcessor.java

任务处理器接口,实现它来完成具体的业务工作

package org.bigmouth.nvwa.zookeeper.concurrent;  /** * 同步业务处理器 *  * @author Allen Hu  * 2015-4-17 */public interface SynchronousProcessor{     /**     * 处理具体的业务     *      * @return     */    T process();         /**     * 异常捕获     *      * @param throwable     */    void exceptionCaught(Throwable throwable);}

ZkClientHolder.java

当然少不了这个了,继承的父类可以不需要了解,就是定义了两个抽象方法:doInit和doDestroy方法。

package org.bigmouth.nvwa.zookeeper; import org.apache.commons.lang.StringUtils;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import org.bigmouth.nvwa.utils.BaseLifeCycleSupport;import org.slf4j.Logger;import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions;  /** * ZooKeeper client holder *  * @author Allen Hu  * 2015-4-16 */public class ZkClientHolder extends BaseLifeCycleSupport {     private static final Logger LOGGER = LoggerFactory.getLogger(ZkClientHolder.class);         public static final int MAX_RETRIES = 3;    public static final int BASE_SLEEP_TIMEMS = 3000;     private CuratorFramework zkClient;     private final String connectString;    private final int sessionTimeout;         public ZkClientHolder(String connectString, int sessionTimeout) {        Preconditions.checkArgument(StringUtils.isNotBlank(connectString), "connectString cannot be blank");        Preconditions.checkArgument(sessionTimeout >= 10000, "sessionTimeout must be greater than 10000");        this.connectString = connectString;        this.sessionTimeout = sessionTimeout;    }         public CuratorFramework get() {        return zkClient;    }     @Override    protected void doInit() {        zkClient = CuratorFrameworkFactory.builder()                .sessionTimeoutMs(sessionTimeout)                .connectString(connectString)                .retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIMEMS, MAX_RETRIES))                .build();        zkClient.start();        if (LOGGER.isInfoEnabled()) {            LOGGER.info("Connected to ZooKepper server: {}", connectString);        }    }     @Override    protected void doDestroy() {        if (null != zkClient)            zkClient.close();    }}

最后来个测试类,模拟多个用户多线程处理任务的过程,我们达到了相同用户间同步的目的。

package org.bigmouth.nvwa.zookeeper.concurrent; import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors; import org.apache.curator.utils.ZKPaths;import org.bigmouth.nvwa.zookeeper.ZkClientHolder;import org.slf4j.Logger;import org.slf4j.LoggerFactory;  /** *  * @author Allen Hu  * 2015-4-17 */public class ConcurrentTest {         private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentTest.class);    private ZkClientHolder zkClientHolder = new ZkClientHolder("172.16.3.24:2181", 60000);    private Synchronous synchronous;         public ConcurrentTest() {        zkClientHolder.init();        synchronous = new MutexLockSynchronous(zkClientHolder);    }     public class Service implements Runnable {                 private final String id;        private final long sleepInMillis;                 public Service(String id, long sleepInMillis) {            this.id = id;            this.sleepInMillis = sleepInMillis;        }         @Override        public void run() {            synchronous.execute(ZKPaths.makePath("/nvwa/zookeeper/concurrent", id), new SynchronousProcessor() {                 @Override                public String process() {                    LOGGER.info(id + " star...!");                    try {                        Thread.sleep(sleepInMillis);                    }                    catch (InterruptedException e) {                        e.printStackTrace();                    }                    LOGGER.info(id + " has execution!");                    return id;                }                 @Override                public void exceptionCaught(Throwable throwable) {                    throwable.printStackTrace();                }            });        }    }     static ExecutorService executor = Executors.newCachedThreadPool();         public static void main(String[] args) {        ConcurrentTest ct = new ConcurrentTest();        executor.submit(ct.new Service("1", 5000)); // 1号 处理5秒        executor.submit(ct.new Service("1", 2000)); // 1号 处理2秒        executor.submit(ct.new Service("2", 5000)); // 2号 处理5秒        executor.submit(ct.new Service("3", 10000)); // 3号 处理10秒        executor.submit(ct.new Service("3", 500)); // 3号 处理0.5秒    }}


输出结果,1、2、3任务并行,而相同的任务串行。如:第二个1号等第一个1号执行完才开始。

到此,相信大家对"ZooKeeper同步框架怎么实现"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0