【斯Parker】RDD操作详解4——Action算子

四. 更多

  1. 依照如上的架构可以搭建出来基础的指向工作的劳务,不过前边优化的长空还丰盛的伟大
    1)用户意图的通晓,对应的技能世界是:query重写,(在百度叫:DA,DA-SE输入纠错模块)
    2)as层的机械学习格局的引入,通过大气计算数据升高点击效果

  2. 通过学习和应用elasticsearch感受到的长处
    1)分布式索引的宏图:M*N,
    M可以知道为数量的垂直切分,如:VIP和SE裤,N能够领略为为了援救更快跟多的qps举行的品位切分。es协助down机重视启自动进入集群等灵活的效能
    2)接口的油滑,如是还是不是选拔分词分析,对某个复合query检索词的一些开展调权/降权

(1)collect

collect相当于toArray,toArray已经过时不引进应用,collect将分布式的RDD重回为一个单机的scala
Array数组。 在那一个数组上应用scala的函数式操作。

图片 1

图中,左边方框代表RDD分区,左侧方框代表单机内存中的数组。通过函数操作,将结果再次来到到Driver程序所在的节点,以数组方式储存。

源码:

  /**
   * Return an array that contains all of the elements in this RDD.
   */
  def collect(): Array[T] = {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

一. 应用场景

  1. 当垂直类网络服务提供的消息分明增多的时候,用户怎么着飞快获取音讯就会化为瓶颈
  2. 卓越的采取场景包含:1)商品搜索;2)房源搜索;等等
  3. 正文琢磨哪些依据开源的工具搭建一个基础的搜寻引擎,满意如下须求
    1)可以基于商品的叙述召回结果,如:搜索“澳大利亚 皮鞋”
    重回相关商品,注意:这一步仅考虑文本相关性
    2)接济普通话分词,并扶助自定义品牌词和类目词以提高查找的准确率
    3)千万量级索引

实为上在Actions算子中经过斯ParkerContext执行提交作业的runJob操作,触发了RDD
DAG的履行。

基于Action算子的输出空间将Action算子举办分拣:无输出、 HDFS、
Scala集合和数据类型。

二. 选型

  1. 开源索引系统服务 solr vs elasticsearch
    solr和elasticsearch都以基于lucene(java)的卷入,都能提供相比较通用完整的检索服务。目前elasticsearch由于对日记处理分析,更温馨的先河配置和更好的分布式话计划方案而被越来越多的人承受使用。
  1. 普通话分词
    普通话分词器相比知名的有ik和jieba。二者都自带搜狗的分词dict,并且支持自定义新词。jieba的翻新周期以及github的star数越来越多,适配elasticsearch也比较好,所以本文基于该汉语分词组件进行优化。
    jieba分词
    jieba for
    elasticsearch

    越多:常用的开源中文分词工具

(1)saveAsTextFile

函数将数据输出,存储到HDFS的指定目录。将RDD中的逐个成分映射转变为(Null,x.toString),然后再将其写入HDFS。

图片 2

图中,左边的四方代表RDD分区,左边方框代表HDFS的Block。
通过函数将RDD的各类分区存储为HDFS中的一个Block。

源码:

  /**
   * Save this RDD as a text file, using string representations of elements.
   */
  def saveAsTextFile(path: String) {
    // https://issues.apache.org/jira/browse/SPARK-2075
    //
    // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
    // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
    // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
    // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
    // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
    //
    // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
    // same bytecodes for `saveAsTextFile`.
    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
    val textClassTag = implicitly[ClassTag[Text]]
    val r = this.mapPartitions { iter =>
      val text = new Text()
      iter.map { x =>
        text.set(x.toString)
        (NullWritable.get(), text)
      }
    }
    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
  }

  /**
   * Save this RDD as a compressed text file, using string representations of elements.
   */
  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
    // https://issues.apache.org/jira/browse/SPARK-2075
    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
    val textClassTag = implicitly[ClassTag[Text]]
    val r = this.mapPartitions { iter =>
      val text = new Text()
      iter.map { x =>
        text.set(x.toString)
        (NullWritable.get(), text)
      }
    }
    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
  }

三. 架构设计

  1. 系统架构如下图所示
搜索系统架构图



其中,
  • php层首要任务
    1)向下负载均衡,失利重试,匡助无缝换库
    2)拼装结果集重回给app可能web页面
  • as层紧要职务
    1)高阶排序,依照:商品的属性,卖家的性能,历史单机突显等举行综合打分rerank
    2)依Wright征等消息囤积在redis
    3)同义词伸张辅助,rewrite query
  • bs层主要任务
    1)基于jieba定制普通话分词analyzer
    2)制定index中的mapping内容,以博得更好的搜寻结果
    3)换库脚本, 建库数据出自mysql集群(爬虫相关题材必要再度关心)
  • 查找干预mis
    1)指定query的前N个结果。
    2)指定query下不出某些结果。
  1. 四次搜索的流水线,如下图所示
检索处理流程图

(5)count

count重返整个RDD的因素个数。

图片 3

图中,重临数据的个数为5。一个四方代表一个RDD分区。

源码:

  /**
   * Return the number of elements in the RDD.
   */
  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

Scala集合和数据类型

(3)reduceByKeyLocally

落成的是先reduce再collectAsMap的功用,先对RDD的全部举办reduce操作,然后再收集所有结果重临为一个HashMap。

源码:

  /**
   * Merge the values for each key using an associative reduce function, but return the results
   * immediately to the master as a Map. This will also perform the merging locally on each mapper
   * before sending results to a reducer, similarly to a "combiner" in MapReduce.
   */
  def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {

    if (keyClass.isArray) {
      throw new SparkException("reduceByKeyLocally() does not support array keys")
    }

    val reducePartition = (iter: Iterator[(K, V)]) => {
      val map = new JHashMap[K, V]
      iter.foreach { pair =>
        val old = map.get(pair._1)
        map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
      }
      Iterator(map)
    } : Iterator[JHashMap[K, V]]

    val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
      m2.foreach { pair =>
        val old = m1.get(pair._1)
        m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
      }
      m1
    } : JHashMap[K, V]

    self.mapPartitions(reducePartition).reduce(mergeMaps)
  }

HDFS

(6)top

top可回到最大的k个成分。
看似函数表明:

  • top重临最大的k个元素。
  • take重返最小的k个成分。
  • takeOrdered重回最小的k个成分, 并且在回去的数组中保持成分的顺序。
  • first约等于top( 1) 重临整个RDD中的前k个成分,
    可以定义排序的法子Ordering[T]。再次回到的是一个含前k个成分的数组。

源码:

  /**
   * Returns the top k (largest) elements from this RDD as defined by the specified
   * implicit Ordering[T]. This does the opposite of [[takeOrdered]]. For example:
   * {{{
   *   sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1)
   *   // returns Array(12)
   *
   *   sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2)
   *   // returns Array(6, 5)
   * }}}
   *
   * @param num k, the number of top elements to return
   * @param ord the implicit ordering for T
   * @return an array of top elements
   */
  def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)

(7)reduce

reduce函数也等于对RDD中的成分举行reduceLeft函数的操作。
reduceLeft先对三个要素<K,V>举办reduce函数操作,然后将结果和迭代器取出的下一个因素<k,V>进行reduce函数操作,直到迭代器遍历完所有因素,得到最终结果。
在RDD中,先对各种分区中的所有因素<K,V>的聚众分别开展reduceLeft。各个分区形成的结果一定于一个成分<K,V>,再对这一个结果集合举办reduceleft操作。

图片 4

图中,方框代表一个RDD分区,通过用户自定函数f将数据开展reduce运算。示例最终的归来结果为V1@V2U!@U2@U3@U4,12。

源码:

  /**
   * Reduces the elements of this RDD using the specified commutative and
   * associative binary operator.
   */
  def reduce(f: (T, T) => T): T = {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }

(2)collectAsMap

collectAsMap对(K,V)型的RDD数据再次来到一个单机HashMap。对于重复K的RDD成分,前边的成分覆盖前边的成分。

图片 5

图中,右边方框代表RDD分区,右边方框代表单机数组。数据通过collectAsMap函数重临给Driver程序计算结果,结果以HashMap形式存储。

源码:

  /**
   * Return the key-value pairs in this RDD to the master as a Map.
   *
   * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
   *          one value per key is preserved in the map returned)
   */
  def collectAsMap(): Map[K, V] = {
    val data = self.collect()
    val map = new mutable.HashMap[K, V]
    map.sizeHint(data.length)
    data.foreach { pair => map.put(pair._1, pair._2) }
    map
  }

(2)saveAsObjectFile

saveAsObjectFile将分区中的每10个因素构成一个Array,然后将以此Array连串化,映射为(Null,BytesWritable(Y))的要素,写入HDFS为SequenceFile的格式。

图片 6

图中,右侧方框代表RDD分区,左边方框代表HDFS的Block。
通过函数将RDD的各样分区存储为HDFS上的一个Block。

源码:

  /**
   * Save this RDD as a SequenceFile of serialized objects.
   */
  def saveAsObjectFile(path: String) {
    this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
      .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
      .saveAsSequenceFile(path)
  }

(4)lookup

Lookup函数对(Key,Value)型的RDD操作,再次回到指定Key对应的要素形成的Seq。那个函数处理优化的一部分在于,固然这些RDD包含分区器,则只会对应处理K所在的分区,然后回来由(K,V)形成的Seq。假如RDD不包涵分区器,则需求对全RDD元素进行强力扫描处理,搜索指定K对应的因素。

图片 7

图中,左边方框代表RDD分区,左侧方框代表Seq,最终结果再次回到到Driver所在节点的选用中。

源码:

  /**
   * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
   * RDD has a known partitioner by only searching the partition that the key maps to.
   */
  def lookup(key: K): Seq[V] = {
    self.partitioner match {
      case Some(p) =>
        val index = p.getPartition(key)
        val process = (it: Iterator[(K, V)]) => {
          val buf = new ArrayBuffer[V]
          for (pair <- it if pair._1 == key) {
            buf += pair._2
          }
          buf
        } : Seq[V]
        val res = self.context.runJob(self, process, Array(index), false)
        res(0)
      case None =>
        self.filter(_._1 == key).map(_._2).collect()
    }
  }

(8)fold

fold和reduce的规律相同,可是与reduce不相同,也等于各个reduce时,迭代器取的率先个要素是zeroValue。

图片 8

图中,通过用户自定义函数进行fold运算,图中的一个四方代表一个RDD分区。

源码:

  /**
   * Aggregate the elements of each partition, and then the results for all the partitions, using a
   * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
   * modify t1 and return it as its result value to avoid object allocation; however, it should not
   * modify t2.
   */
  def fold(zeroValue: T)(op: (T, T) => T): T = {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    val cleanOp = sc.clean(op)
    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
    sc.runJob(this, foldPartition, mergeResult)
    jobResult
  }

foreach

对RDD中的各种成分都选拔f函数操作,不回去RDD和Array,而是回到Uint。

图片 9

图中,foreach算子通过用户自定义函数对各个数据项举办操作。
本例中自定义函数为println,控制台打印所有数据项。

源码:

  /**
   * Applies a function f to all elements of this RDD.
   */
  def foreach(f: T => Unit) {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

(9)aggregate

aggregate先对种种分区的拥有因素举办aggregate操作,再对分区的结果开展fold操作。
aggreagate与fold和reduce的不一样之处在于,aggregate相当于接纳统一的措施举行数量聚集,那种聚集是并行化的。
而在fold和reduce函数的运算进程中,每一个分区中须要展开串行处理,各个分区串行总结完毕果,结果再按事先的法子开展联谊,并赶回最后聚集结果。

图片 10

图中,通过用户自定义函数对RDD
进行aggregate的联谊操作,图中的逐个方框代表一个RDD分区。

源码:

  /**
   * Aggregate the elements of each partition, and then the results for all the partitions, using
   * given combine functions and a neutral "zero value". This function can return a different result
   * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
   * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
   * allowed to modify and return their first argument instead of creating a new U to avoid memory
   * allocation.
   */
  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
    sc.runJob(this, aggregatePartition, mergeResult)
    jobResult
  }

转发请表明笔者杰森 Ding及其出处
GitCafe博客主页(http://jasonding1354.gitcafe.io/)
Github博客主页(http://jasonding1354.github.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest\_articles)
谷歌(Google)搜索jasonding1354进来小编的博客主页

无输出

发表评论

电子邮件地址不会被公开。 必填项已用*标注