千家信息网

spark读取hbase的数据实例代码

发表于:2025-01-30 作者:千家信息网编辑
千家信息网最后更新 2025年01月30日,这篇文章主要介绍"spark读取hbase的数据实例代码",在日常操作中,相信很多人在spark读取hbase的数据实例代码问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答
千家信息网最后更新 2025年01月30日spark读取hbase的数据实例代码

这篇文章主要介绍"spark读取hbase的数据实例代码",在日常操作中,相信很多人在spark读取hbase的数据实例代码问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"spark读取hbase的数据实例代码"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

package hgs.spark.hbase//https://blog.csdn.net/mlljava1111/article/details/52675901import org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.hadoop.hbase.HBaseConfigurationimport org.apache.hadoop.hbase.mapreduce.TableInputFormatimport org.apache.hadoop.hbase.client.Scanimport org.apache.hadoop.hbase.filter.FilterListimport org.apache.hadoop.hbase.filter.FilterList.Operatorimport org.apache.hadoop.hbase.filter.RowFilterimport org.apache.hadoop.hbase.filter.RegexStringComparatorimport org.apache.hadoop.hbase.filter.CompareFilter.CompareOpimport org.apache.hadoop.hbase.protobuf.ProtobufUtilimport org.apache.hadoop.hbase.util.Base64import org.apache.hadoop.hbase.util.Bytesimport org.apache.hadoop.hbase.filter.LongComparatorobject HbaseToSpark {  def main(args: Array[String]): Unit = {    //System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");    val conf = new SparkConf        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")    conf.setMaster("local").setAppName("hbasedata")        val context =  new SparkContext(conf)    //hbase配置    val hconf =  new HBaseConfiguration    hconf.set("hbase.zookeeper.quorum", "bigdata00:2181,bigdata01:2181,bigdata02:2181")    hconf.set("hbase.zookeeper.property.clientPort", "2181")    hconf.set(TableInputFormat.INPUT_TABLE, "test")    val scan = new Scan    //扫描的表rowkey的开始和结束    scan.setStartRow("1991".getBytes)    scan.setStopRow("3000".getBytes)    //val list = new FilterList(Operator.MUST_PASS_ALL)    //val filter1 = new RowFilter(CompareOp.GREATER_OR_EQUAL,new LongComparator(1991))    //val filter2 = new RowFilter(CompareOp.LESS_OR_EQUAL,new RegexStringComparator("3000*"))       // list.addFilter(filter1)   // list.addFilter(filter2)   //scan.setFilter(list)    //添加scan    hconf.set(TableInputFormat.SCAN, convertScanToString(scan))        val hrdd = context.newAPIHadoopRDD(hconf,        classOf[TableInputFormat],        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],          classOf[org.apache.hadoop.hbase.client.Result])            val resultrdd = hrdd.repartition(2)    //打印结果    resultrdd.foreach{case(_,value)=>{        val key = Bytes.toString(value.getRow)        val name = Bytes.toString(value.getValue("cf1".getBytes, "name".getBytes))        val age = Bytes.toString(value.getValue("cf1".getBytes, "age".getBytes))        println("rowkey:"+key+" "+"name:"+name+" "+"age:"+age)      }    }        context.stop()      }      def convertScanToString(scan: Scan) = {    val proto = ProtobufUtil.toScan(scan)    Base64.encodeBytes(proto.toByteArray)  }    }

到此,关于"spark读取hbase的数据实例代码"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

0