打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
第6课:Spark Streaming源码解读之Job动态生成和深度思考

一:Spark Streaming Job生成深度思考
1. 做大数据例如Hadoop,Spark等,如果不是流处理的话,一般会有定时任务。例如10分钟触发一次,1个小时触发一次,这就是做流处理的感觉,一切不是流处理,或者与流处理无关的数据都将是没有价值的数据,以前做批处理的时候其实也是隐形的在做流处理。
2. JobGenerator构造的时候有一个核心的参数是jobScheduler, jobScheduler是整个作业的生成和提交给集群的核心,JobGenerator会基于DStream生成Job。这里面的Job就相当于Java中线程要处理的Runnable里面的业务逻辑封装。Spark的Job就是运行的一个作业。
3. Spark Streaming除了基于定时操作以外参数Job,还可以通过各种聚合操作,或者基于状态的操作。
4. 每5秒钟JobGenerator都会产生Job,此时的Job是逻辑级别的,也就是说有这个Job,并且说这个Job具体该怎么去做,此时并没有执行。具体执行的话是交给底层的RDD的action去触发,此时的action也是逻辑级别的。底层物理级别的,Spark Streaming他是基于DStream构建的依赖关系导致的Job是逻辑级别的,底层是基于RDD的逻辑级别的。

val ssc = new StreamingContext(conf, Seconds(5))
  • 1
 5. Spark Streaming的触发器是以时间为单位的,storm是以事件为触发器,也就是基于一个又一个record. Spark Streaming基于时间,这个时间是Batch Duractions

从逻辑级别翻译成物理级别,最后一个操作肯定是RDD的action,但是并不想一翻译立马就触发job。这个时候怎么办?
6. action触发作业,这个时候作为Runnable接口封装,他会定义一个方法,这个方法里面是基于DStream的依赖关系生成的RDD。翻译的时候是将DStream的依赖关系翻译成RDD的依赖关系,由于DStream的依赖关系最后一个是action级别的,翻译成RDD的时候,RDD的最后一个操作也应该是action级别的,如果翻译的时候直接执行的话,就直接生成了Job,就没有所谓的队列,所以会将翻译的事件放到一个函数中或者一个方法中,因此,如果这个函数没有指定的action触发作业是执行不了的。
7. Spark Streaming根据时间不断的去管理我们的生成的作业,所以这个时候我们每个作业又有action级别的操作,这个action操作是对DStream进行逻辑级别的操作,他生成每个Job放到队列的时候,他一定会被翻译为RDD的操作,那基于RDD操作的最后一个一定是action级别的,如果翻译的话直接就是触发action的话整个Spark Streaming的Job就不受管理了。因此我们既要保证他的翻译,又要保证对他的管理,把DStream之间的依赖关系转变为RDD之间的依赖关系,最后一个DStream使得action的操作,翻译成一个RDD之间的action操作,整个翻译后的内容他是一块内容,他这一块内容是放在一个函数体中的,这个函数体,他会函数的定义,这个函数由于他只是定义还没有执行,所以他里面的RDD的action不会执行,不会触发Job,当我们的JobScheduler要调度Job的时候,转过来在线程池中拿出一条线程执行刚才的封装的方法。

二:Spark Streaming Job生成源码解析
Spark 作业动态生成三大核心:
JobGenerator: 负责Job生成。
JobSheduler: 负责Job调度。
ReceiverTracker: 获取元数据。
1. JobScheduler的start方法被调用的时候,会启动JobGenerator的start方法。

/** Start generation of jobs */def start(): Unit = synchronized {//eventLoop是消息循环体,因为不断的生成Job  if (eventLoop != null) return // generator has already been started  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.  // See SPARK-10125  checkpointWriter//匿名内部类  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)    override protected def onError(e: Throwable): Unit = {      jobScheduler.reportError("Error in job generator", e)    }  }//调用start方法。  eventLoop.start()  if (ssc.isCheckpointPresent) {    restart()  } else {    startFirstTime()  }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

EvenLoop: 的start方法被调用,首先会调用onstart方法。然后就启动线程。

/** * An event loop to receive events from the caller and process all events in the event thread. It * will start an exclusive event thread to process all events. * * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can * handle events in time to avoid the potential OOM. */private[spark] abstract class EventLoop[E](name: String) extends Logging {  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()  private val stopped = new AtomicBoolean(false)//开启后台线程。     private val eventThread = new Thread(name) {    setDaemon(true)    override def run(): Unit = {      try {//不断的从BlockQueue中拿消息。        while (!stopped.get) {//线程的start方法调用就会不断的循环队列,而我们将消息放到eventQueue中。          val event = eventQueue.take()          try {//            onReceive(event)          } catch {            case NonFatal(e) => {              try {                onError(e)              } catch {                case NonFatal(e) => logError("Unexpected error in " + name, e)              }            }          }        }      } catch {        case ie: InterruptedException => // exit even if eventQueue is not empty        case NonFatal(e) => logError("Unexpected error in " + name, e)      }    }  }  def start(): Unit = {    if (stopped.get) {      throw new IllegalStateException(name + " has already been stopped")    }    // Call onStart before starting the event thread to make sure it happens before onReceive    onStart()    eventThread.start()  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

onReceive:不断的从消息队列中获得消息,一旦获得消息就会处理。
不要在onReceive中添加阻塞的消息,如果这样的话会不断的阻塞消息。
消息循环器一般都不会处理具体的业务逻辑,一般消息循环器发现消息以后都会将消息路由给其他的线程去处理。

/** * Invoked in the event thread when polling events from the event queue. * * Note: Should avoid calling blocking actions in `onReceive`, or the event thread will be blocked * and cannot process events in time. If you want to call some blocking actions, run them in * another thread. */protected def onReceive(event: E): Unit
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

消息队列接收到事件后具体处理如下:

/** Processes all events */private def processEvent(event: JobGeneratorEvent) {  logDebug("Got event " + event)  event match {    case GenerateJobs(time) => generateJobs(time)    case ClearMetadata(time) => clearMetadata(time)    case DoCheckpoint(time, clearCheckpointDataLater) =>      doCheckpoint(time, clearCheckpointDataLater)    case ClearCheckpointData(time) => clearCheckpointData(time)  }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

基于Batch Duractions生成Job,并完成checkpoint.
Job生成的5个步骤。

/** Generate jobs and perform checkpoint for the given `time`.  */private def generateJobs(time: Time) {  // Set the SparkEnv in this thread, so that job generation code can access the environment  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.  SparkEnv.set(ssc.env)  Try {//第一步:获取当前时间段里面的数据。根据分配的时间来分配具体要处理的数据。    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch//第二步:生成Job,获取RDD的DAG依赖关系。在此基于DStream生成了RDD实例。    graph.generateJobs(time) // generate jobs using allocated block  } match {    case Success(jobs) =>//第三步:获取streamIdToInputInfos的信息。BacthDuractions要处理的数据,以及我们要处理的业务逻辑。      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)//第四步:将生成的Job交给jobScheduler      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))    case Failure(e) =>      jobScheduler.reportError("Error generating jobs for time " + time, e)  }//第五步:进行checkpoint  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

此时的outputStream是整个DStream中的最后一个DStream,也就是foreachDStream.

def generateJobs(time: Time): Seq[Job] = {  logDebug("Generating jobs for time " + time)  val jobs = this.synchronized {    outputStreams.flatMap { outputStream =>//根据最后一个DStream,然后根据时间生成Job.      val jobOption = outputStream.generateJob(time)      jobOption.foreach(_.setCallSite(outputStream.creationSite))      jobOption    }  }  logDebug("Generated " + jobs.length + " jobs for time " + time)  jobs}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

此时的JobFunc就是我们前面提到的用函数封装了Job。
generateJob基于给定的时间生成Spark Streaming 的Job,这个方法会基于我们的DStream的操作物化成了RDD,由此可以看出,DStream是逻辑级别的,RDD是物理级别的。

/** * Generate a SparkStreaming job for the given time. This is an internal method that * should not be called directly. This default implementation creates a job * that materializes the corresponding RDD. Subclasses of DStream may override this * to generate their own jobs. */private[streaming] def generateJob(time: Time): Option[Job] = {  getOrCompute(time) match {    case Some(rdd) => {      val jobFunc = () => {        val emptyFunc = { (iterator: Iterator[T]) => {} }//rdd => 就是RDD的依赖关系        context.sparkContext.runJob(rdd, emptyFunc)      }//此时的      Some(new Job(time, jobFunc))    }    case None => None  }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

Job这个类就代表了Spark业务逻辑,可能包含很多Spark Jobs.

/** * Class representing a Spark computation. It may contain multiple Spark jobs. */private[streaming]class Job(val time: Time, func: () => _) {  private var _id: String = _  private var _outputOpId: Int = _  private var isSet = false  private var _result: Try[_] = null  private var _callSite: CallSite = null  private var _startTime: Option[Long] = None  private var _endTime: Option[Long] = None  def run() {//调用func函数,此时这个func就是我们前面generateJob中的func    _result = Try(func())  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

此时put函数中的RDD是最后一个RDD,虽然触发Job是基于时间,但是也是基于DStream的action的。

/** * Get the RDD corresponding to the given time; either retrieve it from cache * or compute-and-cache it. */private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {  // If RDD was already generated, then retrieve it from HashMap,  // or else compute the RDD//基于时间生成RDD  generatedRDDs.get(time).orElse {    // Compute the RDD if time is valid (e.g. correct time in a sliding window)    // of RDD generation, else generate nothing.    if (isTimeValid(time)) {      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {        // Disable checks for existing output directories in jobs launched by the streaming        // scheduler, since we may need to write output to an existing directory during checkpoint        // recovery; see SPARK-4835 for more details. We need to have this call here because        // compute() might cause Spark jobs to be launched.        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {//          compute(time)        }      }//然后对generated RDD进行checkpoint      rddOption.foreach { case newRDD =>        // Register the generated RDD for caching and checkpointing        if (storageLevel != StorageLevel.NONE) {          newRDD.persist(storageLevel)          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")        }        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {          newRDD.checkpoint()          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")        }//以时间为Key,RDD为Value,此时的RDD为最后一个RDD        generatedRDDs.put(time, newRDD)      }      rddOption    } else {      None    }  }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

回到JobGenerator中的start方法。

  if (ssc.isCheckpointPresent) {//如果不是第一次启动的话,就需要从checkpoint中恢复。    restart()  } else {//否则的话,就是第一次启动。    startFirstTime()  }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

StartFirstTime的源码如下:

/** Starts the generator for the first time */private def startFirstTime() {  val startTime = new Time(timer.getStartTime())//告诉DStreamGraph第一个Batch启动时间。  graph.start(startTime - graph.batchDuration)//timer启动,整个job不断生成就开始了。  timer.start(startTime.milliseconds)  logInfo("Started JobGenerator at " + startTime)}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

这里的timer是RecurringTimer。RecurringTimer的start方法会启动内置线程thread.

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
  • 1
  • 2
  • 3

Timer.start源码如下:

/** * Start at the given start time. */def start(startTime: Long): Long = synchronized {  nextTime = startTime //每次调用的  thread.start()  logInfo("Started timer for " + name + " at time " + nextTime)  nextTime}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

调用thread启动后台进程。

private val thread = new Thread("RecurringTimer - " + name) {  setDaemon(true)  override def run() { loop }}
  • 1
  • 2
  • 3
  • 4
  • 5

loop源码如下:

  /**   * Repeatedly call the callback every interval.   */  private def loop() {    try {      while (!stopped) {        triggerActionForNextInterval()      }      triggerActionForNextInterval()    } catch {      case e: InterruptedException =>    }  }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

tiggerActionForNextInterval源码如下:

private def triggerActionForNextInterval(): Unit = {  clock.waitTillTime(nextTime)  callback(nextTime)  prevTime = nextTime  += period  logDebug("Callback for " + name + " called at time " + prevTime)}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

此时的callBack是RecurringTimer传入的。下面就去找callBack是谁传入的,这个时候就应该找RecurringTimer什么时候实例化的。

private[streaming]class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)  extends Logging {  private val thread = new Thread("RecurringTimer - " + name) {    setDaemon(true)    override def run() { loop }  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

在jobGenerator中,匿名函数会随着时间不断的推移反复被调用。

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,//匿名函数,复制给callback。  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
  • 1
  • 2
  • 3
  • 4

而此时的eventLoop就是JobGenerator的start方法中eventLoop.eventLoop是一个消息循环体当收到generateJobs,就会将消息放到线程池中去执行。
至此,就知道了基于时间怎么生成作业的流程就贯通了。

Jobs: 此时的jobs就是jobs的业务逻辑,就类似于RDD之间的依赖关系,保存最后一个job,然后根据依赖关系进行回溯。
streamIdToInputInfos:基于Batch Duractions以及要处理的业务逻辑,然后就生成了JobSet.

jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
  • 1

此时的JobSet就包含了数据以及对数据处理的业务逻辑。

/** Class representing a set of Jobs  * belong to the same batch.  */private[streaming]case class JobSet(    time: Time,    jobs: Seq[Job],    streamIdToInputInfo: Map[Int, StreamInputInfo] = Map.empty) {  private val incompleteJobs = new HashSet[Job]()  private val submissionTime = System.currentTimeMillis() // when this jobset was submitted  private var processingStartTime = -1L // when the first job of this jobset started processing  private var processingEndTime = -1L // when the last job of this jobset finished processing  jobs.zipWithIndex.foreach { case (job, i) => job.setOutputOpId(i) }  incompleteJobs ++= jobs  def handleJobStart(job: Job) {    if (processingStartTime < 0) processingStartTime = System.currentTimeMillis()  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

submitJobSet:

def submitJobSet(jobSet: JobSet) {  if (jobSet.jobs.isEmpty) {    logInfo("No jobs added for time " + jobSet.time)  } else {    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))//    jobSets.put(jobSet.time, jobSet)//jobHandler    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))    logInfo("Added jobs for time " + jobSet.time)  }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

JobHandle是一个Runnable接口,Job就是我们业务逻辑,代表的就是一系列RDD的依赖关系,job.run方法就导致了func函数的调用。

  private class JobHandler(job: Job) extends Runnable with Logging {    import JobScheduler._    def run() {      try {        val formattedTime = UIUtils.formatBatchTime(          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"        ssc.sc.setJobDescription(          s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)        // We need to assign `eventLoop` to a temp variable. Otherwise, because        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then        // it's possible that when `post` is called, `eventLoop` happens to null.        var _eventLoop = eventLoop        if (_eventLoop != null) {          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))          // Disable checks for existing output directories in jobs launched by the streaming          // scheduler, since we may need to write output to an existing directory during checkpoint          // recovery; see SPARK-4835 for more details.          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {//            job.run()          }          _eventLoop = eventLoop          if (_eventLoop != null) {            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))          }        } else {          // JobScheduler has been stopped.        }      } finally {        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)      }    }  }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

此时的func就是基于DStream的业务逻辑。也就是RDD之间依赖的业务逻辑。

def run() {  _result = Try(func())}
  • 1
  • 2
  • 3
  • 4

总体架构如下:

本课程笔记来源于:

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Apache Spark源码走读之4
Spark Streaming源码解读之Job详解
简单之美 | Kafka+Spark Streaming+Redis实时计算整合实践
Spark Streaming任务延迟监控及告警
spark流数据处理:Spark Streaming的使用
通过可视化来了解你的Spark应用程序
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服