打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
Spark 2.0系列之SparkSession详解

原文链接:How to use SparkSession in Apache Spark 2.0
作者:Jules Damji
译者:刘旭坤
责编:郭芮,关注大数据领域,寻求报道或投稿请发邮件guorui@csdn.net。另有CSDN Spark用户微信群,请添加微信guorui_1118并备注公司+实名+职位申请入群。

Spark2.0中引入了SparkSession的概念,它为用户提供了一个统一的切入点来使用Spark的各项功能,用户不但可以使用DataFrame和Dataset的各种API,学习Spark的难度也会大大降低。

本文就SparkSession在Spark2.0中的功能和地位加以阐释。

SparkSession的功能

首先,我们从一个Spark应用案例入手:SparkSessionZipsExample可以从JSON文件中读取邮政编码,通过DataFrame API进行分析,同时还能够使用Spark SQL语句实施查询。

  • 创建SparkSession

在2.0版本之前,使用Spark必须先创建SparkConf和SparkContext,代码如下:

//set up the spark configuration and create contextsval sparkConf = new SparkConf().setAppName("SparkSessionZipsExample").setMaster("local")// your handle to SparkContext to access other context like SQLContextval sc = new SparkContext(sparkConf).set("spark.some.config.option", "some-value")val sqlContext = new org.apache.spark.sql.SQLContext(sc)

不过在Spark2.0中只要创建一个SparkSession就够了,SparkConf、SparkContext和SQLContext都已经被封装在SparkSession当中。下面的代码创建了一个SparkSession对象并设置了一些参数。这里使用了生成器模式,只有此“spark”对象不存在时才会创建一个新对象。

// Create a SparkSession. No need to create SparkContext// You automatically get it as part of the SparkSessionval warehouseLocation = "file:${system:user.dir}/spark-warehouse"val spark = SparkSession   .builder()   .appName("SparkSessionZipsExample")   .config("spark.sql.warehouse.dir", warehouseLocation)   .enableHiveSupport()   .getOrCreate()

执行完上面的代码就可以使用spark对象了。

  • 设置运行参数

创建SparkSession之后可以设置运行参数,代码如下:

//set new runtime optionsspark.conf.set("spark.sql.shuffle.partitions", 6)spark.conf.set("spark.executor.memory", "2g")//get all settingsval configMap:Map[String, String] = spark.conf.getAll()

也可以使用Scala的迭代器来读取configMap中的数据。

  • 读取元数据

如果需要读取元数据(catalog),可以通过SparkSession来获取。

//fetch metadata data from the catalogspark.catalog.listDatabases.show(false)spark.catalog.listTables.show(false)

这里返回的都是Dataset,所以可以根据需要再使用Dataset API来读取。

  • 创建Dataset和Dataframe

通过SparkSession来创建Dataset和Dataframe有多种方法。其中最简单的就是使用spark.range方法来生成Dataset,在摸索Dataset API的时候这个办法尤其有用。

//create a Dataset using spark.range starting from 5 to 100, with increments of 5val numDS = spark.range(5, 100, 5)// reverse the order and display first 5 itemsnumDS.orderBy(desc("id")).show(5)//compute descriptive stats and display themnumDs.describe().show()// create a DataFrame using spark.createDataFrame from a List or Seqval langPercentDF = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))//rename the columnsval lpDF = langPercentDF.withColumnRenamed("_1", "language").withColumnRenamed("_2", "percent")//order the DataFrame in descending order of percentagelpDF.orderBy(desc("percent")).show(false)

  • 读取JSON数据

此外,还可以用SparkSession读取JSON、CSV、TXT和parquet表。下面的代码中读取了一个JSON文件,返回的是一个DataFrame。

// read the json file and create the dataframeval jsonFile = args(0)val zipsDF = spark.read.json(jsonFile)//filter all cities whose population > 40KzipsDF.filter(zipsDF.col("pop") > 40000).show(10)
  • 使用SparkSQL

借助SparkSession用户可以像SQLContext一样使用Spark SQL的全部功能。下面的代码中先创建了一个表然后对此表进行查询。

// Now create an SQL table and issue SQL queries against it without// using the sqlContext but through the SparkSession object.// Creates a temporary view of the DataFramezipsDF.createOrReplaceTempView("zips_table")zipsDF.cache()val resultsDF = spark.sql("SELECT city, pop, state, zip FROM zips_table")resultsDF.show(10)

  • 存储/读取Hive表

下面的代码演示了通过SparkSession来创建Hive表并进行查询的方法。

//drop the table if exists to get around existing table errorspark.sql("DROP TABLE IF EXISTS zips_hive_table")//save as a hive tablespark.table("zips_table").write.saveAsTable("zips_hive_table")//make a similar query against the hive table val resultsHiveDF = spark.sql("SELECT city, pop, state, zip FROM zips_hive_table WHERE pop > 40000")resultsHiveDF.show(10)

这里可以看到从DataFrame API、Spark SQL和Hive语句返回的结果是完全相同的。

关于这个例子的完整代码可以到这里获取。

Spark REPL和Databricks Notebook中的SparkSession对象

在之前的Spark版本中,Spark shell会自动创建一个SparkContext对象sc。2.0中Spark shell则会自动创建一个SparkSession对象(spark),在输入spark时就会发现它已经存在了。

在Databricks notebook中创建集群时也会自动生成一个SparkSession,这里用的名字也是spark。

SparkSession和SparkContext

下图说明了SparkContext在Spark中的主要功能。

从图中可以看到SparkContext起到的是一个中介的作用,通过它来使用Spark其他的功能。每一个JVM都有一个对应的SparkContext,driver program通过SparkContext连接到集群管理器来实现对集群中任务的控制。Spark配置参数的设置以及对SQLContext、HiveContext和StreamingContext的控制也要通过SparkContext进行。

不过在Spark2.0中上述的一切功能都是通过SparkSession来完成的,同时SparkSession也简化了DataFrame/Dataset API的使用和对数据的操作。

小结

本文通过一个简单的例子演示了SparkSession的使用方法并与Spark2.0之前版本中的SparkContext进行了对比。总而言之SparkSession是既容易学又方便用。

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
Spark计算引擎之SparkSQL详解
大数据开发技术之Spark SQL的多种使用方法
理解Spark SQL(三)—— Spark SQL程序举例
【Spark 2.0系列】: Spark Session API和Dataset API
Spark学习实例(Python):加载数据源Load Data Source
SparkSQL /DataFrame /Spark RDD谁快?
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服