打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
构建分区以在Apache Spark中处理数据文件

  

  > Image Source: Pexels

  SPARK分区指南

  继续前面有关确定关键转换中的分区数的故事,该故事将描述从Spark应用程序中的数据文件创建的分区数背后的原因。

  大多数Spark应用程序从一组数据文件(各种格式)中为其执行管道提供输入数据。 为了方便从文件中读取数据,Spark在原始RDD和数据集的上下文中提供了专用的API。 这些API将读取过程从数据文件抽象到具有确定数量的分区的输入RDD或数据集。 然后,用户可以对这些输入的RDD /数据集执行各种转换/操作。

  输入原始RDD或数据集中的每个分区都映射到一个或多个数据文件,该映射是在文件的一部分或整个文件上完成的。 在其管道中具有输入RDD / Dataset的Spark Job执行期间,通过根据分区到数据文件的映射读取数据来计算输入RDD / Dataset的每个分区。 然后将其馈送给相关的RDD / Dataset,进一步进入执行管道。

  基于多个参数来确定输入RDD /数据集(映射到数据文件)中的分区数,以实现最佳并行度。 这些参数带有默认值,也可以由用户调整。 在输入的RDD / Dataset中确定的分区数可能会影响Job的整个执行管道的效率。 因此,重要的是要知道在输入RDD或数据集的情况下如何根据某些参数确定分区数。

  用于读取数据文件的数据集API时的分区数:提供了多个API来将数据文件读取到数据集中,其中每个API都在SparkSession实例上调用,该实例从2.0版本开始构成Spark应用程序的统一入口点。 其中一些API如下所示:

  *File Format specific APIs*

  Dataset=SparkSession.read.csv(String path or List of paths)

  Dataset=SparkSession.read.json(String path or List of paths)

  Dataset=SparkSession.read.text(String path or List of paths)

  Dataset=SparkSession.read.parquet(String path or List of paths)

  Dataset=SparkSession.read.orc(String path or List of paths)*Generic API*

  Dataset=SparkSession.read.format(String fileformat).load(String path or List of paths)

  'path' in above APIs is either actual file path or directory path. Also, it could contain wildcard, such as '*'.

  There are more variants of these APIs which includes facility of specifying various options realted to a specific file reading. Full list can be referred here.

  在查看了用于读取数据文件的API之后,以下是配置参数列表,该列表会影响数据集中表示数据文件中数据的分区数:

  (a)spark.default.parallelism (default: Total No. of CPU cores)(b)spark.sql.files.maxPartitionBytes (default: 128 MB)

  (c)spark.sql.files.openCostInBytes (default: 4 MB)

  使用这些配置参数值,最大拆分准则被称为maxSplitBytes的计算如下:

  maxSplitBytes=Minimum(maxPartitionBytes, bytesPerCore)

  其中bytesPerCore的计算公式为:

  bytesPerCore=(Sum of sizes of all data files + No. of files * openCostInBytes) / default.parallelism

  现在使用" maxSplitBytes",每个(待读取)数据文件都可以拆分(如果可拆分的话)。 因此,如果文件是可拆分的,且大小大于" maxSplitBytes",则文件将拆分为" maxSplitBytes"的多个块,最后一个块小于或等于" maxSplitBytes"。 如果文件不可拆分,或者文件大小小于" maxSplitBytes",则只有一个文件大小等于文件大小。

  在为所有数据文件计算了文件块之后,将一个或多个文件块打包在一个分区中。 对于每个迭代的文件块,打包过程首先初始化一个空分区,然后对文件块进行迭代:

  · 如果没有当前分区要打包,请初始化要打包的新分区,然后将迭代的文件块分配给该分区。 分区大小成为块大小与" openCostInBytes"的额外开销的总和。

  · 如果添加的块大小不超过当前分区(正在打包)的大小超过" maxSplitBytes",则文件块将成为当前分区的一部分。 分区大小通过块大小和" openCostInBytes"的额外开销之和来增加。

  · 如果添加的块大小超过了当前分区的大小超过" maxSplitBytes",则将当前分区声明为完成,并启动一个新分区。 迭代的文件块将成为正在启动的较新分区的一部分,而较新的分区大小将成为块大小与" openCostInBytes"的额外开销的总和。

  打包过程结束后,将获得用于读取相应数据文件的数据集的分区数。

  

  > Illustration of the process of deriving the partitions for a set of data files, first the data fil

  尽管得出分区数量的过程似乎有些复杂,但是基本思想是,如果文件是可拆分的,则首先在maxSplitBytes的边界处拆分单个文件。 此后,将拆分后的文件块或不可拆分的文件打包到分区中,以便在将块打包到分区中时,如果分区大小超过maxSplitBytes,则认为该分区已完成打包,然后采用新分区进行打包。 因此,最终从包装过程中得出了一定数量的分区。

  为了说明起见,以下是一些使用数据集API得出分区数量的示例:

  (a)54个parquet文件,每个65 MB,默认情况下所有3个配置参数,核心数等于10:由此得出的分区数为54。此处每个文件只有一个块。 在此减肥药示例中,很明显,两个文件不能打包在一个分区中(因为大小会超过" maxSplitBytes",即添加第二个文件后为128 MB)。

  

  > No. of partitions calculated for case (a)

  (b)54个parquet文件,每个63 MB,默认情况下所有3个配置参数,核心数等于10:分区数再次变为54。似乎可以在此处打包两个文件,但是, 打包第一个文件后会有'openCostInBytes'(4 MB)的开销,因此,添加第二个文件后,越过了128 MB的限制,因此,在此示例中,两个文件不能打包在一个分区中。

  

  > No. of partitions calculated for case (b)

  (c) 54个parquet文件,每个40 MB,默认情况下所有3个配置参数,核心数等于10:这次分区数为18。 根据上面说明的打包过程,即使添加两个40 MB的文件和每个4 MB的开销后,总大小仍为88 MB,因此也可以打包40 MB的第三个文件,因为大小为 仅为128 MB。 因此,分区数为18。

  

  > No. of partitions calculated for case ?

  应当注意,在评估文件块的打包资格时,不考虑openCost的开销,仅在考虑将文件块打包在分区中之后才在增加分区大小的同时考虑开销。

  (d)54个实木复合地板文件,每个40 MB,maxPartitionBytes设置为88 MB,其他两个配置为默认值。核心数等于10:在这种情况下,分区数为27,而不是原来的18 ?。 这是由于" maxPartitionBytes"的值发生了变化。 如上所述,可以根据文件拆分和打包过程轻松推断出54个分区。

  (e)54个实木复合地板文件,每个40 MB,spark.default.parallelism设置为400,其他两个配置为默认值,核心数等于10:在这种情况下,分区数为378。 同样,如上所述,可以根据文件拆分和打包过程轻松推断378个分区。

  

  > No. of partitions calculated for case (e)

  使用RDD API读取数据文件时的分区数:

  提供以下API来将数据文件读取到RDD中,在SparkSession实例的SparkContex上分别调用这些API:

  *SparkContext.newAPIHadoopFile(String path, Class fClass, Class kClass, Class vClass, org.apache.hadoop.conf.Configuration conf)

  *SparkContext.textFile(String path, int minPartitions)

  *SparkContext.sequenceFile(String path, Class keyClass, Class valueClass)

  *SparkContext.sequenceFile(String path, Class keyClass, Class valueClass, int minPartitions)

  *SparkContext.objectFile(String path, int minPartitions, scala.reflect.ClassTag evidence$4)

  在其中一些API中,要求输入参数" minPartitions",而在其他API中则不需要。 如果未要求,默认值将为2或1,如果default.parallelism为1,则为1。此" minPartitions"是决定这些API返回的RDD中的分区数的因素之一。 其他因素是以下Hadoop配置参数的值:

  minSize (mapred.min.split.size - default value 1 MB)

  blockSize (dfs.blocksize - default 128 MB)

  基于这三个参数的值,称为分割大小的分割准则计算为:

  splitSize=Math.max(minSize, Math.min(goalSize, blockSize));

  这里:

  goalSize=Sum of all files lengths to be read / minPartitions

  现在使用" splitSize",如果每个数据文件都是可拆分的,则将拆分每个(待读取)数据文件。 因此,如果文件可拆分的大小大于" splitSize",则文件将拆分为多个" splitSize"块,最后一个块小于或等于" splitSize"。 如果文件不可拆分,或者文件大小小于" splitSize",则只有一个文件大小等于文件长度。

  每个文件块(大小大于零)都映射到单个分区。 因此,RDD API在数据文件上返回的RDD中的分区数等于使用" splitSize"对数据文件进行切片后得出的非零文件块数。

  

  > Illustration of the process of deriving the partitions for a set of data files, first the data fil

  为了说明起见,以下是一些使用数据集API得出分区数量的示例:

  (a)31个Parquet文件,每个330 MB,默认情况下为块大小128 MB,未指定minPartitions,默认情况下为'mapred.min.split.size',核心数等于10:由此得出的分区数为 是93。splitSize仅来自128 MB,因此基本上分区的数量等于31个文件占用的块数。 每个文件占用3个块,因此总块数和总分区数为93。

  

  

  > No. of partitions calculated for case (a)

  (b)54个parquet文件,每个40 MB,默认情况下为块大小128 MB,未指定minPartitions,默认情况下为'mapred.min.split.size',核心数等于10:由此得出的分区数为 是54。splitSize仅来自128 MB,因此基本上分区的数量等于54个文件所占用的块数。 每个文件占用1个块,因此块总数和分区总数为54。

  

  

  > No. of partitions calculated for case (b)

  (c)31个parquet文件,每个330 MB,默认情况下为块大小128 MB,minPartitions指定为1000,默认情况下为" mapred.min.split.size",核心数等于10:用于此目的的分区数 为1023。splitSize仅来自10.23 MB,因此每个文件的文件分割数等于33,文件分割总数为1023,因此分区总数也为1023。

  

  

  > No. of partitions calculated for case (c)

  (d)31个parquet文件,每个330 MB,默认情况下块大小为128 MB,未指定minPartitions," mapred.min.split.size"设置为256 MB,核心数等于10:用于此操作的分区数 的大小为62。splitSize仅来自256 MB,因此每个文件的文件分割数等于2,文件分割总数为62,因此分区的总数也为62。

  

  > No. of partitions calculated for case (d)

  从" splitSize"计算中可以明显看出,如果希望分区大小大于块大小,则需要将" mapred.min.split.size"设置为大于块大小的更大数字。 另外,如果希望分区大小小于块大小,则应将" minPartitions"设置为相对较高的值,以使目标大小(文件大小总和/ minParitions)的计算结果小于块大小。

  简介:直到最近,对一组数据文件选取一定数量的分区的过程对我而言一直都是神秘的。 但是,最近,在优化例程期间,我想更改Spark选择的用于处理一组数据文件的默认分区数,也就是在我开始全面验证该过程以及证明的时候。 希望,此解码过程的描述也将有助于读者更深入地了解Spark,并使他们能够设计高效且优化的Spark例程。

  请记住,分区的最佳数量是高效可靠的Spark应用程序的关键。 如果对这个故事有反馈或疑问,请在评论部分中写。 希望您会发现它有用。 这是有关Apache Spark的其他综合故事的链接。

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
怎么安装Win7系统?教你安装windows 7系统【图文教程】
乐易佳数据恢复专业版–爆破补丁 | 野人博客
这可能是最小的数据恢复软件,仅有3MB大小,但功能非常强大
数据恢复~永久本地付费购买!不更新,不失效!仅1.62MB!
SparkR:数据科学家的新利器
Apache Spark 3.0 预览版正式发布,多项重大功能发布
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服