【斯Parker】RDD操作详解2——值型Transformation算子

斯Parker算子的效应

下图描述了斯Parker在运作转换中经过算子对RDD进行转移。
算子是RDD中定义的函数,可以对RDD中的数据开展转换和操作。

澳门美高梅手机网站 1

  1. 输入:在斯Parker程序运行中,数据从外表数据空间(如分布式存储:textFile读取HDFS等,parallelize方法输入Scala集合或数额)输入斯Parker,数据进入斯Parker运行时数据空间,转化为Spark中的数据块,通过BlockManager举办管理。
  2. 运行:在Spark数据输入形成RDD后便足以透过更换算子,如filter等,对数据开展操作并将RDD转化为新的RDD,通过Action算子,触发斯Parker提交作业。
    假设数据须要复用,能够经过Cache算子,将数据缓存到内存。
  3. 输出:程序运行为止数据会输出斯Parker运行时空中,存储到分布式存储中(如saveAsTextFile输出到HDFS),或Scala数据或集合中(collect输出到Scala集合,count重回Scala
    int型数据)。

斯Parker的着力数据模型是RDD,但RDD是个抽象类,具体由各子类达成,如MappedRDD、ShuffledRDD等子类。斯Parker将常用的大数据操作都转载成为RDD的子类。

澳门美高梅手机网站 2

拍卖数据类型为Value型的Transformation算子可以根据RDD变换算子的输入分区与输出分区关系分成以下几连串型:

Transformation和Actions操作概况

1)输入分区与出口分区一对一型
2)输入分区与出口分区多对一型
3)输入分区与出口分区多对多型
4)输出分区为输入分区子集型
5)还有一种特其余输入与出口分区一对一的算子类型:Cache型。
Cache算子对RDD分区进行缓存

Transformation具体内容

  • map(func)
    :再次回到一个新的分布式数据集,由种种原成分经过func函数转换后结成
  • filter(func) :
    重回一个新的数据集,由经过func函数后重临值为true的原成分构成
    *flatMap(func) :
    类似于map,不过各个输入成分,会被映射为0到三个出口成分(由此,func函数的再次来到值是一个Seq,而不是单一元素)
  • flatMap(func) :
    类似于map,不过逐个输入成分,会被映射为0到多少个出口成分(因而,func函数的再次回到值是一个Seq,而不是单一成分)
  • sample(withReplacement, frac, seed) :
    据悉给定的私行种子seed,随机取样出多少为frac的数据
  • union(otherDataset) : 重临一个新的数据集,由原数据集和参数联合而成
  • groupByKey([numTasks]) :
    在一个由(K,V)对构成的数据集上调用,重返一个(K,Seq[V])对的数额集。注意:暗许意况下,使用8个并行职责进行分组,你可以流传numTask可选参数,依照数据量设置不相同数额的Task
  • reduceByKey(func, [numTasks]) :
    在一个(K,V)对的数额集上使用,重返一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,职责的个数是足以经过第四个可选参数来配置的。
  • join(otherDataset, [numTasks]) :
    在品种为(K,V)和(K,W)类型的多寡集上调用,重返一个(K,(V,W))对,每一个key中的所有因素都在一块的数据集
  • groupWith(otherDataset, [numTasks]) :
    在类型为(K,V)和(K,W)类型的多少集上调用,再次来到一个数据集,组成要素为(K,
    Seq[V], Seq[W]) Tuples。那些操作在其他框架,称为CoGroup
  • cartesian(otherDataset) :
    笛卡尔积。但在数码集T和U上调用时,再次回到一个(T,U)对的数据集,所有因素交互进行笛Carl积。
  • flatMap(func) :
    类似于map,可是各种输入成分,会被映射为0到多少个出口成分(因而,func函数的再次来到值是一个Seq,而不是单一成分)

输入分区与出口分区一对一型

Actions具体内容

  • reduce(func) :
    通过函数func聚集数据集中的保有因素。Func函数接受2个参数,重临一个值。这么些函数必须是关联性的,确保能够被科学的出现执行
  • collect() :
    在Driver的程序中,以数组的样式,重返数据集的有着因素。那经常会在动用filter或许此外操作后,重回一个十足小的数额子集再选用,直接将所有RDD集Collect重返,很或然会让Driver程序OOM
  • count() : 再次回到数据集的要素个数
  • take(n) :
    重返一个数组,由数据集的前n个成分构成。注意,这么些操作近来不用在三个节点上,并行执行,而是Driver程序所在机器,单机总计有所的成分(Gateway的内存压力会附加,须求谨慎使用)
  • first() : 重返数据集的率先个要素(类似于take(1))
  • saveAsTextFile(path) :
    将数据集的成分,以textfile的款型,保存到地头文件系统,hdfs或然其余其余hadoop协理的文件系统。斯Parker将会调用各种元素的toString方法,并将它转换为文件中的一行文本
  • saveAsSequenceFile(path) :
    将数据集的成分,以sequencefile的格式,保存到指定的目录下,本地系统,hdfs或许其余其余hadoop帮助的文件系统。RDD的成分必须由key-value对组合,并都落到实处了Hadoop的Writable接口,或隐式可以变换为Writable(斯Parker包含了基本类型的转移,例如Int,Double,String等等)
  • foreach(func) :
    在数据集的每个要素上,运行函数func。那平时用于创新一个累加器变量,大概和表面存储系统做交互

(1)map

将原本RDD的各种数据项通过map中的用户自定义函数f映射转变为一个新的因素。源码中的map算子约等于初始化一个RDD,新RDD叫作MapPartitionsRDD(this,sc.clean(f))。

澳门美高梅手机网站 3

图中,每种方框表示一个RDD分区,左边的分区经过用户自定义函数f:T->U映射为右边的新的RDD分区。不过其实唯有等到Action算子触发后,这几个f函数才会和其他函数在一个Stage中对数码举办演算。
V1输入f转换输出V’ 1。

源码:

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

澳门美高梅手机网站,算子分类

大概可以分为三大类算子:

  1. Value数据类型的Transformation算子,那种转移并不接触提交作业,针对处理的数码项是Value型的数码。
  2. Key-Value数据类型的Transfromation算子,那种转移并不接触提交作业,针对处理的数量项是Key-Value型的数量对。
  3. Action算子,那类算子会触发斯ParkerContext提交Job作业。

转载请评释作者杰森 Ding及其出处
GitCafe博客主页(http://jasonding1354.gitcafe.io/)
Github博客主页(http://jasonding1354.github.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest\_articles)
谷歌搜索jasonding1354进来自家的博客主页

(2)flatMap

将本来RDD中的各个成分通过函数f转换为新的成分,并将转移的RDD的各样集合中的元素合并为一个聚众。
内部开创FlatMappedRDD(this,sc.clean(f))。

澳门美高梅手机网站 4

图中,小方框表示RDD的一个分区,对分区进行flatMap函数操作,flatMap中传入的函数为f:T->U,T和U可以是随机的数据类型。将分区中的数据通过用户自定义函数f转换为新的数目。外部大方框能够认为是一个RDD分区,小方框代表一个见面。
V1、 V2、 V3在一个见面作为RDD的一个数额项,转换为V’ 1、 V’ 2、 V’
3后,将整合拆散,形成为RDD中的数据项。

源码:

  /**
   *  Return a new RDD by first applying a function to all elements of this
   *  RDD, and then flattening the results.
   */
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }

(3)mapPartitions

mapPartitions函数获取到各种分区的迭代器,在函数中经过那几个分区全部的迭代器对任何分区的因素举行操作。
内部贯彻是生成MapPartitionsRDD。

澳门美高梅手机网站 5

图中,用户通过函数f(iter) =>
iter.filter(_>=3)对分区中的所有数据开展过滤,>=3的数目保存。一个四方代表一个RDD分区,含有1、
2、 3的分区过滤只剩余成分3。

源码:

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   *
   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
   */
  def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
    val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
    new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
  }S

(4)glom

glom函数将种种分区形成一个数组,内部贯彻是回来的RDD[Array[T]]。

澳门美高梅手机网站 6

图中,方框代表一个分区。 该图表示含有V1、 V2、
V3的分区通过函数glom形成一个数组Array[(V1),(V2),(V3)]。

源码:

  /**S
   * Return an RDD created by coalescing all elements within each partition into an array.
   */
  def glom(): RDD[Array[T]] = {
    new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
  }

输入分区与出口分区多对一型

(1)union

运用union函数时索要确保七个RDD成分的数据类型相同,再次回到的RDD数据类型和被统一的RDD成分数据类型相同,并不开展去重操作,保存所有因素。如若想去重,可以使用distinct()。++符号也就是uion函数操作。

澳门美高梅手机网站 7

图中,左边的大方框代表三个RDD,大方框内的小方框代表RDD的分区。右边大方框代表联合后的RDD,大方框内的小方框代表分区。含有V1,V2…U4的RDD和带有V1,V8…U8的RDD合并所有因素形成一个RDD。V1、V1、V2、V8形成一个分区,其余因素同理进行合并。

源码:

  /**
   * Return the union of this RDD and another one. Any identical elements will appear multiple
   * times (use `.distinct()` to eliminate them).
   */
  def union(other: RDD[T]): RDD[T] = {
    if (partitioner.isDefined && other.partitioner == partitioner) {
      new PartitionerAwareUnionRDD(sc, Array(this, other))
    } else {
      new UnionRDD(sc, Array(this, other))
    }
  }

(2)certesian

对五个RDD内的享有因素举行笛Carl积操作。操作后,内部贯彻重临CartesianRDD。

澳门美高梅手机网站 8

左边的大方框代表多少个RDD,大方框内的小方框代表RDD的分区。右边大方框代表统一后的RDD,大方框内的小方框代表分区。
大方框代表RDD,大方框中的小方框代表RDD分区。 例如,V1和另一个RDD中的W1、
W2、 Q5进展笛Carl积运算形成(V1,W1)、(V1,W2)、(V1,Q5)。

源码:

  /**
   * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
   * elements (a, b) where a is in `this` and b is in `other`.
   */
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)

输入分区与出口分区多对多型

groupBy

将成分通过函数生成对应的Key,数据就转账为Key-Value格式,之后将Key相同的要素分为一组。

val cleanF = sc.clean(f)中sc.clean函数将用户函数预处理;
this.map(t => (cleanF(t), t)).groupByKey(p)对数码map进行函数操作,再对groupByKey举办分组操作。其中,p中确定了分区个数和分区函数,也就控制了并行化的水准。

澳门美高梅手机网站 9

图中,方框代表一个RDD分区,相同key的成分合并到一个组。
例如,V1,V2合并为一个Key-Value对,其中key为“ V” ,Value为“ V1,V2”
,形成V,Seq(V1,V2)。

源码:

  /**
   * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
   * mapping to that key. The ordering of elements within each group is not guaranteed, and
   * may even differ each time the resulting RDD is evaluated.
   *
   * Note: This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
   * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
   */
  def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
    groupBy[K](f, defaultPartitioner(this))

  /**
   * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
   * mapping to that key. The ordering of elements within each group is not guaranteed, and
   * may even differ each time the resulting RDD is evaluated.
   *
   * Note: This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
   * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
   */
  def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
    groupBy(f, new HashPartitioner(numPartitions))

  /**
   * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
   * mapping to that key. The ordering of elements within each group is not guaranteed, and
   * may even differ each time the resulting RDD is evaluated.
   *
   * Note: This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
   * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
   */
  def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
      : RDD[(K, Iterable[T])] = {
    val cleanF = sc.clean(f)
    this.map(t => (cleanF(t), t)).groupByKey(p)
  }

输出分区为输入分区子集型

(1)filter

filter的职能是对成分举行过滤,对逐个成分运用f函数,重回值为true的要素在RDD中保存,重返为false的将过滤掉。
内部落到实处相当于生成FilteredRDD(this,sc.clean(f))。

澳门美高梅手机网站 10

图中,各种方框代表一个RDD分区。
T可以是自由的品类。通过用户自定义的过滤函数f,对种种数据项举办操作,将满意条件,再次来到结果为true的多少项保留。
例如,过滤掉V2、 V3保存了V1,将不一样命名为V1’。

源码:

  /**
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: T => Boolean): RDD[T] = {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }

(2)distinct

distinct将RDD中的成分举行去重操作。

澳门美高梅手机网站 11

图中,逐个方框代表一个分区,通过distinct函数,将数据去重。
例如,重复数据V1、 V1去重后只保留一份V1。

源码:

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(): RDD[T] = distinct(partitions.size)

(3)subtract

subtract相当于举办联谊的差操作,RDD 1去除RDD 1和RDD 2交集中的装有因素。

澳门美高梅手机网站 12

图中,右边的大方框代表七个RDD,大方框内的小方框代表RDD的分区。右边大方框代表统一后的RDD,大方框内的小方框代表分区。V1在七个RDD中均有,依据差集运算规则,新RDD不保留,V2在首先个RDD有,第三个RDD没有,则在新RDD成分中包蕴V2。

源码:

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   *
   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
   * RDD will be <= us.
   */
  def subtract(other: RDD[T]): RDD[T] =
    subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: RDD[T], numPartitions: Int): RDD[T] =
    subtract(other, new HashPartitioner(numPartitions))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = {
    if (partitioner == Some(p)) {
      // Our partitioner knows how to handle T (which, since we have a partitioner, is
      // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
      val p2 = new Partitioner() {
        override def numPartitions = p.numPartitions
        override def getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)
      }
      // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
      // anyway, and when calling .keys, will not have a partitioner set, even though
      // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
      // partitioned by the right/real keys (e.g. p).
      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
    } else {
      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
    }
  }

(4)sample

sample将RDD这一个集合内的要素举行采样,获取具有因素的子集。用户能够设定是或不是有放回的取样、百分比、随机种子,进而决定采样格局。
参数表达:

withReplacement=true, 表示有放回的取样;
withReplacement=false, 表示无放回的取样。

澳门美高梅手机网站 13

各种方框是一个RDD分区。通过sample函数,采样50%的多寡。V1、V2、U1、U2、U3、U4采样出多少V1和U1、U2,形成新的RDD。

源码:

  /**
   * Return a sampled subset of this RDD.
   */
  def sample(withReplacement: Boolean,
      fraction: Double,
      seed: Long = Utils.random.nextLong): RDD[T] = {
    require(fraction >= 0.0, "Negative fraction value: " + fraction)
    if (withReplacement) {
      new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
    } else {
      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
    }
  }

(5)takeSample

take萨姆ple()函数和上边的sample函数是一个法则,但是不行使相相比较例采样,而是按设定的采样个数进行采样,同时再次来到结果不再是RDD,而是一定于对采样后的数目开展collect(),再次来到结果的聚合为单机的数组。

澳门美高梅手机网站 14

图中,左边的四方代表分布式的相继节点上的分区,左边方框代表单机上回来的结果数组。通过takeSample对数据采样,设置为采样一份数据,重临结果为V1。

源码:

  /**
   * Return a fixed-size sampled subset of this RDD in an array
   *
   * @param withReplacement whether sampling is done with replacement
   * @param num size of the returned sample
   * @param seed seed for the random number generator
   * @return sample of specified size in an array
   */
  def takeSample(withReplacement: Boolean,
      num: Int,
      seed: Long = Utils.random.nextLong): Array[T] = {
    val numStDev =  10.0

    if (num < 0) {
      throw new IllegalArgumentException("Negative number of elements requested")
    } else if (num == 0) {
      return new Array[T](0)
    }

    val initialCount = this.count()
    if (initialCount == 0) {
      return new Array[T](0)
    }

    val maxSampleSize = Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt
    if (num > maxSampleSize) {
      throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - " +
        s"$numStDev * math.sqrt(Int.MaxValue)")
    }

    val rand = new Random(seed)
    if (!withReplacement && num >= initialCount) {
      return Utils.randomizeInPlace(this.collect(), rand)
    }

    val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
      withReplacement)

    var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()

    // If the first sample didn't turn out large enough, keep trying to take samples;
    // this shouldn't happen often because we use a big multiplier for the initial size
    var numIters = 0
    while (samples.length < num) {
      logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
      samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
      numIters += 1
    }

    Utils.randomizeInPlace(samples, rand).take(num)
  }

Cache型

(1)cache

cache将RDD元素从磁盘缓存到内存,也就是persist(MEMORY_ONLY)函数的功力。

澳门美高梅手机网站 15

图中,每种方框代表一个RDD分区,右边也就是数据分区都存储在磁盘,通过cache算子将数据缓存在内存。

源码:

  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def cache(): this.type = persist()

(2)persist

persist函数对RDD进行缓存操作。数据缓存在何地由StorageLevel枚举类型确定。
有二种档次的结缘,DISK代表磁盘,MEMORY代表内存,SER代表数据是还是不是开展体系化存储。StorageLevel是枚举类型,代表存储情势,如,MEMORY_AND_DISK_SER代表数据可以储存在内存和磁盘,并且以系列化的法子存储。
其余同理。

澳门美高梅手机网站 16

澳门美高梅手机网站 17

图中,方框代表RDD分区。 disk代表存储在磁盘,mem代表存储在内存。
数据最初全体储存在磁盘,通过persist(MEMORY_AND_DISK)将数据缓存到内存,但是有些分区不只怕兼容在内存,例如:图3-18团雅士利含V1,V2,V3的RDD存储到磁盘,将含有U1,U2的RDD依旧存储在内存。

源码:

  /**
   * Set this RDD's storage level to persist its values across operations after the first time
   * it is computed. This can only be used to assign a new storage level if the RDD does not
   * have a storage level set yet..
   */
  def persist(newLevel: StorageLevel): this.type = {
    // TODO: Handle changes of StorageLevel
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
    }
    sc.persistRDD(this)
    // Register the RDD with the ContextCleaner for automatic GC-based cleanup
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    storageLevel = newLevel
    this
  }

  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

转发请注脚我杰森 Ding及其出处
GitCafe博客主页(http://jasonding1354.gitcafe.io/)
Github博客主页(http://jasonding1354.github.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest\_articles)
谷歌搜索jasonding1354进来本人的博客主页

发表评论

电子邮件地址不会被公开。 必填项已用*标注