How to use Spark with HBase

Published 2024-11-15 by the 千家信息网 editorial team

This post shows how to use Spark with HBase through two small, self-contained Scala examples: the first scans an HBase table into a Spark RDD, and the second writes an RDD back into HBase. I hope you get something useful out of it; let's dig in.

package hgs.spark.hbase

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

// Reads rows from the HBase table "test1" into a Spark RDD via TableInputFormat.
object HbaseTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    conf.setMaster("local").setAppName("local")
    val context = new SparkContext(conf)

    // HBase client configuration: ZooKeeper quorum and client port
    val hadoopconf = HBaseConfiguration.create()
    hadoopconf.set("hbase.zookeeper.quorum", "bigdata01:2181,bigdata02:2181,bigdata03:2181")
    hadoopconf.set("hbase.zookeeper.property.clientPort", "2181")

    // Scan parameters: table name, row-key range [h, x), column family and columns
    val tableName = "test1"
    hadoopconf.set(TableInputFormat.INPUT_TABLE, tableName)
    hadoopconf.set(TableInputFormat.SCAN_ROW_START, "h")
    hadoopconf.set(TableInputFormat.SCAN_ROW_STOP, "x")
    hadoopconf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "cf1")
    hadoopconf.set(TableInputFormat.SCAN_COLUMNS, "cf1:col1 cf1:col2") // space-delimited list

    /* Alternative: build a Scan object and pass it in serialized form.
    val startrow = "h"
    val stoprow = "w"
    val scan = new Scan
    scan.setStartRow(startrow.getBytes)
    scan.setStopRow(stoprow.getBytes)
    val proto = ProtobufUtil.toScan(scan)
    val scanToString = Base64.encodeBytes(proto.toByteArray())
    hadoopconf.set(TableInputFormat.SCAN, scanToString)
    */

    // Each record is a (row key, Result) pair
    val hbaseRdd = context.newAPIHadoopRDD(hadoopconf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    // Print cf1:col1 and cf1:col2 of every row (assumes both columns exist)
    hbaseRdd.foreach(x => {
      val val1 = x._2.getValue("cf1".getBytes, "col1".getBytes)
      val val2 = x._2.getValue("cf1".getBytes, "col2".getBytes)
      println(new String(val1), new String(val2))
    })

    context.stop()
  }
}
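If you need the scan results as plain strings for further processing rather than just printing them, the (ImmutableBytesWritable, Result) pairs can be mapped first. The snippet below is a minimal sketch under the same assumptions as above (table test1 with cf1:col1 present on every row); it is not part of the original example.

import org.apache.hadoop.hbase.util.Bytes

// Sketch: convert the scan output into an RDD of (rowKey, col1) strings.
val rows = hbaseRdd.map { case (key, result) =>
  val rowKey = Bytes.toString(key.copyBytes())
  val col1 = Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("col1")))
  (rowKey, col1)
}
rows.collect().foreach(println)

The second example goes the other way: it takes an RDD of (key, value) pairs, turns each pair into a Put, and writes them into the same table through the old mapred TableOutputFormat and saveAsHadoopDataset.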
package hgs.spark.hbase

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf

// Writes an RDD of (row key, value) pairs into the HBase table "test1"
// through the old (mapred) TableOutputFormat.
object SparkToHbase {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    conf.setMaster("local").setAppName("local")
    val context = new SparkContext(conf)

    // Two rows to write, split across two partitions
    val rdd = context.parallelize(List(("aaaaaaa", "aaaaaaa"), ("bbbbb", "bbbbb")), 2)

    // HBase client configuration
    val hadoopconf = HBaseConfiguration.create()
    hadoopconf.set("hbase.zookeeper.quorum", "bigdata01:2181,bigdata02:2181,bigdata03:2181")
    hadoopconf.set("hbase.zookeeper.property.clientPort", "2181")

    // Old-API job configuration: target table and output format
    val jobconf = new JobConf(hadoopconf, this.getClass)
    jobconf.set(TableOutputFormat.OUTPUT_TABLE, "test1")
    jobconf.setOutputFormat(classOf[TableOutputFormat])

    // Turn each pair into (ImmutableBytesWritable, Put): the first element becomes
    // the row key, the second is stored in cf1:col1
    val exterrdd = rdd.map(x => {
      val put = new Put(x._1.getBytes)
      put.add("cf1".getBytes, "col1".getBytes, x._2.getBytes) // put.addColumn in HBase 2.x
      (new ImmutableBytesWritable, put)
    })

    // Write out through the Hadoop old-API output format
    exterrdd.saveAsHadoopDataset(jobconf)

    context.stop()
  }
}
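The write example above relies on the old org.apache.hadoop.hbase.mapred API. A common alternative is the newer org.apache.hadoop.hbase.mapreduce.TableOutputFormat together with saveAsNewAPIHadoopDataset. The following is only a sketch of that variant, reusing the SparkContext and the rdd defined in SparkToHbase and assuming the same test1 table and cf1 column family; it is not code from the original post.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job

// Sketch: the same write, done through the new (mapreduce) Hadoop output API.
val newConf = HBaseConfiguration.create()
newConf.set("hbase.zookeeper.quorum", "bigdata01:2181,bigdata02:2181,bigdata03:2181")
newConf.set("hbase.zookeeper.property.clientPort", "2181")
newConf.set(TableOutputFormat.OUTPUT_TABLE, "test1")

val job = Job.getInstance(newConf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

val puts = rdd.map { case (rowKey, value) =>
  val put = new Put(rowKey.getBytes)
  put.add("cf1".getBytes, "col1".getBytes, value.getBytes) // put.addColumn in HBase 2.x
  (new ImmutableBytesWritable, put)
}
puts.saveAsNewAPIHadoopDataset(job.getConfiguration)

The practical difference is only which Hadoop output API Spark drives: saveAsHadoopDataset takes a JobConf, while saveAsNewAPIHadoopDataset takes the Configuration from a mapreduce Job.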

Having read this article, you should now have a working idea of how to use Spark with HBase. To learn more, follow our industry news channel. Thanks for reading!
