打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
使用Spark读取HBase中的数据 – 过往记忆

  在《Spark读取Hbase中的数据》文章中我介绍了如何在Spark中读取Hbase中的数据,并提供了Java和Scala两个版本的实现,本文将接着上文介绍如何通过Spark将计算好的数据存储到Hbase中。

  Spark中内置提供了两个方法可以将数据写入到Hbase:(1)、saveAsHadoopDataset;(2)、saveAsNewAPIHadoopDataset,它们的官方介绍分别如下:
  saveAsHadoopDataset: Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for that storage system. The JobConf should set an OutputFormat and any output paths required (e.g. a table name to write to) in the same way as it would be configured for a Hadoop MapReduce job.
  saveAsNewAPIHadoopDataset: Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop Configuration object for that storage system. The Conf should set an OutputFormat and any output paths required (e.g. a table name to write to) in the same way as it would be configured for a Hadoop MapReduce job.

可以看出这两个API分别是针对mapredmapreduce实现的,本文将提供这两个版本的实现实例代码。在编写代码之前我们先在pom.xml文件中引入一下的依赖:

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>0.9.1</version>
</dependency>
  
<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>
  
<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>
  
<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>
  
<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>

saveAsHadoopDataset

package com.iteblog.bigdata.hbase
import org.apache.hadoop.hbase.{HConstants, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}
/////////////////////////////////////////////////////////////////////
 User: 过往记忆
 Date: 2016-11-29
 Time: 22:59
 bolg: https://www.iteblog.com
 本文地址:https://www.iteblog.com/archives/1892
 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 过往记忆博客微信公共账号:iteblog_hadoop
/////////////////////////////////////////////////////////////////////
object SparkToHBase {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: SparkToHBase <input file>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("SparkToHBase")
    val sc = new SparkContext(conf)
    val input = sc.textFile(args(0))
    //创建HBase配置
    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")
    //创建JobConf,设置输出格式和表名
    val jobConf = new JobConf(hConf, this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")
    val data = input.map { item =>
      val Array(key, value) = item.split("\t")
      val rowKey = key.reverse
      val put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, put)
    }
    //保存到HBase表
    data.saveAsHadoopDataset(jobConf)
    sc.stop()
  }
}

我们输入的数据格式是:

0015A49A8F2A60DACEE0160545C58F94    1234
0152C9666B5F3426DDDB5FB74BDBCE4F    4366
0160D90AC268AEB595208E8448C7F8B8    6577
0225A39EB29BDB582CA58BE86791ACBC    1234
02462ACDF7232C49890B07D63B50C5E1    4366
030730EBE05740C992840525E35BC8AD    7577
038A459BC05F3B655F5655C810E76352    7577
0417D3FD71458C4BAD1E5AFDE7259930    7577
042CD42B657C46D0D4E5CC69AFDD7E54    7577
051069378849ACF97BFAD09D3A9C7702    7577
05E833C9C763A98323E0328DA0A31039    7577
060E206514A24D944305D370F615F8E9    7577
087E8488796C29E1C8239565666CE2D7    7577
09A425F1DD240A7150ECEFAA0BFF25FA    7577
0B27E3CB5F3F32EB3715DB8E2D333BED    7577
0B27E82A4CEE73BBB98438DFB0DB2FFE    7577
0BAEEB7A12DCEF20EE26D7A030164DFF    7577
0C5BFC45F64907A61ECB1C892F98525C    7577
0C74F2FFD1BB3598BC8DB10C37DBA6B4    7577
0C9CEE40DDD961C7D2BBE0491FDF92A8    7577
0CC578371622F932287EB81065F81F5F    7577
0D6B03EFDAE7165A0F7CC79EABEAC0D3    7577
0DF7B014187A9AB2F1049781592CC053    7577
0E67D8ABDB3749D58207A7B45FEA7F12    7577
0E866677E79A7843E0EDCF2BE0141911    7577
0EAF4A69BA3BF05E8EA75CC1287304A3    7577
0EE2969AE674DF5F8944B5EA2E97DBEC    7577
0FAA253D53BC6D831CF6E742147C3BED    7577
0FB92AC3DE664BFF40D334DA8EE97B85    7577

第一列将作为HBase的Rowkey存储,第二列就是info的值。

saveAsNewAPIHadoopDataset

package com.iteblog.bigdata.hbase
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HConstants, HBaseConfiguration}
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkContext, SparkConf}
/////////////////////////////////////////////////////////////////////
 User: 过往记忆
 Date: 2016-11-29
 Time: 22:59
 bolg: https://www.iteblog.com
 本文地址:https://www.iteblog.com/archives/1892
 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 过往记忆博客微信公共账号:iteblog_hadoop
/////////////////////////////////////////////////////////////////////
object SparkToHBaseNew {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: SparkToHBaseNew <input file>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("SparkToHBaseNew")
    val sc = new SparkContext(conf)
    val input = sc.textFile(args(0))
    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")
    val jobConf = new JobConf(hConf, this.getClass)
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")
    //设置job的输出格式
    val job = Job.getInstance(jobConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    val data = input.map { item =>
      val Array(key, value) = item.split("\t")
      val rowKey = key.reverse
      val put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, put)
    }
    //保存到HBase表
    data.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}

这个方法和第一种几乎一样,大家可以根据自己的情况选择使用其中一个。不过上面将Spark中的数据写入到Hbase还是有点啰嗦,后面我将单独再介绍如何将RDD中的数据直接写入到hbase中,类似于saveToHbase,欢迎大家关注。


本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
spark操作HBASE
spark读写hbase性能对比
Spark操作Hbase
Hortonworks的开源框架SHC的使用(一)
Spark SQL 在字节跳动的优化实践
在HBase里使用MapReduce例子
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服