如何将数据按指定格式存入zookeeper
发表于:2025-01-19 作者:千家信息网编辑
千家信息网最后更新 2025年01月19日,这篇文章主要讲解了"如何将数据按指定格式存入zookeeper",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何将数据按指定格式存入zookeeper
千家信息网最后更新 2025年01月19日如何将数据按指定格式存入zookeeper
这篇文章主要讲解了"如何将数据按指定格式存入zookeeper",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何将数据按指定格式存入zookeeper"吧!
环境:
scala版本:2.11.8
zookeeper版本:3.4.5-cdh6.7.0
package com.ruozedata.zkimport java.util.concurrent.TimeUnitimport org.apache.curator.framework.CuratorFrameworkFactoryimport org.apache.curator.framework.recipes.locks.InterProcessMuteximport org.apache.curator.retry.ExponentialBackoffRetryimport org.slf4j.LoggerFactoryimport scala.collection.JavaConversions._import scala.collection.mutable/** * Created by ganwei on 2018/08/21 * 要求: * 1 通过storeOffsets方法把数据存入zookeeper中。 * 存储格式: * /consumers/G322/offsets/ruoze_offset_topic/partition/0 * /consumers/G322/offsets/ruoze_offset_topic/partition/1 * /consumers/G322/offsets/ruoze_offset_topic/partition/2 * 2 通过obtainOffsets方法把存入的数据读取出来 * 输出格式: * topic:ruoze_offset_topic partition:0 offset:7 * topic:ruoze_offset_topic partition:1 offset:3 * topic:ruoze_offset_topic partition:2 offset:5 */object ZkConnectApp{ val LOG = LoggerFactory.getLogger(ZkConnectApp.getClass) val client = { val client = CuratorFrameworkFactory .builder .connectString("172.16.100.31:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .namespace("consumers") .build() client.start() client } def lock(path: String)(body: => Unit) { val lock = new InterProcessMutex(client, path) lock.acquire() try { body } finally { lock.release() } } def tryDo(path: String)(body: => Unit): Boolean = { val lock = new InterProcessMutex(client, path) if (!lock.acquire(10, TimeUnit.SECONDS)) { LOG.info(s"不能获得锁 {$path},已经有任务在运行,本次任务退出") return false } try { LOG.info("获准运行") body true } finally { lock.release() LOG.info(s"释放锁 {$path}") } } //zookeeper创建路径 def ensurePathExists(path: String): Unit = { if (client.checkExists().forPath(path) == null) { client.create().creatingParentsIfNeeded().forPath(path) } } /** * OffsetRange类定义(偏移量对象) * 用于存储偏移量 */ case class OffsetRange( val topic:String, // 主题 val partition:Int, // 分区 val fromOffset:Long, // 起始偏移量 val utilOffset:Long // 终止偏移量 ) /** * zookeeper存储offset的方法 * 写入格式: * /consumers/G322/offsets/ruoze_offset_topic/partition/0 * /consumers/G322/offsets/ruoze_offset_topic/partition/1 * /consumers/G322/offsets/ruoze_offset_topic/partition/2 * @param OffsetsRanges * @param groupName */ def storeOffsets(OffsetsRanges:Array[OffsetRange],groupName:String)={ val offsetRootPath = s"/"+groupName if (client.checkExists().forPath(offsetRootPath) == null) { client.create().creatingParentsIfNeeded().forPath(offsetRootPath) } for(els <- OffsetsRanges ){ val data = String.valueOf(els.utilOffset).getBytes val path = s"$offsetRootPath/offsets/${els.topic}/partition/${els.partition}" // 创建路径 ensurePathExists(path) // 写入数据 client.setData().forPath(path, data) } } /** * TopicAndPartition类定义(偏移量key对象) * 用于提取偏移量 */ case class TopicAndPartition( topic:String, // 主题 partition:Int // 分区 ) /** * zookeeper提取offset的方法 * @param topic * @param groupName * @return */ def obtainOffsets(topic:String,groupName:String):Map[TopicAndPartition,Long]={ // 定义一个空的HashMap val maps = mutable.HashMap[TopicAndPartition,Long]() // offset的路径 val offsetRootPath = s"/"+groupName+"/offsets/"+topic+"/partition" // 判断路径是否存在 val stat = client.checkExists().forPath(s"$offsetRootPath") if (stat == null ){ println(stat) // 路径不存在 就将路径打印在控制台,检查路径 }else{ // 获取 offsetRootPath路径下一级的所有子目录 // 我们这里是获取的所有分区 val children = client.getChildren.forPath(s"$offsetRootPath") // 遍历所有的分区 for ( lines <- children ){ // 获取分区的数据 val data = new String(client.getData().forPath(s"$offsetRootPath/"+lines)).toLong // 将 topic partition 和数据赋值给 maps maps(TopicAndPartition(topic,lines.toInt)) = data } } // 按partition排序后 返回map对象 maps.toList.sortBy(_._1.partition).toMap } def main(args: Array[String]) { //定义初始化数据 val off1 = OffsetRange("ruoze_offset_topic",0,0,7) val off2 = OffsetRange("ruoze_offset_topic",1,0,3) val off3 = OffsetRange("ruoze_offset_topic",2,0,5) val arr = Array(off1,off2,off3) //获取到namespace// println(client.getNamespace) // 创建路径// val offsetRootPath = "/G322"// if (client.checkExists().forPath(offsetRootPath) == null) {// client.create().creatingParentsIfNeeded().forPath(offsetRootPath)// } //存储值 storeOffsets(arr,"G322") //获取值 /** * 输出格式: * topic:ruoze_offset_topic partition:0 offset:7 * topic:ruoze_offset_topic partition:1 offset:3 * topic:ruoze_offset_topic partition:2 offset:5 */ val result = obtainOffsets("ruoze_offset_topic","G322") for (map <- result){ println("topic:"+map._1.topic+"\t" +"partition:"+map._1.partition+"\t"+"offset:"+map._2) } }}
感谢各位的阅读,以上就是"如何将数据按指定格式存入zookeeper"的内容了,经过本文的学习后,相信大家对如何将数据按指定格式存入zookeeper这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
路径
格式
偏移
方法
存储
对象
学习
主题
任务
内容
版本
输出
运行
子目
子目录
就是
思路
情况
控制台
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
饥荒服务器mood什么意思
网络安全手画图画
北京冬奥会网络安全运营中心
太极网络技术有限公司
临沂锦梦软件开发有限公司
wow怀旧60服务器会免费么
snc网络技术
开包子铺的服务器
启动apache服务器
软件开发公司展望
宁夏hp服务器虚拟化优势
网络安全法是
阿里云企业服务器管理系统
监控系统视频管理服务器作用
软件开发过程中最重要的环节
网络技术专业计划与实施
深信服后端服务器
网络安全知识板报大学
数据库安全案列
网络安全技术与应用好发吗
电脑服务器安全软件
广域网网络技术
仕湾网络技术
安永网络安全认证
谷歌安卓软件开发
中国电信网络安全防线
服务器内怎么配置环境运行程序
梦幻手游转服务器要钱吗
论文网络技术框架教程
静安区第三方软件开发售后服务