千家信息网

如何将数据按指定格式存入zookeeper

发表于:2025-01-19 作者:千家信息网编辑
千家信息网最后更新 2025年01月19日,这篇文章主要讲解了"如何将数据按指定格式存入zookeeper",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何将数据按指定格式存入zookeeper
千家信息网最后更新 2025年01月19日如何将数据按指定格式存入zookeeper

这篇文章主要讲解了"如何将数据按指定格式存入zookeeper",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何将数据按指定格式存入zookeeper"吧!

环境:

scala版本:2.11.8

zookeeper版本:3.4.5-cdh6.7.0

package com.ruozedata.zkimport java.util.concurrent.TimeUnitimport org.apache.curator.framework.CuratorFrameworkFactoryimport org.apache.curator.framework.recipes.locks.InterProcessMuteximport org.apache.curator.retry.ExponentialBackoffRetryimport org.slf4j.LoggerFactoryimport scala.collection.JavaConversions._import scala.collection.mutable/**  * Created by ganwei on 2018/08/21  * 要求:  * 1 通过storeOffsets方法把数据存入zookeeper中。  *  存储格式:  *          /consumers/G322/offsets/ruoze_offset_topic/partition/0  *          /consumers/G322/offsets/ruoze_offset_topic/partition/1  *          /consumers/G322/offsets/ruoze_offset_topic/partition/2  * 2 通过obtainOffsets方法把存入的数据读取出来  * 输出格式:  *           topic:ruoze_offset_topic partition:0     offset:7  *           topic:ruoze_offset_topic partition:1     offset:3  *           topic:ruoze_offset_topic partition:2     offset:5  */object ZkConnectApp{  val LOG = LoggerFactory.getLogger(ZkConnectApp.getClass)  val client = {    val client = CuratorFrameworkFactory      .builder      .connectString("172.16.100.31:2181")      .retryPolicy(new ExponentialBackoffRetry(1000, 3))      .namespace("consumers")      .build()    client.start()    client  }  def lock(path: String)(body: => Unit) {    val lock = new InterProcessMutex(client, path)    lock.acquire()    try {      body    } finally {      lock.release()    }  }  def tryDo(path: String)(body: => Unit): Boolean = {    val lock = new InterProcessMutex(client, path)    if (!lock.acquire(10, TimeUnit.SECONDS)) {      LOG.info(s"不能获得锁 {$path},已经有任务在运行,本次任务退出")      return false    }    try {      LOG.info("获准运行")      body      true    } finally {      lock.release()      LOG.info(s"释放锁 {$path}")    }  }  //zookeeper创建路径  def ensurePathExists(path: String): Unit = {    if (client.checkExists().forPath(path) == null) {      client.create().creatingParentsIfNeeded().forPath(path)    }  }  /**    * OffsetRange类定义(偏移量对象)    * 用于存储偏移量    */  case class OffsetRange(                          val topic:String,     // 主题                          val partition:Int,    // 分区                          val fromOffset:Long,  // 起始偏移量                          val utilOffset:Long   // 终止偏移量                        )  /**    * zookeeper存储offset的方法    * 写入格式:    * /consumers/G322/offsets/ruoze_offset_topic/partition/0    * /consumers/G322/offsets/ruoze_offset_topic/partition/1    * /consumers/G322/offsets/ruoze_offset_topic/partition/2    * @param OffsetsRanges    * @param groupName    */  def storeOffsets(OffsetsRanges:Array[OffsetRange],groupName:String)={    val offsetRootPath = s"/"+groupName    if (client.checkExists().forPath(offsetRootPath) == null) {      client.create().creatingParentsIfNeeded().forPath(offsetRootPath)    }    for(els <- OffsetsRanges ){      val data = String.valueOf(els.utilOffset).getBytes      val path = s"$offsetRootPath/offsets/${els.topic}/partition/${els.partition}"      // 创建路径      ensurePathExists(path)      // 写入数据      client.setData().forPath(path, data)    }  }  /**    * TopicAndPartition类定义(偏移量key对象)    *  用于提取偏移量    */  case class TopicAndPartition(                                topic:String,  // 主题                                partition:Int  // 分区                              )  /**    * zookeeper提取offset的方法    * @param topic    * @param groupName    * @return    */  def obtainOffsets(topic:String,groupName:String):Map[TopicAndPartition,Long]={    // 定义一个空的HashMap    val maps = mutable.HashMap[TopicAndPartition,Long]()    // offset的路径    val offsetRootPath = s"/"+groupName+"/offsets/"+topic+"/partition"    // 判断路径是否存在    val stat = client.checkExists().forPath(s"$offsetRootPath")    if (stat == null ){      println(stat)  // 路径不存在 就将路径打印在控制台,检查路径    }else{      // 获取 offsetRootPath路径下一级的所有子目录      // 我们这里是获取的所有分区      val children = client.getChildren.forPath(s"$offsetRootPath")     // 遍历所有的分区      for ( lines <- children ){        // 获取分区的数据        val data = new String(client.getData().forPath(s"$offsetRootPath/"+lines)).toLong        // 将 topic  partition  和数据赋值给 maps        maps(TopicAndPartition(topic,lines.toInt)) = data      }    }    // 按partition排序后 返回map对象    maps.toList.sortBy(_._1.partition).toMap  }  def main(args: Array[String]) {      //定义初始化数据      val off1 = OffsetRange("ruoze_offset_topic",0,0,7)      val off2 = OffsetRange("ruoze_offset_topic",1,0,3)      val off3 = OffsetRange("ruoze_offset_topic",2,0,5)      val arr = Array(off1,off2,off3)      //获取到namespace//      println(client.getNamespace)      // 创建路径//      val offsetRootPath = "/G322"//      if (client.checkExists().forPath(offsetRootPath) == null) {//        client.create().creatingParentsIfNeeded().forPath(offsetRootPath)//      }      //存储值      storeOffsets(arr,"G322")      //获取值      /**        * 输出格式:        * topic:ruoze_offset_topic partition:0     offset:7        * topic:ruoze_offset_topic partition:1     offset:3        * topic:ruoze_offset_topic partition:2     offset:5        */      val result = obtainOffsets("ruoze_offset_topic","G322")      for (map <- result){        println("topic:"+map._1.topic+"\t" +"partition:"+map._1.partition+"\t"+"offset:"+map._2)      }  }}

感谢各位的阅读,以上就是"如何将数据按指定格式存入zookeeper"的内容了,经过本文的学习后,相信大家对如何将数据按指定格式存入zookeeper这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

0