千家信息网

一、zookeeper--部署和使用

发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,一、部署zookeeper1、资源规划服务器bigdata121/192.168.50.121,bigdata122/192.168.50.122,bigdata123/192.168.50.123z
千家信息网最后更新 2025年01月31日一、zookeeper--部署和使用

一、部署zookeeper

1、资源规划

服务器bigdata121/192.168.50.121,bigdata122/192.168.50.122,bigdata123/192.168.50.123
zookeeper版本3.4.10
系统版本centos7.2

2、集群部署

(1)安装zk

[root@bigdata121 modules]# cd /opt/modules/zookeeper-3.4.10[root@bigdata121 zookeeper-3.4.10]# mkdir zkData[root@bigdata121 zookeeper-3.4.10]# mv conf/zoo_sample.cfg conf/zoo.cfg

(2)修改zoo.cfg配置

# The number of milliseconds of each ticktickTime=2000# The number of ticks that the initial # synchronization phase can takeinitLimit=10# The number of ticks that can pass between # sending a request and getting an acknowledgementsyncLimit=5# the directory where the snapshot is stored.# do not use /tmp for storage, /tmp here is just # example sakes.#dataDir=/tmp/zookeeper# 指定zk存储数据的目录dataDir=/opt/modules/zookeeper-3.4.10/zkData# the port at which the clients will connectclientPort=2181# the maximum number of client connections.# increase this if you need to handle more clients#maxClientCnxns=60## Be sure to read the maintenance section of the # administrator guide before turning on autopurge.## http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance## The number of snapshots to retain in dataDir#autopurge.snapRetainCount=3# Purge task interval in hours# Set to "0" to disable auto purge feature#autopurge.purgeInterval=1# 这里是重点配置#############cluster#############################server.1=bigdata121:2888:3888server.2=bigdata122:2888:3888server.3=bigdata123:2888:3888

cluster配置参数解读:
Server.A=B:C:D。
A是一个数字,表示这个是第几号服务器,也就是sid;
B是这个服务器的ip地址;
C是这个服务器与集群中的Leader服务器交换信息的端口;不是对外的服务端口(对外的服务端口默认是2181)
D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

将配置好的整个程序目录拷贝到其他机器上,使用scp或者rsync都可以,自己看着办

(3)指定服务器id
在前面配置的 dataDir 指定的目录下,创建一个"myid"文件,里面的内容就写入当前server的id,这个id就是在zk集群中的唯一标识。并且这个id需要和前面配置文件中的cluster中指定的一样,否则会报错。

(4)配置环境变量

vim /etc/profile.d/zookeeper.sh#!/bin/bashexport ZOOKEEPER_HOME=/opt/modules/zookeeper-3.4.10export PATH=${ZOOKEEPER_HOME}/bin:$PATH然后 source /etc/profile.d/zookeeper.sh

(5)启动
在三台机器上执行

启动:zkServer.sh start查看当前主机上zk的状态:zkServer.sh status[root@bigdata121 conf]# zkServer.sh statusZooKeeper JMX enabled by defaultUsing config: /opt/modules/zookeeper-3.4.10/bin/../conf/zoo.cfgMode: follower

二、常用命令

使用 zkCli.sh 进入本机的zk服务。
可以使用如下命令:

命令功能
help显示所有命令帮助
ls path [watch]使用 ls 命令来查看当前znode中所包含的内容,后面的watch表示监听该节点下子节点的改变。注意,监听触发一次之后就会失效,如果需要持续监听,需要每次触发之后重新进行监听
ls2 path [watch]查看当前节点数据并能看到更新次数等数据,类似于Linux中的 ls -l
Create普通创建(永久节点) -s 含有序列,会在节点名后面加一串序列号,常用于节点名称冲突的情况 -e 创建临时节点
get path [watch]获得节点的值。后面的watch表示监听该节点的value的改变。
Set path value设置节点的具体值
Stat查看节点状态
rmr path递归删除节点

三、zk api使用(java)

1、maven依赖

                    org.apache.zookeeper            zookeeper            3.4.10        

2、创建zk客户端

import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import org.junit.Before;import org.junit.Test;import java.io.IOException;import java.util.List;public class ZkTest {    public static String connectString = "bigdata121:2181,bigdata122:2181,bigdata123:2181";    public static int sessionTimeout = 2000;    public ZooKeeper zkClient = null;    @Before    public void init() throws IOException {        //创建zk客户端        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {            //返回监听事件时的处理函数,监听事件是一次性的            public void process(WatchedEvent watchedEvent) {                System.out.println(watchedEvent.getState() + "," + watchedEvent.getType() + "," + watchedEvent.getPath());                try {                    zkClient.getChildren("/", true);                } catch (KeeperException e) {                    e.printStackTrace();                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        });    }}

3、创建节点

public void create() {        //创建节点,参数为:节点名 节点值 权限 节点类型        //即 /wangjin  tao  开放权限  持久化节点        try {            String s = zkClient.create("/wangjin", "tao".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);        } catch (KeeperException e) {            System.out.println("node exists!!!");        } catch (InterruptedException e) {            e.printStackTrace();        }    }

4、获取子节点

zkclient.getChildren(路径,是否监听)返回的是子节点的列表例子:public void getChildNode() {        try {            List children = zkClient.getChildren("/", false);            for (String node : children) {                System.out.println(node);            }        } catch (KeeperException e) {            System.out.println("node not exists!!!");        } catch (InterruptedException e) {            e.printStackTrace();        }    }

5、判断节点是否存在

 zkclient.exists(path, 是否监听) 返回的是节点的状态信息,如果为null,表示节点不存在 例子: public void nodeExist() {        //返回的是节点的状态信息,如果为null,表示节点不存在        try {            Stat stat = zkClient.exists("/king", false);            System.out.println(stat == null ? "没有" : "有");        } catch (KeeperException e) {            System.out.println("node not exists");        } catch (InterruptedException e) {            e.printStackTrace();        }    }

四、使用zk做分布式锁实例

1、maven依赖

    org.apache.curator    curator-framework    4.0.0    org.apache.curator    curator-recipes    4.0.0    org.apache.curator    curator-client    4.0.0    com.google.guava    guava    16.0.1

2、需求
模拟抢购秒杀场景,需要给商品数量加锁。

3、代码

import org.apache.curator.RetryPolicy;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.locks.InterProcessMutex;import org.apache.curator.retry.ExponentialBackoffRetry;public class TestDistributedLock {    //定义共享资源    private static int count = 10;    //用于减除商品    private static void printCountNumber() {        System.out.println("***********" + Thread.currentThread().getName() + "**********");        System.out.println("当前值:" + count);        count--;        //睡2秒        try {            Thread.sleep(500);        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        System.out.println("***********" + Thread.currentThread().getName() + "**********");    }    public static void main(String[] args) {        //定义客户端重试的策略        RetryPolicy policy = new ExponentialBackoffRetry(1000,  //每次等待的时间                10); //最大重试的次数        //定义ZK的一个客户端        CuratorFramework client = CuratorFrameworkFactory.builder()                .connectString("bigdata121:2181")                .retryPolicy(policy)                .build();        //客户端对象连接zk        client.start();        //创建互斥锁,其实就是在zk上创建个节点        final InterProcessMutex lock = new InterProcessMutex(client, "/mylock");        // 启动10个线程去访问共享资源        for (int i = 0; i < 10; i++) {            new Thread(new Runnable() {                public void run() {                    try {                        //请求得到锁                        lock.acquire();                        //访问共享资源                        printCountNumber();                    } catch (Exception ex) {                        ex.printStackTrace();                    } finally {                        //释放锁                        try {                            lock.release();                        } catch (Exception e) {                            e.printStackTrace();                        }                    }                }            }).start();        }    }}
0