spark访问hbase
发表于:2025-01-19 作者:千家信息网编辑
千家信息网最后更新 2025年01月19日,import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}import org.apache.hadoop.hbase.
千家信息网最后更新 2025年01月19日spark访问hbase
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}import org.apache.hadoop.hbase.mapreduce.TableInputFormatimport org.apache.spark.rdd.NewHadoopRDDval conf = HBaseConfiguration.create()conf.set(TableInputFormat.INPUT_TABLE, "tmp")var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])hBaseRDD.count()import scala.collection.JavaConverters._hBaseRDD.map(tuple => tuple._2).map(result => result.getColumn("cf".getBytes(), "val".getBytes())).map(keyValues => {( keyValues.asScala.reduceLeft { (a, b) => if (a.getTimestamp > b.getTimestamp) a else b }.getRow, keyValues.asScala.reduceLeft { (a, b) => if (a.getTimestamp > b.getTimestamp) a else b }.getValue)}).take(10)hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("cf".getBytes(), "val".getBytes()))).map(row => {( row._1.map(_.toChar).mkString, row._2.asScala.reduceLeft { (a, b) => if (a.getTimestamp > b.getTimestamp) a else b }.getValue.map(_.toChar).mkString)}).take(10)conf.set(TableInputFormat.INPUT_TABLE, "test1")//var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("lf".getBytes(), "app1".getBytes()))).map(row => if (row._2.size > 0) {( row._1.map(_.toChar).mkString, row._2.asScala.reduceLeft { (a, b) => if (a.getTimestamp > b.getTimestamp) a else b }.getValue.map(_.toInt).mkString)}).take(10)import java.nio.ByteBufferhBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("lf".getBytes(), "app1".getBytes()))).map(row => if (row._2.size > 0) {( row._1.map(_.toChar).mkString, ByteBuffer.wrap(row._2.asScala.reduceLeft { (a, b) => if (a.getTimestamp > b.getTimestamp) a else b }.getValue).getLong)}).take(10)//conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "lf")conf.set(TableInputFormat.SCAN_COLUMNS, "lf:app1")//var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])import java.nio.ByteBufferhBaseRDD.map(tuple => tuple._2).map(result => { ( result.getRow.map(_.toChar).mkString, ByteBuffer.wrap(result.value).getLong )}).take(10)val conf = HBaseConfiguration.create()conf.set(TableInputFormat.INPUT_TABLE, "test1")var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])var rows = hBaseRDD.map(tuple => tuple._2).map(result => result.getRow.map(_.toChar).mkString)rows.map(row => row.split("\\|")).map(r => if (r.length > 1) (r(0), r(1)) else (r(0), "") ).groupByKey.take(10)
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
计算机网络技术上机报告
林业局网络安全自查总结报告
专业网络安全审计系统价格
数据库100的阶乘
一个表格如何取自数据库
海南忆成网络技术有限公司怎么样
拓扑网络技术有限公司
去东土科技工业互联网上班咋样
宝山区银联网络技术活动方案
数据库当中出现乱码
web 服务器安全问题
hcna网络技术学习指南京东
天猫魔盒网络安全类型选择哪个
租一台服务器多少钱
工厂包装防呆软件开发
软件开发工程师技术工作总结
深圳什么网络技术开发经验丰富
软件开发电价
数据库安装报错1064
tvapp可以连接服务器吗
毕节市旅游数据库
山西app软件开发erp
2020重庆网络安全周
维普数据库可以检索的文件类型有
来电回拨显示服务器错误
安庆企业软件开发公司哪家好
诛仙手游安卓官方服务器
网络安全防护笔试
古冶区软件开发品质保障
javaweb连接数据库