如何将数据按指定格式存入zookeeper
发表于:2024-12-13 作者:千家信息网编辑
千家信息网最后更新 2024年12月13日,这篇文章主要讲解了"如何将数据按指定格式存入zookeeper",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何将数据按指定格式存入zookeeper
千家信息网最后更新 2024年12月13日如何将数据按指定格式存入zookeeper
这篇文章主要讲解了"如何将数据按指定格式存入zookeeper",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何将数据按指定格式存入zookeeper"吧!
环境:
scala版本:2.11.8
zookeeper版本:3.4.5-cdh6.7.0
package com.ruozedata.zkimport java.util.concurrent.TimeUnitimport org.apache.curator.framework.CuratorFrameworkFactoryimport org.apache.curator.framework.recipes.locks.InterProcessMuteximport org.apache.curator.retry.ExponentialBackoffRetryimport org.slf4j.LoggerFactoryimport scala.collection.JavaConversions._import scala.collection.mutable/** * Created by ganwei on 2018/08/21 * 要求: * 1 通过storeOffsets方法把数据存入zookeeper中。 * 存储格式: * /consumers/G322/offsets/ruoze_offset_topic/partition/0 * /consumers/G322/offsets/ruoze_offset_topic/partition/1 * /consumers/G322/offsets/ruoze_offset_topic/partition/2 * 2 通过obtainOffsets方法把存入的数据读取出来 * 输出格式: * topic:ruoze_offset_topic partition:0 offset:7 * topic:ruoze_offset_topic partition:1 offset:3 * topic:ruoze_offset_topic partition:2 offset:5 */object ZkConnectApp{ val LOG = LoggerFactory.getLogger(ZkConnectApp.getClass) val client = { val client = CuratorFrameworkFactory .builder .connectString("172.16.100.31:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .namespace("consumers") .build() client.start() client } def lock(path: String)(body: => Unit) { val lock = new InterProcessMutex(client, path) lock.acquire() try { body } finally { lock.release() } } def tryDo(path: String)(body: => Unit): Boolean = { val lock = new InterProcessMutex(client, path) if (!lock.acquire(10, TimeUnit.SECONDS)) { LOG.info(s"不能获得锁 {$path},已经有任务在运行,本次任务退出") return false } try { LOG.info("获准运行") body true } finally { lock.release() LOG.info(s"释放锁 {$path}") } } //zookeeper创建路径 def ensurePathExists(path: String): Unit = { if (client.checkExists().forPath(path) == null) { client.create().creatingParentsIfNeeded().forPath(path) } } /** * OffsetRange类定义(偏移量对象) * 用于存储偏移量 */ case class OffsetRange( val topic:String, // 主题 val partition:Int, // 分区 val fromOffset:Long, // 起始偏移量 val utilOffset:Long // 终止偏移量 ) /** * zookeeper存储offset的方法 * 写入格式: * /consumers/G322/offsets/ruoze_offset_topic/partition/0 * /consumers/G322/offsets/ruoze_offset_topic/partition/1 * /consumers/G322/offsets/ruoze_offset_topic/partition/2 * @param OffsetsRanges * @param groupName */ def storeOffsets(OffsetsRanges:Array[OffsetRange],groupName:String)={ val offsetRootPath = s"/"+groupName if (client.checkExists().forPath(offsetRootPath) == null) { client.create().creatingParentsIfNeeded().forPath(offsetRootPath) } for(els <- OffsetsRanges ){ val data = String.valueOf(els.utilOffset).getBytes val path = s"$offsetRootPath/offsets/${els.topic}/partition/${els.partition}" // 创建路径 ensurePathExists(path) // 写入数据 client.setData().forPath(path, data) } } /** * TopicAndPartition类定义(偏移量key对象) * 用于提取偏移量 */ case class TopicAndPartition( topic:String, // 主题 partition:Int // 分区 ) /** * zookeeper提取offset的方法 * @param topic * @param groupName * @return */ def obtainOffsets(topic:String,groupName:String):Map[TopicAndPartition,Long]={ // 定义一个空的HashMap val maps = mutable.HashMap[TopicAndPartition,Long]() // offset的路径 val offsetRootPath = s"/"+groupName+"/offsets/"+topic+"/partition" // 判断路径是否存在 val stat = client.checkExists().forPath(s"$offsetRootPath") if (stat == null ){ println(stat) // 路径不存在 就将路径打印在控制台,检查路径 }else{ // 获取 offsetRootPath路径下一级的所有子目录 // 我们这里是获取的所有分区 val children = client.getChildren.forPath(s"$offsetRootPath") // 遍历所有的分区 for ( lines <- children ){ // 获取分区的数据 val data = new String(client.getData().forPath(s"$offsetRootPath/"+lines)).toLong // 将 topic partition 和数据赋值给 maps maps(TopicAndPartition(topic,lines.toInt)) = data } } // 按partition排序后 返回map对象 maps.toList.sortBy(_._1.partition).toMap } def main(args: Array[String]) { //定义初始化数据 val off1 = OffsetRange("ruoze_offset_topic",0,0,7) val off2 = OffsetRange("ruoze_offset_topic",1,0,3) val off3 = OffsetRange("ruoze_offset_topic",2,0,5) val arr = Array(off1,off2,off3) //获取到namespace// println(client.getNamespace) // 创建路径// val offsetRootPath = "/G322"// if (client.checkExists().forPath(offsetRootPath) == null) {// client.create().creatingParentsIfNeeded().forPath(offsetRootPath)// } //存储值 storeOffsets(arr,"G322") //获取值 /** * 输出格式: * topic:ruoze_offset_topic partition:0 offset:7 * topic:ruoze_offset_topic partition:1 offset:3 * topic:ruoze_offset_topic partition:2 offset:5 */ val result = obtainOffsets("ruoze_offset_topic","G322") for (map <- result){ println("topic:"+map._1.topic+"\t" +"partition:"+map._1.partition+"\t"+"offset:"+map._2) } }}
感谢各位的阅读,以上就是"如何将数据按指定格式存入zookeeper"的内容了,经过本文的学习后,相信大家对如何将数据按指定格式存入zookeeper这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
路径
格式
偏移
方法
存储
对象
学习
主题
任务
内容
版本
输出
运行
子目
子目录
就是
思路
情况
控制台
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
浩方的服务器
慧与合作软件开发方向分数
网络安全相关威胁
珠海格力校招软件开发待遇
数据库期末考试重点
图片访问服务器必须下载下来咋办
教育窗口优化服务器
湖北信通汇晟网络技术公司
文件类型 数据库文件
数据库安装提示缺少net
创业板互联网科技股
腾达dhcp服务器选择为关闭
厦门畅游网络技术有限公司
简述计算机网络技术的功能
网络安全威胁应对策略
骷髅女孩连接下载服务器失败
面试软件开发组长
启航网络技术工作室
评价对软件开发的质量
表情包服务器瘫痪
华阳通用电子软件开发工程师
网络安全三要素cia
上海圣诺网络技术
网络安全口令破解
查找期刊论文最常用的数据库
数据库企业管理
重启web服务器
传奇怎么才不用服务器玩
软件开发 需求评审
软件开发工程师职业资格证书