本文共 12182 字,大约阅读时间需要 40 分钟。
Spark实现了Standalone作为其内置的资源管理和调度框架,因此不依赖第三方的资源管理和调度器。该模式下由于Master节点存在单点故障,要解决此问题需要借助Zookeeper并且启动至少两个Master节点来实现高可靠。
懒加载有什么好处
val list1 = list.map(i -> i * 3) // Transformation1val list2 = list1.map(i -> i + 3) // Transformation1val list3 = list1.map(i -> i / 3) // Transformation1list3.collect() // ACTION
假设原始列表(list) 很大,其中包含数百万个元素。如果没有懒惰的评估,我们将完成三遍如此庞大的计算。如果我们假设一次这样的列表迭代需要 10 秒,那么整个评估就需要 30 秒。并且每个 RDD 都会缓存下来,浪费内存。使用惰性评估,Spark 可以将这三个转换像这样合并到一个转换中,如下:
val list3 = list.map(i -> i + 1)
它将只执行一次该操作。只需一次迭代即可完成,这意味着只需要 10 秒的时间。
Hadoop和Spark都是并行计算,两者都是用MapReduce模型进行计算。Hadoop一个作业称为一个Job,Job里面分为Map Task和Reduce Task阶段,每个Task都在自己的进程中运行,当Task结束时,进程也会随之结束。
Spark用户提交的任务称为application,一个application对应一个SparkContext,application中存在多个job,每触发一次action操作就会产生一个job。每个job中有多个stage,stage是shuffle过程中DAGScheduler通过RDD之间的依赖关系划分的,每个stage里面有多个task,组成taskset,由TaskScheduler分发到各个executor中执行。一个Application -> 多个job ->一个job多个stage -> 一个stage多个taskRDD(Resilient Distributed Dataset)分布式数据集,代表一个不可变、可分区、里面的元素可并行计算的集合。每个RDD的数据都以Block的形式存储于多台机器上,其中每个Executor会启动一个BlockManagerSlave管理一部分Block,而Block的元数据由Driver节点的BlockManagerMaster保存。BlockManagerSlave生成Block后向BlockManagerMaster注册该Block,BlockManagerMaster管理RDD与Block的关系。
RDD允许用户在执行多个查询时显示地将工作集缓存在内存中,后续的查询能够重用工作集,极大地提升查询速度。*Internally,each RDD is characterized by five main properties:**A list of partitions *A function for computing each split*A list of dependencies on other RDDs*Optionally,a Partitioner for key-value RDDs(e.g. to say that the RDD is hash-partitioned)*Oprionally,a list of preferred locations to compute each split on(e.g. block location for an HDFS file)
创建RDD有两种方式。【1】通过HDFS支持的文件系统创建RDD,RDD里面没有真正要计算的数据,只记录了下一个元数据,比如:是哪里的数据、调用什么函数,一旦触发一个Action就会提交一个任务【2】通过Scala集合或数组以并行化的方式创建RDD
sc.textFile("hdfs://root:9000/file")val rdd1=sc.parallelize(Array(1,2,3,4,5,6))//通过并行比如化Scala集合创建RDD
1.自动进行内存和磁盘切换
Spark 可以使用 persist 和 cache 方法将任意 RDD 缓存到内存或者磁盘文件系统中。数据会优先存储到内存中,当内存不足以存放RDD中的数据的时候,就会持久化到磁盘上2.基于lineage的高效容错
3.task如果失败会特定次数的重试
4.stage如果失败会自动进行特定次数的重试,而且只会只计算失败的分片
Stage对象可以记录并跟踪多个StageInfo,缺省的重试次数也是4次。且可以直接运行计算失败的阶段,值计算失败的数据分片。 Stage是Spark Job运行时均有相同逻辑功能和并行计算任务的一个基本单元。其中的所有任务都依赖同样的Shuffle,每个DAG任务都是通过DAGScheduler在Stage的边界处发生Shuffle形成Stage,然后DAGScheduler按照这些拓扑结构执行5.checkpoint和persist
6.数据调度弹性
DAGScheduler和TaskScheduler和资源无关。Spark将执行模型抽象为DAG,可以让多个Stage任务串联或者并行执行,而无需将中间结果写入到HDFS中。这样当某个节点故障的时候,可以由其他节点来执行出错的任务7.数据分片的高度弹性repartion
Spark进行数据分片的时候,默认将数据存放在内存中,当内存不够的时候,会将一部分存放到磁盘上。如经过分片后,某个Partition非常小,就可以合并多个小的partition来计算,而不用每个partition都起一个线程。这样就提高了效率,也不会因为大量的线程而导致OOM在Spark里面对RDD操作分为两类,一类叫作Transformation(转换),转换具有延迟执行(软加载)的特点。Transformation会记录元数据信息,当计算任务触发Action时才会真正开始计算。为什么要进行软加载?因为存在很多这种算子,有一个算子就触发计算,需要计算很多次。另一种叫作Action(动作)。
常用的Transformation:转换 | 含义 |
---|---|
map(func) | 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 |
filter(func) | 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 |
flatMap(func) | 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T]=>Iterator{…} |
mapPartitionsWithIndex(func) | 类似于mapPartitions,但func带有一个整数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(int,Iterator[T])=>iterator{…} |
sample(withReplacement,Fraction,seed) | 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子 |
union(other Dataset) | 对源RDD和参数RDD求并集后返回一个新的RDD |
intersection(other Dataset) | 对源RDD和参数RDD求交集后返回一个新的RDD |
distinct([numTasks]) | 对源RDD进行去重后后返回一个新的RDD |
groupByKey([numTasks]) | 在一个(k,v)的RDD上调用,返回一个(k,Iterator[V])的RDD |
reduceByKey(func [numTasks]) | 在一个(k,v)的RDD上调用,返回一个(k,v)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置 |
sortByKey([ascending],[numTasks]) | 在一个(k,v)的RDD上调用,k必须实现Ordered接口,返回一个按照key进行排序的(k,v)的RDD |
sortBy(func,[ascending],[numTasks]) | 与sortByKey类似 |
join(otherDataset,[numTasks]) | 在类型为(k,v)和(k,w)的RDD上调用,返回一个相同key对应的所有元素在一起的(k,fv,w)的RDD |
cogroup(otherDataset,[numTasks]) | 在类型为(k,v)和(k,w)的RDD上调用,返回一个(k,iterable,iterable)类型的RDD |
aggregateByKey(zeroValue)(seqOp)([numTasks]) | |
coalesce(numPartitions) | |
repartition(numPartitions) |
aggregate(聚合):现对每个partition进行聚合,再对个部分的结果的进行聚合。定义的数组内容在Master上,完成计算是在Worker上。RDD会在Master机器上启动一个Driver进程,专门用来提交程序。
aggregateByKey:对相同key的value进行操作 combineByKey:传进去3个函数,第一个函数相对于key进行分组,分组之后对分组里面的第一个元素进行操作,第二个函数是局部进行操作,第三个是对所有partition进行操作。reduceByKey底层是由combineByKey实现的。 repartition:重新分区,假如数据原先在两个分区,运行这个命令重新分区为3个,数据要通过网络传输到另外的机器上。 Action:动作 | 含义 |
---|---|
reduce(func) | 通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的 |
collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
count() | 返回RDD的元素个数 |
first() | 返回RDD的第一个元素(类似于take(1)) |
take(n) | 返回一个由数据集的前n个元素组成的数组 |
takeSample(withReplacement,num,[seed]) | 返回一个数组,该数组由从数据集中随机采样的num各元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 |
takeOrdered(n,[ordering]) | |
saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它转换为文件中的文本 |
【RDD的依赖关系】
RDD和它依赖的父RDD的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。窄依赖指每一个父RDD的Partition最多被子RDD的一个Partition使用,宽依赖指多个子RDD的Partition会依赖同一个父RDD的Partition。 【Lineage】 RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。【RDD的缓存以及缓存方式】
Spark速度非常快的原因之一是在不同操作中可以在内存中持久化或缓存数据集,当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中并在其他动作中重用,这使得后续的动作变得更加迅速。 RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中并供后面重用,其中cache调用persist方法。若数据大于可用内存,例如机器只有512M内存,HDFS中数据有1G,调用cache方法,会缓存512M数据,剩余数据还是在HDFS中。/**Persist this RDD with the default storage level('MEMORY_ONLY')def cache():this.type=persist()/**Persist this RDD with the default storage level('MEMORY_ONLY')def persist():this.type=persist(StorageLevel.MEMORY_ONLY)
RDD缓存可以对应不同的存储级别。
针对于下过程分析该计算中会产生多少RDD以及RDD之间的依赖关系。
val rdd=sc.textFile("hdfs://root:9000/app") .flatMap(_.split(" ")) .map((_,1)) .reduceByKey(_+_) .saveAsTextFile(args(1))//产生6个RDD
Spark往HDFS中写数据,需要拿到HDFS一个流,然后往HDFS中写数据,如果是一条数据写一次,就会频繁打开、关闭一个流,效率会很低。可以拿到针对一个分区拿到一个流,把一个分区中的数据写到HDFS中。
【.textFile(“hdfs://root:9000/app”)】 会产生HadoopRDD、MapPartitionsRDD,因为通过Hadoop从HDFS中读数据,数据为K-V类型,Key是LongWritable,Value是caches,因为它用的InputFormat。之后发现偏移量没有用,把每一条数据进行处理(只保留Value),这时候需要经过一次map(RDD的Map),生成MapPartitionsRDD,把每个分区的数据拿出来,调用Scala的map方法 。【.flatMap(.split(" ")) 】和【.map((,1))】产生一个RDD:MapPartitionsRDD。【.reduceByKey(+)】将reduceByKey转换成pariRDDFunction,reduceByKey调用combineByKeyWithClassTag,这个函数要求传入三个参数【1】把每个分区的key拿出来处理【2】局部的分区进行聚合操作【3】全部的数据拿出来聚合操作。reduceByKey会产生一个ShuffledRDD,会产生一个局部聚和。【.saveAsTextFile(args(1))】saveAsTextFile往HDFS中写数据,调用MapPartitionsRDD方法,把一个分区的数据拿出来然后建立一个流写数据。
RDD中有很多分区,这些分区由多个Executor处理,每个Executor只处理一个或两个分区,一个分区肯定在一个Executor上。尽管可以通过cache或者persist将数据放到内存或者磁盘中,但是内存和磁盘都有可能坏掉,当进行下一步计算找不到数据了。由于RDD有容错机制,RDD和RDD会记录之间的依赖关系,即使以前的数据丢失了一部分,可以通过血统恢复,但是数据要经过几百次迭代,代价很大。这里可以把中间结果做一个checkpoint,把中间的结果保存到HDFS。
/***Mark this RDD for checkpointing.It will be saved to a file inside the checkpoint *directory set with 'SparkContextsetCheckpointDir' and all references to its parent*RDDs will be removed.This function must be called before any job has been*executed on this RDD.It is strongly recommended that this RDD is persisted in*memory,otherwise saving it on a file will require recomputation.**/
在CheckPoint之前一定要指定一个存储目录:sc.setcheckpointDir("…")
并且rdd.checkpoint是一个Transformation,在触发Action时会执行两个任务【1】计算Action任务【2】把数据保存到HDFS。在使用CheckPoint之前最好把数据缓存到内存中,速度会快很多,不用重新启动一个job,而是直接从内存当中把数据写入到HDFS中。之后的RDD在计算Transformation或Action时直接从CheckPoint中读取数据,不再依赖血统关系。RDD中的数据是什么时候写入的?是在RDD调用checkpoint方法时候吗?
RDD中checkpoint方法,在该方法中是只是新建了一个ReliableRDDCheckpointData的对象,并没有做实际的写入工作。实际触发写入的时机是在runJob生成该RDD后,调用RDD的doCheckpoint方法来做的。在做checkpoint的时候,具体写入了哪些数据到HDFS了?
在经历调用RDD.doCheckpoint → RDDCheckpointData.checkpoint →ReliableRDDCheckpointData.doCheckpoint →ReliableRDDCheckpointData.writeRDDToCheckpointDirectory后,在writeRDDToCheckpointDirectory方法中可以看到:将作为一个单独的任务(RunJob)将RDD中每个parition的数据依次写入到checkpoint目录(writePartitionToCheckpointFile),此外如果该RDD中的partitioner如果不为空,则也会将该对象序列化后存储到checkpoint目录。所以,在做checkpoint的时候,写入的hdfs中的数据主要包括:RDD中每个parition的实际数据,以及可能的partitioner对象(writePartitionerToCheckpointDir)。在对RDD做完checkpoint以后,对这个RDD的本身又做了哪些收尾工作?
在写完checkpoint数据到hdfs以后,将会调用rdd的markCheckpoined方法,主要斩断该rdd的对上游的依赖,以及将paritions置空等操作。实际过程中,使用RDD做checkpoint的时候需要注意什么问题?
在RDD计算完毕后,会再次通过RunJob将每个partition数据保存到HDFS。这样RDD将会计算两次,所以为了避免此类情况,最好将RDD进行cache。sc.setCheckpointDir(checkpointDir.toString)val rdd = sc.makeRDD(1 to 20, numSlices = 1)rdd.cache()rdd.checkpoint()
cache与persist
cache 能够让重复数据在同一个 application 中的 jobs 间共享。RDD的cache()方法其实调用的就是persist方法,缓存策略均为MEMORY_ONLY。 cache与persist的唯一区别在于: cache只有一个默认的缓存级别MEMORY_ONLY ,而persist可以根据StorageLevel设置其它的缓存级别。cache或者persist并不是action。系统如何对RDD进行cache?
将要计算 RDD partition 的时候就去判断 partition 要不要被 cache。如果要被 cache 的话,先将 partition 计算出来,然后 cache 到内存。调用 rdd.cache() 后, RDD就变成 persistRDD ,其StorageLevel为MEMORY_ONLY。cached RDD 怎么被读取?
当要计算某个RDD中的partition时候,会先去 blockManager里面查找是否已经被cache了。如果partition被cache在本地,就直接使用blockManager.getLocal()去本地 memoryStore里读取。如果该 partition 被其他节点上 blockManager cache 了,会通过blockManager.getRemote() 去其他节点上读取。cache与checkpoint
即cache 和 checkpoint 的显著区别是:cache把 RDD 计算出来然后放在内存中, 但是RDD的依赖链也不能丢掉, 当某个点某个 executor 宕了, 上面cache 的RDD就会丢掉, 需要通过依赖链重新计算出来;而 checkpoint 是把 RDD 保存在 HDFS中,会斩断血缘关系。 cache 机制是每计算出一个要 cache 的 partition 就直接将其 cache 到内存了,checkpoint是等到 job 结束后启动专门的 job 去完成 checkpoint ,即checkpoint 的 RDD 会被计算两次。因此,在使用 rdd.checkpoint() 的时候,建议加上 rdd.cache(),这样第二次运行的 job 就不用再去计算该 RDD了,直接读取 cache 写磁盘。persist与checkpoint
rdd.persist(StorageLevel.DISK_ONLY) 与 checkpoint 区别的是:前者虽然可以将 RDD 的 partition 持久化到磁盘,但该 partition 由 blockManager 管理。一旦 driver program 执行结束,即executor所在进程stop,blockManager 也会 stop,被 cache 到磁盘上的 RDD 也会被清空(整个 blockManager 使用的 local 文件夹被删除)。而 checkpoint 将 RDD 持久化到 HDFS,如果不被手动 remove 掉,是一直存在的,可以被下一个 driver program 使用。转载地址:http://dyhli.baihongyu.com/