In the earlier article "Spark读取Hbase中的数据" (Reading data from HBase in Spark) I showed how to read HBase data in Spark, with both Java and Scala implementations. This post picks up from there and describes how to store computed results from Spark back into HBase.
Spark ships with two built-in methods for writing data to HBase: (1) saveAsHadoopDataset and (2) saveAsNewAPIHadoopDataset. Their official descriptions are as follows:
saveAsHadoopDataset
: Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for that storage system. The JobConf should set an OutputFormat and any output paths required (e.g. a table name to write to) in the same way as it would be configured for a Hadoop MapReduce job.
saveAsNewAPIHadoopDataset
: Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop Configuration object for that storage system. The Conf should set an OutputFormat and any output paths required (e.g. a table name to write to) in the same way as it would be configured for a Hadoop MapReduce job.
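For reference, both methods are defined on PairRDDFunctions and differ mainly in the configuration type they accept; a sketch of their signatures, matching the descriptions above:

def saveAsHadoopDataset(conf: JobConf): Unit
def saveAsNewAPIHadoopDataset(conf: Configuration): Unit

Here JobConf comes from org.apache.hadoop.mapred, while Configuration comes from org.apache.hadoop.conf.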
As you can see, the two APIs target the old mapred and the new mapreduce packages respectively, and this post provides example code for both versions. Before writing any code, add the following dependencies to the pom.xml file:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>0.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>0.98.2-hadoop2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>0.98.2-hadoop2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>0.98.2-hadoop2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>0.98.2-hadoop2</version>
</dependency>
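If you build with sbt instead of Maven, roughly equivalent declarations (assuming Scala 2.10, to match the spark-core_2.10 artifact above) would be:

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "0.9.1",
  "org.apache.hbase" % "hbase" % "0.98.2-hadoop2",
  "org.apache.hbase" % "hbase-client" % "0.98.2-hadoop2",
  "org.apache.hbase" % "hbase-common" % "0.98.2-hadoop2",
  "org.apache.hbase" % "hbase-server" % "0.98.2-hadoop2"
)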
The first version writes through the old mapred API using saveAsHadoopDataset:

package com.iteblog.bigdata.hbase

import org.apache.hadoop.hbase.{HConstants, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

object SparkToHBase {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: SparkToHBase <input file>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("SparkToHBase")
    val sc = new SparkContext(conf)
    val input = sc.textFile(args(0))

    // Create the HBase configuration
    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")

    // Create a JobConf, setting the output format and the target table name
    val jobConf = new JobConf(hConf, this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")

    // Build (key, Put) pairs: reverse the key and write the value into f1:info
    val data = input.map { item =>
      val Array(key, value) = item.split("\t")
      val rowKey = key.reverse
      val put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, put)
    }
    // Save to the HBase table
    data.saveAsHadoopDataset(jobConf)
    sc.stop()
  }
}
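Note that TableOutputFormat does not create the table for you: the iteblog table with column family f1 must already exist. You can create it in the HBase shell with create 'iteblog', 'f1', or programmatically; a minimal sketch using the 0.98-era client API (reusing the hConf from above) might look like:

import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}

// Create the target table (with column family f1) if it does not exist yet
val admin = new HBaseAdmin(hConf)
if (!admin.tableExists("iteblog")) {
  val desc = new HTableDescriptor(TableName.valueOf("iteblog"))
  desc.addFamily(new HColumnDescriptor("f1"))
  admin.createTable(desc)
}
admin.close()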
Our input data has the following format (one key-value pair per line, separated by a tab):
0015A49A8F2A60DACEE0160545C58F94    1234
0152C9666B5F3426DDDB5FB74BDBCE4F    4366
0160D90AC268AEB595208E8448C7F8B8    6577
0225A39EB29BDB582CA58BE86791ACBC    1234
02462ACDF7232C49890B07D63B50C5E1    4366
030730EBE05740C992840525E35BC8AD    7577
038A459BC05F3B655F5655C810E76352    7577
0417D3FD71458C4BAD1E5AFDE7259930    7577
042CD42B657C46D0D4E5CC69AFDD7E54    7577
051069378849ACF97BFAD09D3A9C7702    7577
05E833C9C763A98323E0328DA0A31039    7577
060E206514A24D944305D370F615F8E9    7577
087E8488796C29E1C8239565666CE2D7    7577
09A425F1DD240A7150ECEFAA0BFF25FA    7577
0B27E3CB5F3F32EB3715DB8E2D333BED    7577
0B27E82A4CEE73BBB98438DFB0DB2FFE    7577
0BAEEB7A12DCEF20EE26D7A030164DFF    7577
0C5BFC45F64907A61ECB1C892F98525C    7577
0C74F2FFD1BB3598BC8DB10C37DBA6B4    7577
0C9CEE40DDD961C7D2BBE0491FDF92A8    7577
0CC578371622F932287EB81065F81F5F    7577
0D6B03EFDAE7165A0F7CC79EABEAC0D3    7577
0DF7B014187A9AB2F1049781592CC053    7577
0E67D8ABDB3749D58207A7B45FEA7F12    7577
0E866677E79A7843E0EDCF2BE0141911    7577
0EAF4A69BA3BF05E8EA75CC1287304A3    7577
0EE2969AE674DF5F8944B5EA2E97DBEC    7577
0FAA253D53BC6D831CF6E742147C3BED    7577
0FB92AC3DE664BFF40D334DA8EE97B85    7577
The first column becomes the HBase row key (the code reverses it, a common trick to spread keys that share a common prefix across regions), and the second column is stored as the value of the f1:info column.
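To make the mapping concrete, here is what the map function above does to the first record (the reversed key in the comment is simply the key string written backwards):

val line = "0015A49A8F2A60DACEE0160545C58F94\t1234"
val Array(key, value) = line.split("\t")
val rowKey = key.reverse // "49F85C5450610EECAD06A2F8A94A5100"
// resulting row: rowkey = rowKey, column f1:info = "1234"

The second version does the same work through the new mapreduce API using saveAsNewAPIHadoopDataset: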
package com.iteblog.bigdata.hbase

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HConstants, HBaseConfiguration}
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkContext, SparkConf}

object SparkToHBaseNew {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: SparkToHBaseNew <input file>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("SparkToHBaseNew")
    val sc = new SparkContext(conf)
    val input = sc.textFile(args(0))

    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")

    val jobConf = new JobConf(hConf, this.getClass)
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")

    // Set the job's output format and key/value classes.
    // The values we emit are Put objects, so Put is the output value class.
    val job = Job.getInstance(jobConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val data = input.map { item =>
      val Array(key, value) = item.split("\t")
      val rowKey = key.reverse
      val put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, put)
    }
    // Save to the HBase table
    data.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}
This second version is almost identical to the first, so pick whichever fits your situation. That said, writing Spark data to HBase this way is still rather verbose; in a follow-up post I will cover how to write the data in an RDD directly to HBase through something like a saveToHbase method (see the sketch below). Stay tuned.
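To give a flavor of where that is heading, here is a purely hypothetical sketch of such a helper: an implicit class that hides the JobConf plumbing behind a single call. The names (HBaseSyntax, saveToHbase) and the parameter list are illustrative, not an existing API:

// Hypothetical sketch: not an existing Spark or HBase API
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

object HBaseSyntax {
  implicit class HBaseWriteOps(rdd: RDD[(String, String)]) {
    // Write (rowkey, value) pairs into family:qualifier of the given table
    def saveToHbase(quorum: String, table: String,
                    family: String, qualifier: String): Unit = {
      val hConf = HBaseConfiguration.create()
      hConf.set(HConstants.ZOOKEEPER_QUORUM, quorum)
      val jobConf = new JobConf(hConf)
      jobConf.setOutputFormat(classOf[TableOutputFormat])
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, table)
      rdd.map { case (key, value) =>
        val put = new Put(Bytes.toBytes(key))
        put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value))
        (new ImmutableBytesWritable, put)
      }.saveAsHadoopDataset(jobConf)
    }
  }
}

With this in scope (import HBaseSyntax._), the first example would shrink to building an RDD[(String, String)] of (rowkey, value) pairs and calling pairs.saveToHbase("www.iteblog.com:2181", "iteblog", "f1", "info").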