打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
Spark Checkpoint读操作代码分析

Spark Checkpoint读操作代码分析

  上次介绍了RDD的Checkpint写过程(《Spark Checkpoint写操作代码分析》),本文将介绍RDD如何读取已经Checkpint的数据。在RDD Checkpint完之后,Checkpint的信息(比如数据存放的目录)都由RDDCheckpointData去管理,所以当下次计算依赖了这个RDD的时候,首先是根据依赖关系判断出当前这个RDD是否被Checkpint了,主要是通过RDD的dependencies决定:

final def dependencies: Seq[Dependency[_]] = {
  checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
    if (dependencies_ == null) {
      dependencies_ = getDependencies
    }
    dependencies_
  }
}

  如果RDD被Checkpint了,那么checkpointRDD为Some(CheckpointRDD[T])了,所以依赖的RDD变成了CheckpointRDD。在计算数据的过程中会调用RDD的iterator方法:

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    <span class="wp_keywordlink_affiliate"><a data-original-title="View all posts in Spark" href="https://www.iteblog.com/archives/tag/spark/" title="" target="_blank">Spark</a></span>Env.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
 
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
   if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
}

  计算的过程中首先会判断RDD是否被Checkpint了,而RDD Checkpint写之后这个条件肯定是true的。而firstParent已经变成了CheckpointRDD,所以会调用CheckpointRDD的iterator方法, 该方法最终会调用ReliableCheckpointRDD的compute方法:

override def compute(split: Partition, context: TaskContext): Iterator[T] = {
  val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
  ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
}

  在compute方法中会通过ReliableCheckpointRDD的readCheckpointFile方法来从file路径里面读出已经Checkpint的数据,readCheckpointFile的实现如下:

def readCheckpointFile[T](
    path: Path,
    broadcastedConf: Broadcast[SerializableConfiguration],
    context: TaskContext): Iterator[T] = {
  val env = <span class="wp_keywordlink_affiliate"><a data-original-title="View all posts in Spark" href="https://www.iteblog.com/archives/tag/spark/" title="" target="_blank">Spark</a></span>Env.get
  val fs = path.getFileSystem(broadcastedConf.value.value)
  val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
  val fileInputStream = fs.open(path, bufferSize)
  val serializer = env.serializer.newInstance()
  val deserializeStream = serializer.deserializeStream(fileInputStream)
 
  // Register an on-task-completion callback to close the input stream.
  context.addTaskCompletionListener(context => deserializeStream.close())
 
  deserializeStream.asIterator.asInstanceOf[Iterator[T]]
}

最后数据就回被全部读取出来,整个Checkpint读过程完成了。

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Spark的RDD检查点实现分析
【转】Spark源码分析之
Spark通过JdbcRdd连接Oracle数据库(scala)
大数据IMF传奇行动绝密课程第16课:RDD实战
spark利用sparkSQL将数据写入hive两种通用方式实现及比较
一次实践:spark查询hive速度缓慢原因分析并以此看到spark基础架构
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服