Zookeeper Queue队列怎么实现
发表于:2025-02-08 作者:千家信息网编辑
千家信息网最后更新 2025年02月08日,本篇内容介绍了"Zookeeper Queue队列怎么实现 "的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有
千家信息网最后更新 2025年02月08日Zookeeper Queue队列怎么实现
本篇内容介绍了"Zookeeper Queue队列怎么实现 "的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
1: Barries: 栅栏,见面知意。
2:Queue:Queue也就是我们所说的队列
1:Barries:
1.1: 是指所有的现场都达到 barrier后才能进行后续的计算
1.2:所有的线程都完成自己的计算以后才能离开barrier
进入栅栏: 1,新建一个根节点 "/root" 2, 想进入barrier的线程在 "/root"下建立一个字节点"/root/c-i" 3,循环监听"/root"孩子节点数的变化,每当其达到Size的时候就说明有Size个线程都已经达到了Barrier的要求。
2:Queue:就是指一个生产者或消费者的模型
离开Barrier 1: 想离开Barrier的现场删除掉在"/root" 下建立的子节点 2: 循环监听"/root" 孩子节点数目的变化,当Size减少到0的时候它就可以离开了。
3 :Queue 队列的实现
1 : 建立一个根节点"/root"2 : 生产线程在"/root" 下建立一个SEQUENTAIL的节点3 : 消费线程检查"/root" 如果没有就循环的监听"/root" 节点的变化,直到它有自己的子节点,删除序号最小子字节点。
package sync; import java.io.IOException;import java.net.InetAddress;import java.net.UnknownHostException;import java.nio.ByteBuffer;import java.util.List;import java.util.Random; import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.data.Stat; public class SyncPrimitive implements Watcher { static ZooKeeper zk = null; static Integer mutex; String root; //同步原语 SyncPrimitive(String address) { if (zk == null) { try { System.out.println("Starting ZK:"); //建立Zookeeper连接,并且指定watcher zk = new ZooKeeper(address, 3000, this); //初始化锁对象 mutex = new Integer(-1); System.out.println("Finished starting ZK:" + zk); } catch (IOException e) { System.out.println(e.toString()); zk = null; } } } @Override synchronized public void process(WatchedEvent event) { synchronized (mutex) { //有事件发生时,调用notify,使其他wait()点得以继续 mutex.notify(); } } static public class Barrier extends SyncPrimitive { int size; String name; Barrier(String address, String root, int size) { super(address); this.root = root; this.size = size; if (zk != null) { try { //一个barrier建立一个根目录 Stat s = zk.exists(root, false); //不注册watcher if (s == null) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { System.out .println("keeper exception when instantiating queue:" + e.toString()); } catch (InterruptedException e) { System.out.println("Interrupted exception."); } } try { //获取自己的主机名 name = new String(InetAddress.getLocalHost() .getCanonicalHostName().toString()); } catch (UnknownHostException e) { System.out.println(e.toString()); } } boolean enter() throws KeeperException, InterruptedException { //在根目录下创建一个子节点.create和delete都会触发children wathes,这样getChildren就会收到通知,process()就会被调用 zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //一直等,直到根目录下的子节点数目达到size时,函数退出 while (true) { synchronized (mutex) { Listlist = zk.getChildren(root, true); if (list.size() < size) { mutex.wait(); //释放mutex上的锁 } else { return true; } } } } boolean leave() throws KeeperException, InterruptedException { //删除自己创建的节点 zk.delete(root + "/" + name, 0); //一直等,直到根目录下有子节点时,函数退出 while (true) { synchronized (mutex) { List list = zk.getChildren(root, true); if (list.size() > 0) { mutex.wait(); } else { return true; } } } } } static public class Queue extends SyncPrimitive { Queue(String address, String name) { super(address); this.root = name; if (zk != null) { try { //一个queue建立一个根目录 Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { System.out .println("keeper exception when instantiating queue:" + e.toString()); } catch (InterruptedException e) { System.out.println("Interrupted exception."); } } } //参数i是要创建节点的data boolean produce(int i) throws KeeperException, InterruptedException { ByteBuffer b = ByteBuffer.allocate(4); byte[] value; b.putInt(i); value = b.array(); //根目录下创建一个子节点,因为是SEQUENTIAL的,所以先创建的节点具有较小的序号 zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); return true; } int consume() throws KeeperException, InterruptedException { int retvalue = -1; Stat stat = null; while (true) { synchronized (mutex) { List list = zk.getChildren(root, true); //并不能保证list[0]就是序号最小的 //如果根目录下没有子节点就一直等 if (list.size() == 0) { System.out.println("Going to wait"); mutex.wait(); } //找到序号最小的节点将其删除 else { Integer min = new Integer(list.get(0).substring(7)); for (String s : list) { Integer tmp = new Integer(s.substring(7)); if (tmp < min) min = tmp; } System.out.println("Temporary value:" + root + "/element" + min); byte[] b = zk.getData(root + "/element" + min, false, stat); zk.delete(root + "/element" + min, 0); ByteBuffer buffer = ByteBuffer.wrap(b); retvalue = buffer.getInt(); return retvalue; } } } } } public static void main(String[] args) { if (args[0].equals("qTest")) queueTest(args); else barrierTest(args); } private static void barrierTest(String[] args) { Barrier b = new Barrier(args[1], "/b1", new Integer(args[2])); try { boolean flag = b.enter(); System.out.println("Enter barrier:" + args[2]); if (!flag) System.out.println("Error when entering the barrier"); } catch (KeeperException e) { } catch (InterruptedException e) { } Random rand = new Random(); int r = rand.nextInt(100); for (int i = 0; i < r; i++) { try { Thread.sleep(100); } catch (InterruptedException e) { } } try { b.leave(); } catch (KeeperException e) { } catch (InterruptedException e) { } System.out.println("Left barrier"); } private static void queueTest(String[] args) { Queue q = new Queue(args[1], "/app1"); System.out.println("Input:" + args[1]); int i; Integer max = new Integer(args[2]); if (args[3].equals("p")) { System.out.println("Producer"); for (i = 0; i < max; i++) try { q.produce(10 + 1); } catch (KeeperException e) { } catch (InterruptedException e) { } } else { System.out.println("Consumer"); for (i = 0; i < max; i++) try { int r = q.consume(); System.out.println("Item:" + r); } catch (KeeperException e) { i--; } catch (InterruptedException e) { } } } }
"Zookeeper Queue队列怎么实现 "的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
节点
根目录
线程
队列
序号
变化
循环
监听
最小
个子
内容
函数
字节
孩子
就是
数目
时候
更多
栅栏
知识
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
金华广电网络技术和银行
软件开发和项目管理简称
闵行区企业金融网络技术服务
电脑显示系统数据库无法登录
苏州有哪些打车软件开发
win服务器运行环境
开展网络安全活动的新闻
如何接 软件开发项目
重庆市网络安全网站
网络安全态势感知平台的作用
服务器 tick
数据库创建联合主键操作
重庆前端软件开发哪家实惠
计算机网络安全方面的考研
小公司使用的数据库软件
网络安全检查表
数据库系统中包含系统
广东软件开发地址
聚焦网络安全的征文
小学生网络安全绘画大全
专利分析数据库
电脑显示系统数据库无法登录
彩虹六号异种连接服务器
dbf数据库怎么编辑器
软件开发周期各阶段
网络安全专业qs世界排名
邮件接收软件开发
僵尸世界大战本地服务器联机
海南常规软件开发代理价格
高速去服务器走应急车道