【斯Parker】DAGScheduler源码浅析

作业与复制

近些年参加了一个数额分布化相关的门类,涉及到数据库 MySQL 的数目分布化。
简单易行来说就是亟需在他乡数据基本达成多点可写并确保分布后的数据能达标最后一致性。
开头对 MySQL
作数据分布仅仅是读写分离,通过数据库自个儿的主从复制即可已毕写主库、读从库。
今昔则必要双写主库并在经历一个短命的延时后达到最终一致性,这一个标题乍一想相比复杂,但终究依旧多少最后一致性的题材。

先回到最简便的图景,唯有一个 MySQL 数据库时,数据一致性是怎么确保的?
打探数据库的都驾驭,那是透过数据库的事情本性来担保的,事务包含四大特色:

  • Atomicity 原子性
  • Consistency 一致性
  • Isolation 隔离性
  • Durability 持久性

业务的 ACID
四大特征不是本文重点,就不举办做学术性讲演了,不打听的可以在前面参考文献里[3]去看有关作品。
那里只想提一个难题,单一数据库事务能保障数据的一致性,那么 MySQL
在安插成主从架构时,怎么着保管中央之间数据的一致性的?

MySQL 为了提供主从复制效用引入了一个新的日记文件叫
binlog,它富含了吸引多少变动的风云日志集合。
从库请求主库发送 binlog
并透过日记事件平复数据写入从库,所以从库的数目来源为 binlog。
如此那般 MySQL 主库只需已毕 binlog
与当地数据一致就可以保险主从库数据一致(暂且忽略互联网传输引发的主从分化)。
大家了然担保本地数据一致性是靠数据库事务天性来落成的,而数据库事务是如何落成的吗?先看上边那张图:

图片 1

MySQL
本人不提供工作支持,而是开放了蕴藏引擎接口,由具体的存储引擎来贯彻,具体来说援救MySQL 事务的蕴藏引擎就是 InnoDB。
积存引擎完结业务的通用形式是依照 redo log 和 undo log。
简单来讲来说,redo log 记录事务修改后的数据, undo log
记录事务前的本来数据。
所以当一个政工执行时实际暴发经过简化描述如下:

  1. 先记下 undo/redo log,确保日志刷到磁盘上百折不回存储。
  2. 履新数据记录,缓存操作并异步刷盘。
  3. 交付业务,在 redo log 中写入 commit 记录。

在 MySQL 执行工作进度中如若因故障中断,可以透过 redo log
来重做作业或透过 undo log 来回滚,确保了数额的一致性。
那一个都以由事务性存储引擎来形成的,但 binlog
不在事务存储引擎范围内,而是由 MySQL Server 来记录的。
那就是说就务须保险 binlog 数据和 redo log 之间的一致性,所以开启了 binlog
后其实的工作执行就多了一步,如下:

  1. 先记下 undo/redo log,确保日志刷到磁盘上百折不回存储。
  2. 更新数据记录,缓存操作并异步刷盘。
  3. 将事情日志持久化到 binlog。
  4. 交由业务,在 redo log 中写入提交记录。

那样的话,只要 binlog 没写成功,整个事情是亟需回滚的,而 binlog
写成功后就算 MySQL Crash 了都得以回复工作并形成提交。
要成功那一点,就需求把 binlog 和事务涉及起来,而唯有确保了 binlog
和事情数据的一致性,才能确保大旨数据的一致性。
于是 binlog
的写入进程不得不嵌入到纯粹的事体存储引擎执行进度中,并以内部分布式事务(xa
事务)的方式成就两阶段提交。
尤其的细节就不举办了,可以参考前面参考文献[5]。

学业提交与DAGScheduler操作

Action的多数操作会举办作业(job)的交付,源码1.0版的job提交进度的差不多调用链是:sc.runJob()–>dagScheduler.runJob–>dagScheduler.submitJob—>dagSchedulerEventProcessActor.JobSubmitted–>dagScheduler.handleJobSubmitted–>dagScheduler.submitStage–>dagScheduler.submitMissingTasks–>taskScheduler.submitTasks
切切实实的作业提交执行期的函数调用为:

  1. sc.runJob->dagScheduler.runJob->submitJob
  1. DAGScheduler::submitJob会创立JobSummitted的event发送给内嵌类eventProcessActor(在源码1.4中,submitJob函数中,使用DAGScheduler伊芙ntProcessLoop类进行事件的拍卖)
  2. eventProcessActor在收受到JobSubmmitted之后调用processEvent处理函数
  3. job到stage的变换,生成finalStage并付诸运行,关键是调用submitStage
  4. 在submitStage中会总结stage之间的依赖关系,看重关系分成宽依赖和窄看重二种
  5. 只要计算中发现日前的stage没有其他借助恐怕有所的正视都曾经准备甘休,则交给task
  6. 交给task是调用函数submitMissingTasks来完毕
  7. task真正运行在哪些worker上边是由TaskScheduler来保管,也等于下面的submitMissingTasks会调用TaskScheduler::submitTasks
  8. TaskSchedulerImpl中会依据斯Parker的此时此刻运作形式来创立相应的backend,如果是在单机运行则开创LocalBackend
  9. LocalBackend收到TaskSchedulerImpl传递进入的ReceiveOffers事件
  10. receiveOffers->executor.launchTask->TaskRunner.run

参考

[1] MySQL Internals Manual.
Replication.
[2] MySQL Internals Manual. The Binary
Log
.
[3] in355hz. 数据库 ACID
的实现
.
[4] jb51. MySQL 对 binlog
的处理表明
.
[5] repls. 浅析 innodb_support_xa 与
innodb_flush_log_at_trx_commit
.
[6] 68idc. MySQL 5.6 之 DBA
与开发者指南
.


下边是本人的微信公众号
「霎时之间」,除了写技术的篇章、还有产品、行业和人生的思考,希望能和越来越多走在这条路上同行者沟通。
图片 2

学业提交的调度

在斯Parker源码1.4.0中,DAGScheduler的submitJob函数不再行使DAGEventProcessActor举办事件处理和新闻通讯,而是使用DAGScheduler伊夫ntProcessLoop类实例eventProcessLoop进行JobSubmitted事件的post动作。
下边是submitJob函数代码:

  /**
   * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
   * can be used to block until the the job finishes executing or can be used to cancel the job.
   */
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
    waiter
  }

当eventProcessLoop对象投递了JobSubmitted事件之后,对象内的eventThread线程实例对事件开展拍卖,不断从事件队列中取出事件,调用onReceive函数处监护人件,当匹配到JobSubmitted事件后,调用DAGScheduler的handleJobSubmitted函数并传播jobid、rdd等参数来拍卖Job。

图片 3

图片 4

DAGScheduler

DAGScheduler的首要任务是基于Stage创设DAG,决定每一种职责的极品地点

  • 记录哪个RDD或许Stage输出被物化
  • 面向stage的调度层,为job生成以stage组成的DAG,提交TaskSet给TaskScheduler执行
  • 再也提交shuffle输出丢失的stage

逐个Stage内,都以独立的tasks,他们手拉手实践同一个computefunction,享有平等的shuffledependencies。DAG在切分stage的时候是依据出现shuffle为界限的。

总结

大家日前先提议了一个标题,然后从数据一致性的角度去想想,参考了 MySQL
的贯彻方式。
理清并分析了 MySQL 单机环境是怎么保证复制机制的数额一致性,也等于 binlog
和工作数据的均等。
末端大家才能依据 binlog 这一个机制去完毕复制并保管主从复制的一致性。
主从复制又引入了网络因素,进一步扩大了确保核心数据一致性的复杂度,后边还会撰写进一步分析那一个题材。

handleJobSubmitted函数

Job处理进度中handleJobSubmitted相比根本,该函数根本担负RDD的倚重性分析,生成finalStage,并依照finalStage来爆发ActiveJob。
在handleJobSubmitted函数源码中,给出了有的注释:

  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      allowLocal: Boolean,
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: Stage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
    } catch {
      //错误处理,告诉监听器作业失败,返回....
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    if (finalStage != null) {
      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
      clearCacheLocs()
      logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
        job.jobId, callSite.shortForm, partitions.length, allowLocal))
      logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
      logInfo("Parents of final stage: " + finalStage.parents)
      logInfo("Missing parents: " + getMissingParentStages(finalStage))
      val shouldRunLocally =
        localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
      val jobSubmissionTime = clock.getTimeMillis()
      if (shouldRunLocally) {
        // 很短、没有父stage的本地操作,比如 first() or take() 的操作本地执行
        // Compute very short actions like first() or take() with no parent stages locally.
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
        runLocally(job)
      } else {
        // collect等操作走的是这个过程,更新相关的关系映射,用监听器监听,然后提交作业
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.resultOfJob = Some(job)
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        // 提交stage
        submitStage(finalStage)
      }
    }
    // 提交stage
    submitWaitingStages()
  }

久而久之没有写技术文章了,因为直接在揣摩
「后端分布式」这么些体系到底怎么写才适合。
近日中心想精通了,「后端分布式」包含「分布式存储」和
「分布式统计」两大类。
结合实际工作中相见的难题,以搜寻答案的措施来分析技术,很多时候我们都不是在创制新技巧,而是在运用技术。
为了更有功效与效益的用好技术,我们须要驾驭一些技能的法则与做事措施。
带着题材从使用者的角度去分析技能原理,并将开源技术产品和框架作为一类技术的参照完毕来讲课。
以讲清原理为第一目标,对于现实已毕的技术细节若无特别之处则尽量点到即止。

小结

该篇小说介绍了DAGScheduler从斯ParkerContext中开展实例化,到执行Action操作时交由任务调用runJob函数,进而介绍了提交职务的音讯调度,和处理Job函数handleJobSubmitted函数。
出于在handleJobSubmitted函数中涉嫌到依靠分析和stage的源码内容,于是小编布署在下一篇小说里进行介绍和源码分析。

转发请注明笔者詹森 Ding及其出处
GitCafe博客主页(http://jasonding1354.gitcafe.io/)
Github博客主页(http://jasonding1354.github.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest\_articles)
谷歌搜索jasonding1354进去自家的博客主页

DAGScheduler实例化

上边的代码是斯ParkerContext实例化DAGScheduler的历程:

  @volatile private[spark] var dagScheduler: DAGScheduler = _
  try {
    dagScheduler = new DAGScheduler(this)
  } catch {
    case e: Exception => {
      try {
        stop()
      } finally {
        throw new SparkException("Error while constructing DAGScheduler", e)
      }
    }
  }

上边代码显示了DAGScheduler的构造函数定义中,通过绑定TaskScheduler的形式创设,其中次构造函数去调用主构造函数来将sc的字段填充入参:

private[spark]
class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

  def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
    this(
      sc,
      taskScheduler,
      sc.listenerBus,
      sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
      sc.env.blockManager.master,
      sc.env)
  }

  def this(sc: SparkContext) = this(sc, sc.taskScheduler)

DAGScheduler的runJob函数

DAGScheduler.runjob最终把结果通过resultHandler保存重回。
此处DAGScheduler的runJob函数调用DAGScheduler的submitJob函数来交付职务:

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
    waiter.awaitResult() match {
      case JobSucceeded => {
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      }
      case JobFailed(exception: Exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        throw exception
    }
  }

发表评论

电子邮件地址不会被公开。 必填项已用*标注