澳门美高梅手机网站以elasticsearch+jieba搭建搜索服务

DAGScheduler

DAGScheduler的要任务是根据Stage构建DAG,决定每个任务的超级位置

  • 笔录哪个RDD或者Stage输出为物化
  • 面向stage的调度层,为job生成因为stage组成的DAG,提交TaskSet给TaskScheduler执行
  • 再度提交shuffle输出丢失的stage

各个一个Stage内,都是单独的tasks,他们手拉手实践及一个computefunction,享有同的shuffledependencies。DAG在切分stage的时段是据出现shuffle为限的。

一. 行使场景

  1. 当垂直类互联网服务提供的音明确增多的当儿,用户如何迅速获取信息就会见成为瓶颈
  2. 首屈一指的运用场景包括:1)商品搜索;2)房源搜索;等等
  3. 正文讨论哪些根据开源的工具搭建筑一个基础之摸索引擎,满足如下需求
    1)能够基于商品的叙述召回结果,如:搜索“欧洲 皮鞋”
    返回相关商品,注意:这同样步仅考虑文本相关性
    2)支持中文分词,并支持自定义品牌词和类目词以提升查找的准确率
    3)千万量级索引

DAGScheduler实例化

下的代码是SparkContext实例化DAGScheduler的历程:

  @volatile private[spark] var dagScheduler: DAGScheduler = _
  try {
    dagScheduler = new DAGScheduler(this)
  } catch {
    case e: Exception => {
      try {
        stop()
      } finally {
        throw new SparkException("Error while constructing DAGScheduler", e)
      }
    }
  }

脚代码显示了DAGScheduler的构造函数定义着,通过绑定TaskScheduler的法子开创,其中次构造函数去调用主构造函数来用sc的字段填充入参:

private[spark]
class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

  def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
    this(
      sc,
      taskScheduler,
      sc.listenerBus,
      sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
      sc.env.blockManager.master,
      sc.env)
  }

  def this(sc: SparkContext) = this(sc, sc.taskScheduler)

二. 选型

  1. 开源索引系统服务 solr vs elasticsearch
    solr和elasticsearch都是基于lucene(java)的包装,都能够提供较为通用完整的检索服务。最近elasticsearch由于针对日记处理分析,更要好的起配置与重好之分布式话部署方案要为另行多之人承受以。

  2. elasticsearch参考资料
    《master
    elasticsearch》
    《elasticsearch
    cookbook》
    《elasticsearch
    server》

  3. 华语分词
    汉语分词器比较著名的起ik和jieba。二者都由带搜狗之分词dict,并且支持自定义新词。jieba的创新周期与github的star数更多,适配elasticsearch也比好,所以本文基于该中文分词组件进行优化。
    jieba分词
    jieba for
    elasticsearch
    重多:常用的开源国语分词工具

作业提交和DAGScheduler操作

Action的大部分操作会进行作业(job)的提交,源码1.0本的job提交过程的大体调用链是:sc.runJob()–>dagScheduler.runJob–>dagScheduler.submitJob—>dagSchedulerEventProcessActor.JobSubmitted–>dagScheduler.handleJobSubmitted–>dagScheduler.submitStage–>dagScheduler.submitMissingTasks–>taskScheduler.submitTasks
切切实实的学业提交执行期的函数调用为:

  1. sc.runJob->dagScheduler.runJob->submitJob
  1. DAGScheduler::submitJob会创建JobSummitted的event发送给内嵌类eventProcessActor(在源码1.4被,submitJob函数中,使用DAGSchedulerEventProcessLoop类进行事件的处理)
  2. eventProcessActor于接到及JobSubmmitted之后调用processEvent处理函数
  3. job到stage的换,生成finalStage并付诸运行,关键是调用submitStage
  4. 以submitStage中会计算stage之间的指关系,依赖关系分成宽依赖与小依赖两栽
  5. 设若算着发现脚下之stage没有外借助或有所的倚重还曾准备结束,则交task
  6. 交由task是调用函数submitMissingTasks来成功
  7. task真正运行于哪个worker上面是由TaskScheduler来管理,也尽管是面的submitMissingTasks会调用TaskScheduler::submitTasks
  8. TaskSchedulerImpl中会基于Spark的脚下运作模式来创造相应的backend,如果是于单机运行则创造LocalBackend
  9. LocalBackend收到TaskSchedulerImpl传递进入的ReceiveOffers事件
  10. receiveOffers->executor.launchTask->TaskRunner.run

三. 架构设计

  1. 系统架构使下图所示
搜索系统架构图



其中,
  • php层主要职责
    1)向下负载均衡,失败重试,支持无缝换库
    2)拼装结果集返回给app或者web页面
  • as层主要职责
    1)高阶排序,根据:商品之属性,卖家的性能,历史单机展示等展开汇总打分rerank
    2)依赖特征等消息囤积在redis
    3)同义词扩展支持,rewrite query
  • bs层主要职责
    1)基于jieba定制中文分词analyzer
    2)制定index中的mapping内容,以赢得重新好之检索结果
    3)换库脚本, 建库数据来mysql集群(爬虫相关题材要更关注)
  • 搜索干预mis
    1)指定query的前N个结果。
    2)指定query下非起一些结果。

  • 同样不成找的流程,如下图所示

检索处理流程图

DAGScheduler的runJob函数

DAGScheduler.runjob最后将结果经resultHandler保存返回。
此地DAGScheduler的runJob函数调用DAGScheduler的submitJob函数来交付任务:

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
    waiter.awaitResult() match {
      case JobSucceeded => {
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      }
      case JobFailed(exception: Exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        throw exception
    }
  }

四. 更多

  1. 按照如齐之架构可以搭建出来基础的对工作的劳动,但是后面优化的长空还百般底宏伟
    1)用户意图的掌握,对应的技能领域是:query重写,(在百度叫:DA,DA-SE输入纠错模块)
    2)as层的机上方式的引入,通过大量统计数据提升点击效果

  2. 经过上和动用elasticsearch感受及的亮点
    1)分布式索引的规划:M*N,
    M可以清楚吧数量的直切分,如:VIP和SE裤,N可以解为为支持更快和多之qps进行的品位切分。es支持down机器重开自动进入集群等灵活的效能
    2)接口的灵活性,如是否动分词分析,对某复合query检索词的片段进行调权/降权

作业提交的澳门美高梅手机网站调度

当Spark源码1.4.0被,DAGScheduler的submitJob函数不再采取DAGEventProcessActor进行事件处理和信通信,而是利用DAGSchedulerEventProcessLoop类实例eventProcessLoop进行JobSubmitted事件之post动作。
下面是submitJob函数代码:

  /**
   * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
   * can be used to block until the the job finishes executing or can be used to cancel the job.
   */
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
    waiter
  }

当eventProcessLoop对象投递了JobSubmitted事件随后,对象内的eventThread线程实例对事件展开处理,不断从事件队列中取出事件,调用onReceive函数处理事件,当匹配到JobSubmitted事件后,调用DAGScheduler的handleJobSubmitted函数并传播jobid、rdd等参数来拍卖Job。

澳门美高梅手机网站 1

handleJobSubmitted函数

Job处理过程中handleJobSubmitted比较重要,该函数要担负RDD的赖分析,生成finalStage,并基于finalStage来产生ActiveJob。
以handleJobSubmitted函数源码中,给出了一部分注释:

  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      allowLocal: Boolean,
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: Stage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
    } catch {
      //错误处理,告诉监听器作业失败,返回....
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    if (finalStage != null) {
      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
      clearCacheLocs()
      logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
        job.jobId, callSite.shortForm, partitions.length, allowLocal))
      logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
      logInfo("Parents of final stage: " + finalStage.parents)
      logInfo("Missing parents: " + getMissingParentStages(finalStage))
      val shouldRunLocally =
        localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
      val jobSubmissionTime = clock.getTimeMillis()
      if (shouldRunLocally) {
        // 很短、没有父stage的本地操作,比如 first() or take() 的操作本地执行
        // Compute very short actions like first() or take() with no parent stages locally.
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
        runLocally(job)
      } else {
        // collect等操作走的是这个过程,更新相关的关系映射,用监听器监听,然后提交作业
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.resultOfJob = Some(job)
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        // 提交stage
        submitStage(finalStage)
      }
    }
    // 提交stage
    submitWaitingStages()
  }

小结

拖欠篇文章介绍了DAGScheduler从SparkContext中进行实例化,到执行Action操作时提交任务调用runJob函数,进而介绍了付出任务之音调度,和拍卖Job函数handleJobSubmitted函数。
鉴于在handleJobSubmitted函数中干到因分析与stage的源码内容,于是我计划于生一样首文章里展开介绍及源码分析。

转载请注明作者Jason Ding及其出处
GitCafe博客主页(http://jasonding1354.gitcafe.io/)
Github博客主页(http://jasonding1354.github.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest\_articles)
Google搜索jasonding1354跻身自家的博客主页

发表评论

电子邮件地址不会被公开。 必填项已用*标注