博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark
阅读量:4207 次
发布时间:2019-05-26

本文共 12182 字,大约阅读时间需要 40 分钟。

Spark

  Spark实现了Standalone作为其内置的资源管理和调度框架,因此不依赖第三方的资源管理和调度器。该模式下由于Master节点存在单点故障,要解决此问题需要借助Zookeeper并且启动至少两个Master节点来实现高可靠。

Spark与Hadoop

Spark比MapReduce快的原因

  1. Spark是基于内存进行数据处理的,MapReduce是基于磁盘进行数据处理的
      MapReduce的中间结果落到磁盘,涉及多次磁盘IO。而Spark读取数据后,在内存中存储和运算,避免了大量的IO
  2. Spark中具有DAG有向无环图
      Spark 计算比 MapReduce 快的根本原因在于 DAG 计算模型。DAG 相比MapReduce 在大多数情况下可以减少 shuffle 次数。如果计算不涉及与其他节点进行数据交换,Spark 可以在内存中一次性完成这些操作,即中间结果无须落盘,减少了磁盘 IO 的操作
  3. RDD
      Spark 是通过RDD算子来运算的,它拥有两种操作,transformation和action,支持懒加载。RDD还拥有容错机制血缘关系Linage,当数据丢失时可以恢复数据。Spark速度非常快的原因之一是Spark 支持将需要反复用到的数据给 Cache 到内存中,减少数据加载耗时,当持久化某个RDD后,可以在其他计算中重用
  4. 资源申请粒度不同
      Spark是粗粒度资源申请,即当提交Application时,Application会将所有的资源申请完毕,如果申请不到资源就等待,如果申请到资源才执行Application,task在执行的时候就不需要自己去申请资源,task执行快,当最后一个task执行完之后task才会被释放
      MapReduce是细粒度资源申请,当提交Application的时候,task执行时,自己申请资源,自己释放资源,task执行完毕之后,资源立即会被释放,task执行的慢,Application执行的相对比较慢

懒加载有什么好处

  RDD使用延迟加载,即懒加载,只有当执行操作,例如写入存储、collect 等操作时才加载数据。好处:
  ① 提高可管理性。可以查看整个 DAG,并且可以使用该信息进行流水线优化。比如:两个连续的map操作,在源码里面是在记录级别连续执行的,而不是做完一个RDD再去map到下一个RDD(流水线优化)
  ② 降低时间复杂度和加快计算速度。只运算真正要计算的转换操作,并且可以根据 DAG 图,合并不需要与 drive 通信的操作(连续的依赖转换),例如在一个 RDD 上同时调用 map 和 filter 转换操作,Spark可以将 map 和 filter 指令发送到每个 executor 上,Spark程序在真正执行 map 和 filter 时,只需访问一次 record,而不是发送两组指令并两次访问分区。理论上相对于非惰性,将时间复杂度降低了一半。例如:

val list1 = list.map(i -> i * 3)  // Transformation1val list2 = list1.map(i -> i + 3) // Transformation1val list3 = list1.map(i -> i / 3) // Transformation1list3.collect()               // ACTION

假设原始列表(list) 很大,其中包含数百万个元素。如果没有懒惰的评估,我们将完成三遍如此庞大的计算。如果我们假设一次这样的列表迭代需要 10 秒,那么整个评估就需要 30 秒。并且每个 RDD 都会缓存下来,浪费内存。使用惰性评估,Spark 可以将这三个转换像这样合并到一个转换中,如下:

val list3 = list.map(i -> i + 1)

它将只执行一次该操作。只需一次迭代即可完成,这意味着只需要 10 秒的时间。

Spark与Hadoop区别

  Hadoop和Spark都是并行计算,两者都是用MapReduce模型进行计算。Hadoop一个作业称为一个Job,Job里面分为Map Task和Reduce Task阶段,每个Task都在自己的进程中运行,当Task结束时,进程也会随之结束。

  Spark用户提交的任务称为application,一个application对应一个SparkContext,application中存在多个job,每触发一次action操作就会产生一个job。每个job中有多个stage,stage是shuffle过程中DAGScheduler通过RDD之间的依赖关系划分的,每个stage里面有多个task,组成taskset,由TaskScheduler分发到各个executor中执行。一个Application -> 多个job ->一个job多个stage -> 一个stage多个task

  1. Spark计算速度比Hadoop快。Hadoop是从HDFS读取数据,通过MR将中间结果写入HDFS;然后再重新从HDFS读取数据进行MR,再刷写到HDFS,这个过程涉及多次落盘操作,多次磁盘IO,效率并不高;而Spark是读取集群中的数据后,在内存中存储和运算,避免了大量的IO
  2. Spark中RDD一般存放在内存中,如果内存不够存放数据,会同时使用磁盘存储数据。Spark通过在内存中缓存处理的数据,提高了处理流式数据和迭代式数据的性能。通过RDD之间的血缘连接、数据存入内存中切断血缘关系等机制,可以实现灾难恢复,当数据丢失时可以恢复数据
  3. JVM 的优化: Hadoop 每次 MapReduce 操作,启动一个 Task 便会启动一次 JVM,基于进程的操作。而 Spark 每次 MapReduce 操作是基于线程的,只在启动 Executor 时启动一次 JVM,内存的 Task 操作是在线程复用的。每次启动 JVM 的时间可能就需要几秒甚至十几秒,那么当 Task 多了,这个时间 Hadoop 不知道比 Spark 慢了多
  4. Spark用scala语言编写,相比java语言编写的Hadoop程序更加简洁
  5. 相比Hadoop中对于数据计算只提供了Map和Reduce两个操作,Spark提供了丰富的算子,可以通过transformation和action算子,实现很多复杂算法操作

Spark RDD

  RDD(Resilient Distributed Dataset)分布式数据集,代表一个不可变、可分区、里面的元素可并行计算的集合。每个RDD的数据都以Block的形式存储于多台机器上,其中每个Executor会启动一个BlockManagerSlave管理一部分Block,而Block的元数据由Driver节点的BlockManagerMaster保存。BlockManagerSlave生成Block后向BlockManagerMaster注册该Block,BlockManagerMaster管理RDD与Block的关系。

在这里插入图片描述
RDD允许用户在执行多个查询时显示地将工作集缓存在内存中,后续的查询能够重用工作集,极大地提升查询速度。

*Internally,each RDD is characterized by five main properties:**A list of partitions *A function for computing each split*A list of dependencies on other RDDs*Optionally,a Partitioner for key-value RDDs(e.g. to say that the RDD is hash-partitioned)*Oprionally,a list of preferred locations to compute each split on(e.g. block location for an HDFS file)
  • 一组Partition。对于RDD来说,每个partition都会被一个executor处理。用户可以在创建RDD时指定RDD的分区个数,如果没有指定,那么就会采用默认值,默认值是程序所分配到的CPU Core的数目。
  • 一个计算每个分区的函数。Spark中RDD的计算是以partition为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会以迭代器进行复合,不需要保存每次计算的结果。
  • RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
  • 一个Partitioner,即RDD的分区函数。当前Spark中实现了两种类型的分区函数:HashPartitioner和RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Partitioner的值是None。
  • 一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能将任务分配到其所要处理数据块的存储位置。

  创建RDD有两种方式。【1】通过HDFS支持的文件系统创建RDD,RDD里面没有真正要计算的数据,只记录了下一个元数据,比如:是哪里的数据、调用什么函数,一旦触发一个Action就会提交一个任务【2】通过Scala集合或数组以并行化的方式创建RDD

sc.textFile("hdfs://root:9000/file")val rdd1=sc.parallelize(Array(1,2,3,4,5,6))//通过并行比如化Scala集合创建RDD

RDD的弹性体现在哪里?

1.自动进行内存和磁盘切换

  Spark 可以使用 persist 和 cache 方法将任意 RDD 缓存到内存或者磁盘文件系统中。数据会优先存储到内存中,当内存不足以存放RDD中的数据的时候,就会持久化到磁盘上

2.基于lineage的高效容错

3.task如果失败会特定次数的重试

4.stage如果失败会自动进行特定次数的重试,而且只会只计算失败的分片

  Stage对象可以记录并跟踪多个StageInfo,缺省的重试次数也是4次。且可以直接运行计算失败的阶段,值计算失败的数据分片。 Stage是Spark Job运行时均有相同逻辑功能和并行计算任务的一个基本单元。其中的所有任务都依赖同样的Shuffle,每个DAG任务都是通过DAGScheduler在Stage的边界处发生Shuffle形成Stage,然后DAGScheduler按照这些拓扑结构执行

5.checkpoint和persist

6.数据调度弹性

  DAGScheduler和TaskScheduler和资源无关。Spark将执行模型抽象为DAG,可以让多个Stage任务串联或者并行执行,而无需将中间结果写入到HDFS中。这样当某个节点故障的时候,可以由其他节点来执行出错的任务

7.数据分片的高度弹性repartion

  Spark进行数据分片的时候,默认将数据存放在内存中,当内存不够的时候,会将一部分存放到磁盘上。如经过分片后,某个Partition非常小,就可以合并多个小的partition来计算,而不用每个partition都起一个线程。这样就提高了效率,也不会因为大量的线程而导致OOM

RDD为何高效?

  1. RDD是不可变的+lazy加载
  2. RDD是粗粒度。对于RDD的写是粗粒度的,每次操作都作用于所以集合;RDD的读操作可以是粗粒度的也可以是细粒度的: 可以读其中的一条记录

Transformation/Action RDD算子

  在Spark里面对RDD操作分为两类,一类叫作Transformation(转换),转换具有延迟执行(软加载)的特点。Transformation会记录元数据信息,当计算任务触发Action时才会真正开始计算。为什么要进行软加载?因为存在很多这种算子,有一个算子就触发计算,需要计算很多次。另一种叫作Action(动作)。

常用的Transformation:

转换 含义
map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T]=>Iterator{…}
mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(int,Iterator[T])=>iterator{…}
sample(withReplacement,Fraction,seed) 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
union(other Dataset) 对源RDD和参数RDD求并集后返回一个新的RDD
intersection(other Dataset) 对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks]) 对源RDD进行去重后后返回一个新的RDD
groupByKey([numTasks]) 在一个(k,v)的RDD上调用,返回一个(k,Iterator[V])的RDD
reduceByKey(func [numTasks]) 在一个(k,v)的RDD上调用,返回一个(k,v)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
sortByKey([ascending],[numTasks]) 在一个(k,v)的RDD上调用,k必须实现Ordered接口,返回一个按照key进行排序的(k,v)的RDD
sortBy(func,[ascending],[numTasks]) 与sortByKey类似
join(otherDataset,[numTasks]) 在类型为(k,v)和(k,w)的RDD上调用,返回一个相同key对应的所有元素在一起的(k,fv,w)的RDD
cogroup(otherDataset,[numTasks]) 在类型为(k,v)和(k,w)的RDD上调用,返回一个(k,iterable,iterable)类型的RDD
aggregateByKey(zeroValue)(seqOp)([numTasks])
coalesce(numPartitions)
repartition(numPartitions)

aggregate(聚合):现对每个partition进行聚合,再对个部分的结果的进行聚合。定义的数组内容在Master上,完成计算是在Worker上。RDD会在Master机器上启动一个Driver进程,专门用来提交程序。

aggregateByKey:对相同key的value进行操作
combineByKey:传进去3个函数,第一个函数相对于key进行分组,分组之后对分组里面的第一个元素进行操作,第二个函数是局部进行操作,第三个是对所有partition进行操作。reduceByKey底层是由combineByKey实现的。
repartition:重新分区,假如数据原先在两个分区,运行这个命令重新分区为3个,数据要通过网络传输到另外的机器上。
Action:

动作 含义
reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement,num,[seed]) 返回一个数组,该数组由从数据集中随机采样的num各元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n,[ordering])
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它转换为文件中的文本

RDD的依赖关系

RDD和它依赖的父RDD的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。窄依赖指每一个父RDD的Partition最多被子RDD的一个Partition使用,宽依赖指多个子RDD的Partition会依赖同一个父RDD的Partition。
在这里插入图片描述
Lineage
  RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

RDD的缓存以及缓存方式

  Spark速度非常快的原因之一是在不同操作中可以在内存中持久化或缓存数据集,当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中并在其他动作中重用,这使得后续的动作变得更加迅速。
  RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中并供后面重用,其中cache调用persist方法。若数据大于可用内存,例如机器只有512M内存,HDFS中数据有1G,调用cache方法,会缓存512M数据,剩余数据还是在HDFS中。

/**Persist this RDD with the default storage level('MEMORY_ONLY')def cache():this.type=persist()/**Persist this RDD with the default storage level('MEMORY_ONLY')def persist():this.type=persist(StorageLevel.MEMORY_ONLY)

RDD缓存可以对应不同的存储级别。

在这里插入图片描述

RDD转换

针对于下过程分析该计算中会产生多少RDD以及RDD之间的依赖关系。

val rdd=sc.textFile("hdfs://root:9000/app")			.flatMap(_.split(" "))			.map((_,1))			.reduceByKey(_+_)			.saveAsTextFile(args(1))//产生6个RDD

  Spark往HDFS中写数据,需要拿到HDFS一个流,然后往HDFS中写数据,如果是一条数据写一次,就会频繁打开、关闭一个流,效率会很低。可以拿到针对一个分区拿到一个流,把一个分区中的数据写到HDFS中。

   【.textFile(“hdfs://root:9000/app”)】 会产生HadoopRDD、MapPartitionsRDD,因为通过Hadoop从HDFS中读数据,数据为K-V类型,Key是LongWritable,Value是caches,因为它用的InputFormat。之后发现偏移量没有用,把每一条数据进行处理(只保留Value),这时候需要经过一次map(RDD的Map),生成MapPartitionsRDD,把每个分区的数据拿出来,调用Scala的map方法 。【.flatMap(.split(" ")) 】和【.map((,1))】产生一个RDD:MapPartitionsRDD。【.reduceByKey(+)】将reduceByKey转换成pariRDDFunction,reduceByKey调用combineByKeyWithClassTag,这个函数要求传入三个参数【1】把每个分区的key拿出来处理【2】局部的分区进行聚合操作【3】全部的数据拿出来聚合操作。reduceByKey会产生一个ShuffledRDD,会产生一个局部聚和。【.saveAsTextFile(args(1))】saveAsTextFile往HDFS中写数据,调用MapPartitionsRDD方法,把一个分区的数据拿出来然后建立一个流写数据。

在这里插入图片描述

CheckPoint

  RDD中有很多分区,这些分区由多个Executor处理,每个Executor只处理一个或两个分区,一个分区肯定在一个Executor上。尽管可以通过cache或者persist将数据放到内存或者磁盘中,但是内存和磁盘都有可能坏掉,当进行下一步计算找不到数据了。由于RDD有容错机制,RDD和RDD会记录之间的依赖关系,即使以前的数据丢失了一部分,可以通过血统恢复,但是数据要经过几百次迭代,代价很大。这里可以把中间结果做一个checkpoint,把中间的结果保存到HDFS。

/***Mark this RDD for checkpointing.It will be saved to a file inside the checkpoint *directory set with 'SparkContextsetCheckpointDir' and all references to its parent*RDDs will be removed.This function must be called before any job has been*executed on this RDD.It is strongly recommended that this RDD is persisted in*memory,otherwise saving it on a file will require recomputation.**/

在CheckPoint之前一定要指定一个存储目录:sc.setcheckpointDir("…")

在这里插入图片描述
并且rdd.checkpoint是一个Transformation,在触发Action时会执行两个任务【1】计算Action任务【2】把数据保存到HDFS。在使用CheckPoint之前最好把数据缓存到内存中,速度会快很多,不用重新启动一个job,而是直接从内存当中把数据写入到HDFS中。之后的RDD在计算Transformation或Action时直接从CheckPoint中读取数据,不再依赖血统关系。

checkpoint写流程

RDD中的数据是什么时候写入的?是在RDD调用checkpoint方法时候吗?

  RDD中checkpoint方法,在该方法中是只是新建了一个ReliableRDDCheckpointData的对象,并没有做实际的写入工作。实际触发写入的时机是在runJob生成该RDD后,调用RDD的doCheckpoint方法来做的。

在做checkpoint的时候,具体写入了哪些数据到HDFS了?

  在经历调用RDD.doCheckpoint → RDDCheckpointData.checkpoint →ReliableRDDCheckpointData.doCheckpoint →ReliableRDDCheckpointData.writeRDDToCheckpointDirectory后,在writeRDDToCheckpointDirectory方法中可以看到:将作为一个单独的任务(RunJob)将RDD中每个parition的数据依次写入到checkpoint目录(writePartitionToCheckpointFile),此外如果该RDD中的partitioner如果不为空,则也会将该对象序列化后存储到checkpoint目录。所以,在做checkpoint的时候,写入的hdfs中的数据主要包括:RDD中每个parition的实际数据,以及可能的partitioner对象(writePartitionerToCheckpointDir)。

在对RDD做完checkpoint以后,对这个RDD的本身又做了哪些收尾工作?

  在写完checkpoint数据到hdfs以后,将会调用rdd的markCheckpoined方法,主要斩断该rdd的对上游的依赖,以及将paritions置空等操作。

实际过程中,使用RDD做checkpoint的时候需要注意什么问题?

  在RDD计算完毕后,会再次通过RunJob将每个partition数据保存到HDFS。这样RDD将会计算两次,所以为了避免此类情况,最好将RDD进行cache。

sc.setCheckpointDir(checkpointDir.toString)val rdd = sc.makeRDD(1 to 20, numSlices = 1)rdd.cache()rdd.checkpoint()

Spark中的cache/persist/checkpoint

cache与persist

  cache 能够让重复数据在同一个 application 中的 jobs 间共享。RDD的cache()方法其实调用的就是persist方法,缓存策略均为MEMORY_ONLY。
  cache与persist的唯一区别在于: cache只有一个默认的缓存级别MEMORY_ONLY ,而persist可以根据StorageLevel设置其它的缓存级别。cache或者persist并不是action。

系统如何对RDD进行cache?

  将要计算 RDD partition 的时候就去判断 partition 要不要被 cache。如果要被 cache 的话,先将 partition 计算出来,然后 cache 到内存。调用 rdd.cache() 后, RDD就变成 persistRDD ,其StorageLevel为MEMORY_ONLY。

cached RDD 怎么被读取?

  当要计算某个RDD中的partition时候,会先去 blockManager里面查找是否已经被cache了。如果partition被cache在本地,就直接使用blockManager.getLocal()去本地 memoryStore里读取。如果该 partition 被其他节点上 blockManager cache 了,会通过blockManager.getRemote() 去其他节点上读取。

cache与checkpoint

  即cache 和 checkpoint 的显著区别是:cache把 RDD 计算出来然后放在内存中, 但是RDD的依赖链也不能丢掉, 当某个点某个 executor 宕了, 上面cache 的RDD就会丢掉, 需要通过依赖链重新计算出来;而 checkpoint 是把 RDD 保存在 HDFS中,会斩断血缘关系。
  cache 机制是每计算出一个要 cache 的 partition 就直接将其 cache 到内存了,checkpoint是等到 job 结束后启动专门的 job 去完成 checkpoint ,即checkpoint 的 RDD 会被计算两次。因此,在使用 rdd.checkpoint() 的时候,建议加上 rdd.cache(),这样第二次运行的 job 就不用再去计算该 RDD了,直接读取 cache 写磁盘。

persist与checkpoint

  rdd.persist(StorageLevel.DISK_ONLY) 与 checkpoint 区别的是:前者虽然可以将 RDD 的 partition 持久化到磁盘,但该 partition 由 blockManager 管理。一旦 driver program 执行结束,即executor所在进程stop,blockManager 也会 stop,被 cache 到磁盘上的 RDD 也会被清空(整个 blockManager 使用的 local 文件夹被删除)。而 checkpoint 将 RDD 持久化到 HDFS,如果不被手动 remove 掉,是一直存在的,可以被下一个 driver program 使用。

转载地址:http://dyhli.baihongyu.com/

你可能感兴趣的文章
display:table-cell自适应布局下连续单词字符换行
查看>>
css行高line-height的一些深入理解及应用
查看>>
我对CSS vertical-align的一些理解与认识(一)
查看>>
我对CSS vertical-align的一些理解与认识(二)
查看>>
CSS中height:100%和height:inherit的异同
查看>>
absolute元素在text-align属性下的对齐显示
查看>>
技术管理规划-设定团队的职能
查看>>
技术管理规划-如何设定团队的目标
查看>>
技术管理规划-如何规划团队的架构
查看>>
管理任务执行-如何排任务优先级
查看>>
0115 springboot template方式操作mongodb
查看>>
0116 spring的webFlux
查看>>
0121 spring-boot-redis的使用
查看>>
面试刷题31:分布式ID设计方案
查看>>
根据身份证号码来提取人员的信息【身份证号码的前六位所代表的省,市,区, 以及地区编码】的网上地址
查看>>
php安装工具 网址
查看>>
php NetBeans IDE Build 201208070001 打开一个现有的php 网站
查看>>
win7系统中, Microsoft Office Word已停止工作
查看>>
.net中 网页抓取数据(提取html中的数据,提取table中的数据)
查看>>
c# Windows Forms Application中的DataGridView的数据指定列绑定 简单小例子
查看>>