千家信息网

Zookeeper Queue队列怎么实现

发表于:2025-02-08 作者:千家信息网编辑
千家信息网最后更新 2025年02月08日,本篇内容介绍了"Zookeeper Queue队列怎么实现 "的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有
千家信息网最后更新 2025年02月08日Zookeeper Queue队列怎么实现

本篇内容介绍了"Zookeeper Queue队列怎么实现 "的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

1: Barries: 栅栏,见面知意。

2:Queue:Queue也就是我们所说的队列

1:Barries:

1.1: 是指所有的现场都达到 barrier后才能进行后续的计算

1.2:所有的线程都完成自己的计算以后才能离开barrier

进入栅栏:        1,新建一个根节点 "/root"        2, 想进入barrier的线程在 "/root"下建立一个字节点"/root/c-i"        3,循环监听"/root"孩子节点数的变化,每当其达到Size的时候就说明有Size个线程都已经达到了Barrier的要求。

2:Queue:就是指一个生产者或消费者的模型

离开Barrier        1:  想离开Barrier的现场删除掉在"/root" 下建立的子节点        2:  循环监听"/root" 孩子节点数目的变化,当Size减少到0的时候它就可以离开了。

3 :Queue 队列的实现

1 : 建立一个根节点"/root"2  : 生产线程在"/root" 下建立一个SEQUENTAIL的节点3 : 消费线程检查"/root" 如果没有就循环的监听"/root" 节点的变化,直到它有自己的子节点,删除序号最小子字节点。
package sync; import java.io.IOException;import java.net.InetAddress;import java.net.UnknownHostException;import java.nio.ByteBuffer;import java.util.List;import java.util.Random; import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.data.Stat; public class SyncPrimitive implements Watcher {    static ZooKeeper zk = null;    static Integer mutex;    String root;     //同步原语    SyncPrimitive(String address) {        if (zk == null) {            try {                System.out.println("Starting ZK:");                //建立Zookeeper连接,并且指定watcher                zk = new ZooKeeper(address, 3000, this);                //初始化锁对象                mutex = new Integer(-1);                System.out.println("Finished starting ZK:" + zk);            } catch (IOException e) {                System.out.println(e.toString());                zk = null;            }        }    }     @Override    synchronized public void process(WatchedEvent event) {        synchronized (mutex) {            //有事件发生时,调用notify,使其他wait()点得以继续            mutex.notify();        }     }     static public class Barrier extends SyncPrimitive {        int size;        String name;         Barrier(String address, String root, int size) {            super(address);            this.root = root;            this.size = size;            if (zk != null) {                try {                    //一个barrier建立一个根目录                    Stat s = zk.exists(root, false);        //不注册watcher                    if (s == null) {                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,                                CreateMode.PERSISTENT);                    }                } catch (KeeperException e) {                    System.out                            .println("keeper exception when instantiating queue:"                                    + e.toString());                } catch (InterruptedException e) {                    System.out.println("Interrupted exception.");                }            }            try {                //获取自己的主机名                name = new String(InetAddress.getLocalHost()                        .getCanonicalHostName().toString());            } catch (UnknownHostException e) {                System.out.println(e.toString());            }        }         boolean enter() throws KeeperException, InterruptedException {            //在根目录下创建一个子节点.create和delete都会触发children wathes,这样getChildren就会收到通知,process()就会被调用            zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,                    CreateMode.EPHEMERAL_SEQUENTIAL);            //一直等,直到根目录下的子节点数目达到size时,函数退出            while (true) {                synchronized (mutex) {                    List list = zk.getChildren(root, true);                    if (list.size() < size) {                        mutex.wait();       //释放mutex上的锁                    } else {                        return true;                    }                }            }        }         boolean leave() throws KeeperException, InterruptedException {            //删除自己创建的节点            zk.delete(root + "/" + name, 0);            //一直等,直到根目录下有子节点时,函数退出            while (true) {                synchronized (mutex) {                    List list = zk.getChildren(root, true);                    if (list.size() > 0) {                        mutex.wait();                    } else {                        return true;                    }                }            }        }     }     static public class Queue extends SyncPrimitive {        Queue(String address, String name) {            super(address);            this.root = name;            if (zk != null) {                try {                    //一个queue建立一个根目录                    Stat s = zk.exists(root, false);                    if (s == null) {                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,                                CreateMode.PERSISTENT);                    }                } catch (KeeperException e) {                    System.out                            .println("keeper exception when instantiating queue:"                                    + e.toString());                } catch (InterruptedException e) {                    System.out.println("Interrupted exception.");                }            }        }         //参数i是要创建节点的data        boolean produce(int i) throws KeeperException, InterruptedException {            ByteBuffer b = ByteBuffer.allocate(4);            byte[] value;            b.putInt(i);            value = b.array();             //根目录下创建一个子节点,因为是SEQUENTIAL的,所以先创建的节点具有较小的序号            zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,                    CreateMode.PERSISTENT_SEQUENTIAL);            return true;        }         int consume() throws KeeperException, InterruptedException {            int retvalue = -1;            Stat stat = null;            while (true) {                synchronized (mutex) {                    List list = zk.getChildren(root, true);       //并不能保证list[0]就是序号最小的                    //如果根目录下没有子节点就一直等                    if (list.size() == 0) {                        System.out.println("Going to wait");                        mutex.wait();                    }                    //找到序号最小的节点将其删除                    else {                        Integer min = new Integer(list.get(0).substring(7));                        for (String s : list) {                            Integer tmp = new Integer(s.substring(7));                            if (tmp < min)                                min = tmp;                        }                        System.out.println("Temporary value:" + root                                + "/element" + min);                        byte[] b = zk.getData(root + "/element" + min, false,                                stat);                        zk.delete(root + "/element" + min, 0);                        ByteBuffer buffer = ByteBuffer.wrap(b);                        retvalue = buffer.getInt();                        return retvalue;                    }                }            }        }    }     public static void main(String[] args) {        if (args[0].equals("qTest"))            queueTest(args);        else            barrierTest(args);    }     private static void barrierTest(String[] args) {        Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));        try {            boolean flag = b.enter();            System.out.println("Enter barrier:" + args[2]);            if (!flag)                System.out.println("Error when entering the barrier");        } catch (KeeperException e) {         } catch (InterruptedException e) {        }         Random rand = new Random();        int r = rand.nextInt(100);        for (int i = 0; i < r; i++) {            try {                Thread.sleep(100);            } catch (InterruptedException e) {             }        }        try {            b.leave();        } catch (KeeperException e) {         } catch (InterruptedException e) {        }        System.out.println("Left barrier");    }     private static void queueTest(String[] args) {        Queue q = new Queue(args[1], "/app1");        System.out.println("Input:" + args[1]);        int i;        Integer max = new Integer(args[2]);         if (args[3].equals("p")) {            System.out.println("Producer");            for (i = 0; i < max; i++)                try {                    q.produce(10 + 1);                } catch (KeeperException e) {                 } catch (InterruptedException e) {                }        } else {            System.out.println("Consumer");            for (i = 0; i < max; i++)                try {                    int r = q.consume();                    System.out.println("Item:" + r);                } catch (KeeperException e) {                    i--;                } catch (InterruptedException e) {                }        }    } }

"Zookeeper Queue队列怎么实现 "的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0