sharepoint做网站,做app必须有网站,企业商城网站建设价格,网络营销的方法包括哪些相比MapReduce僵化的Map与Reduce分阶段计算#xff0c;Spark计算框架更有弹性和灵活性#xff0c;运行性能更佳。
1 Spark的计算阶段
MapReduce一个应用一次只运行一个map和一个reduceSpark可根据应用复杂度#xff0c;分割成更多的计算阶段#xff08;stage#xff09;…相比MapReduce僵化的Map与Reduce分阶段计算Spark计算框架更有弹性和灵活性运行性能更佳。
1 Spark的计算阶段
MapReduce一个应用一次只运行一个map和一个reduceSpark可根据应用复杂度分割成更多的计算阶段stage组成一个DAGSpark任务调度器可根据DAG依赖关系执行计算阶段
逻辑回归机器学习性能Spark比MapReduce快100多倍。因某些机器学习算法可能需大量迭代计算产生数万个计算阶段这些计算阶段在一个应用中处理完成而不像MapReduce需要启动数万个应用因此运行效率极高。
DAG不同阶段的依赖关系有向计算过程只能沿依赖关系方向执行被依赖的阶段执行完成前依赖的阶段不能开始执行。该依赖关系不能有环形依赖否则就死循环。
典型的Spark运行DAG的不同阶段 整个应用被切分成3个阶段阶段3依赖阶段1、2阶段1、2互不依赖。Spark执行调度时先执行阶段1、2完成后再执行阶段3。对应Spark伪代码
rddB rddA.groupBy(key)
rddD rddC.map(func)
rddF rddD.union(rddE)
rddG rddB.join(rddF)所以Spark作业调度执行核心是DAG整个应用被切分成数个阶段每个阶段的依赖关系也很清楚。根据每个阶段要处理的数据量生成任务集合TaskSet每个任务都分配一个任务进程去处理Spark就实现大数据分布式计算。
负责Spark应用DAG生成和管理的组件是DAGScheduler
DAGScheduler根据程序代码生成DAG然后将程序分发到分布式计算集群按计算阶段的先后关系调度执行
Spark划分计算阶段的依据
显然并非RDD上的每个转换函数都会生成一个计算阶段如上4个转换函数但只有3个阶段。
观察上面DAG图计算阶段的划分就看出当RDD之间的转换连接线呈现多对多交叉连接就产生新阶段。一个RDD代表一个数据集图中每个RDD里面都包含多个小块每个小块代表RDD的一个分片。
一个数据集中的多个数据分片需进行分区传输写到另一个数据集的不同分片这种数据分区交叉传输操作在MapReduce运行过程也看过。 这就是shuffle过程Spark也要通过shuffle将数据重组相同Key的数据放在一起进行聚合、关联等操作因而每次shuffle都产生新的计算阶段。这也是为什么计算阶段会有依赖关系它需要的数据来源于前面一个或多个计算阶段产生的数据必须等待前面的阶段执行完毕才能进行shuffle并得到数据。
计算阶段划分依据是shuffle而非转换函数的类型有的函数有时有shuffle有时无。如上图例子中RDD B和RDD F进行join得到RDD G这里的RDD F需要进行shuffleRDD B不需要。 因为RDD B在前面一个阶段阶段1的shuffle过程中已进行数据分区。分区数目和分区K不变无需再shuffle 这种无需进行shuffle的依赖在Spark里称窄依赖需进行shuffle的依赖称宽依赖
类似MapReduceshuffle对Spark也重要只有通过shuffle相关数据才能互相计算。
既然都要shuffle为何Spark更高效
本质Spark算一种MapReduce计算模型的不同实现。Hadoop MapReduce简单粗暴根据shuffle将大数据计算分成Map、Reduce两阶段就完事。但Spark更细将前一个的Reduce和后一个的Map连接当作一个阶段持续计算形成一个更优雅、高效地计算模型其本质依然是Map、Reduce。但这种多个计算阶段依赖执行的方案可有效减少对HDFS的访问减少作业的调度执行次数因此执行速度更快。
不同于Hadoop MapReduce主要使用磁盘存储shuffle过程中的数据Spark优先使用内存进行数据存储包括RDD数据。除非内存不够用否则尽可能使用内存 这即Spark比Hadoop性能高。
2 Spark作业管理
Spark里面的RDD函数有两种
转换函数调用后得到的还是RDDRDD计算逻辑主要通过转换函数action函数调用后不再返回RDD。如count()函数返回RDD中数据的元素个数saveAsTextFile(path)将RDD数据存储到path路径
Spark的DAGScheduler遇到shuffle时会生成一个计算阶段在遇到action函数时会生成一个作业job。
RDD里面的每个数据分片Spark都会创建一个计算任务去处理所以一个计算阶段含多个计算任务task。
作业、计算阶段、任务的依赖和时间先后关系 横轴时间纵轴任务。两条粗黑线之间是一个作业两条细线之间是一个计算阶段。一个作业至少包含一个计算阶段。水平方向红色的线是任务每个阶段由很多个任务组成这些任务组成一个任务集合。
DAGScheduler根据代码生成DAG图后Spark任务调度就以任务为单位进行分配将任务分配到分布式集群的不同机器上执行。
3 Spark执行流程
Spark支持Standalone、Yarn、Mesos、K8s等多种部署方案原理类似仅不同组件的角色命名不同。
3.1 Spark cluster components Spark应用程序启动在自己的JVM进程里Driver进程启动后调用SparkContext初始化执行配置和输入数据。SparkContext启动DAGScheduler构造执行的DAG图切分成最小的执行单位-计算任务。
然后Driver向Cluster Manager请求计算资源用于DAG的分布式计算。Cluster Manager收到请求后将Driver的主机地址等信息通知给集群的所有计算节点Worker。
Worker收到信息后根据Driver的主机地址跟Driver通信并注册然后根据自己的空闲资源向Driver通报自己可以领用的任务数。Driver根据DAG图开始向注册的Worker分配任务。
Worker收到任务后启动Executor进程执行任务。Executor先检查自己是否有Driver的执行代码若无从Driver下载执行代码通过Java反射加载后开始执行。
4 Spark V.S Hadoop
4.1 个体对比 4.2 生态圈对比 4.3 MapReduce V.S Spark 4.4 优势 4.5 Spark 和 Hadoop 协作 5 总结
相比MapreduceSpark的主要特性
RDD编程模型更简单DAG切分的多阶段计算过程更快使用内存存储中间计算结果更高效
Spark在2012开始流行那时内存容量提升和成本降低已经比MapReduce出现的十年前强了一个数量级Spark优先使用内存的条件已成熟。
本文描述的内存模型自 Apache Spark 1.6 开始弃用新的内存模型基于 UnifiedMemoryManager并在这篇文章中描述。
在最近的时间里我在 StackOverflow 上回答了一系列与 ApacheSpark 架构有关的问题。所有这些问题似乎都是因为互联网上缺少一份关于 Spark 架构的好的通用描述造成的。即使是官方指南也没有太多细节当然也缺乏好的图表。《学习 Spark》这本书和官方研讨会的资料也是如此。
在这篇文章中我将尝试解决这个问题提供一个关于 Spark 架构的一站式指南以及对其一些最受欢迎的概念问题的解答。这篇文章并不适合完全的初学者——它不会为你提供关于 Spark 主要编程抽象RDD 和 DAG的洞见但是它要求你有这些知识作为先决条件。
从 http://spark.apache.org/docs/1.3.0/cluster-overview.html 上可用的官方图片开始
Spark 架构官方 如你所见它同时引入了许多术语——“executor”“task”“cache”“Worker Node”等等。当我开始学习 Spark 概念的时候这几乎是互联网上唯一关于 Spark 架构的图片现在情况也没有太大改变。我个人不是很喜欢这个因为它没有显示一些重要的概念或者显示得不是最佳方式。
让我们从头说起。任何任何在你的集群或本地机器上运行的 Spark 过程都是一个 JVM 过程。与任何 JVM 过程一样你可以用 -Xmx 和 -Xms JVM 标志来配置它的堆大小。这个过程如何使用它的堆内存以及它为什么需要它以下是 JVM 堆内的 Spark 内存分配图表 默认情况下Spark 以 512MB JVM 堆启动。为了安全起见避免 OOM 错误Spark 只允许使用堆的 90%这由参数 spark.storage.safetyFraction 控制。好的正如你可能已经听说 Spark 是一个内存中的工具Spark 允许你将一些数据存储在内存中。如果你读过我这里的文章 https://0x0fff.com/spark-misconceptions/你应该理解 Spark 并不是真的内存工具它只是利用内存来缓存 LRUhttp://en.wikipedia.org/wiki/Cache_algorithms。所以一些内存是为你处理的数据缓存而保留的部分这部分通常是安全堆的 60%由 spark.storage.memoryFraction 参数控制。所以如果你想知道你可以在 Spark 中缓存多少数据你应该取所有执行器的堆大小之和乘以 safetyFraction 和 storage.memoryFraction默认情况下它是 0.9 * 0.6 0.54 或者让 Spark 使用的总的堆大小的 54%。
现在更详细地了解 shuffle 内存。它的计算方法为 “堆大小” * spark.shuffle.safetyFraction * spark.shuffle.memoryFraction。spark.shuffle.safetyFraction 的默认值是 0.8 或 80%spark.shuffle.memoryFraction 的默认值是 0.2 或 20%。所以最终你可以使用最多 0.8*0.2 0.16 或 JVM 堆的 16% 用于 shuffle。但是 Spark 如何使用这些内存呢你可以在这里获取更多细节https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala但总的来说Spark 用这些内存进行它的 Shuffle。当 Shuffle 进行时有时你也需要对数据进行排序。当你排序数据时你通常需要一个缓冲区来存储排序后的数据记住你不能就地修改 LRU 缓存中的数据因为它是用来稍后重用的。所以它需要一些 RAM 来存储排序的数据块。如果你没有足够的内存来排序数据会怎样有一系列通常被称为“外部排序”的算法http://en.wikipedia.org/wiki/External_sorting允许你进行分块数据的排序然后再将最终结果合并起来。
我还没涵盖的 RAM 的最后部分是“unroll”内存。被 unroll 过程使用的 RAM 部分是 spark.storage.unrollFraction * spark.storage.memoryFraction * spark.storage.safetyFraction默认值等于 0.2 * 0.6 * 0.9 0.108 或者堆的 10.8%。这是当你将数据块 unroll 到内存时可以使用的内存。为什么你需要 unroll 它呢Spark 允许你以序列化和非序列化形式存储数据。序列化形式的数据不能直接使用因此你需要在使用之前 unroll 它所以这是用于 unroll 的 RAM。它与存储 RAM 共享这意味着如果你需要一些内存来 unroll 数据这可能会导致 Spark LRU 缓存中存储的一些分区被删除。
这很好因为此刻你知道了什么是 Spark 过程以及它如何利用它的 JVM 过程的内存。现在让我们转到集群模式——当你启动一个 Spark 集群时它实际上是什么样的呢我喜欢 YARN所以我将讲述它在 YARN 上是如何工作的但是总的来说对于任何你使用的集群管理器来说都是一样的
在 YARN 上的 Spark 架构 当你有一个 YARN 集群时它有一个 YARN Resource Manager 守护进程控制集群资源实际上是内存以及在集群节点上运行的一系列 YARN Node Managers控制节点资源利用率。从 YARN 的角度来看每个节点代表你有控制权的 RAM 池。当你向 YARN Resource Manager 请求一些资源时它会给你提供你可以联系哪些 Node Managers 为你启动执行容器的信息。每个执行容器是一个具有请求堆大小的 JVM。JVM 位置由 YARN Resource Manager 选择你无法控制它——如果节点有 64GB 的 RAM 被 YARN 控制yarn-site.xml 中的 yarn.nodemanager.resource.memory-mb 设置并且你请求 10 个执行器每个执行器 4GB它们所有的都可以容易地在一个 YARN 节点上启动即使你有一个大集群。
当你在 YARN 之上启动 Spark 集群时你指定了你需要的执行器数量–num-executors 标志或 spark.executor.instances 参数、每个执行器使用的内存量–executor-memory 标志或 spark.executor.memory 参数、每个执行器允许使用的核心数量–executor-cores 标志或 spark.executor.cores 参数以及为每个任务的执行专用的核心数量spark.task.cpus 参数。同时你还指定了驱动程序应用程序使用的内存量–driver-memory 标志或 spark.driver.memory 参数。
当你在集群上执行某事时你的工作处理被分割成阶段每个阶段又被分割成任务。每个任务分别被调度。你可以将每个作为执行者工作的 JVM 视为一个任务执行槽池每个执行者会给你 spark.executor.cores / spark.task.cpus 执行槽供你的任务使用总共有 spark.executor.instances 执行器。这是一个例子。有 12 个节点运行 YARN Node Managers 的集群每个节点 64GB 的 RAM 和 32 个 CPU 核心16 个物理核心与超线程。这样在每个节点上你可以启动 2 个执行器每个执行器 26GB 的 RAM为系统进程、YARN NM 和 DataNode 留下一些 RAM每个执行器有 12 个核心用于任务为系统进程、YARN NM 和 DataNode 留下一些核心。所以总的来说你的集群可以处理 12 台机器 * 每台机器 2 个执行器 * 每个执行器 12 个核心 / 每个任务 1 个核心 288 个任务槽。这意味着你的 Spark 集群将能够并行运行多达 288 个任务从而利用你在这个集群上拥有的几乎所有资源。你可以在这个集群上缓存数据的内存量是 0.9 * spark.storage.safetyFraction * 0.6 * spark.storage.memoryFraction * 12 台机器 * 每台机器 2 个执行器 * 每个执行器 26 GB 336.96 GB。不算太多但在大多数情况下它是足够的。
到目前为止效果很好现在你知道了 Spark 如何使用它的 JVM 的内存以及你在集群上有哪些执行槽。正如你可能已经注意到的我没有详细介绍“任务”究竟是什么。这将是下一篇文章的主题但基本上它是 Spark 执行的一个单一工作单元并作为 线程* 在执行器 JVM 中执行。这是 Spark 低作业启动时间的秘诀——在 JVM 中启动额外的线程比启动整个 JVM 快得多而后者是在 Hadoop 中开始 MapReduce 作业时执行的。
现在让我们关注另一个叫做“partition”的 Spark 抽象。你在 Spark 中工作的所有数据都被分割成分区。一个单一的分区是什么它是如何确定的分区大小完全取决于你使用的数据源。对于大多数在 Spark 中读取数据的方法你可以指定你想要在你的 RDD 中有多少分区。当你从 HDFS 读取一个文件时你使用的是 Hadoop 的 InputFormat 来做到这一点。默认情况下InputFormat 返回的每个输入分割都映射到 RDD 中的单个分区。对于 HDFS 上的大多数文件每个输入分割生成一个对应于 HDFS 上存储的一个数据块的数据大约是 64MB 或 128MB 的数据。大约因为在 HDFS 中数据是按照字节的确切块边界分割的但是在处理时它是按照记录分割分割的。对于文本文件分割字符是换行符对于序列文件是块末等等。这个规则的唯一例外是压缩文件——如果你有整个文本文件被压缩那么它不能被分割成记录整个文件将成为一个单一的输入分割从而在 Spark 中成为一个单一的分区你必须手动重新分区它。
现在我们所拥有的真的很简单——为了处理一个单独的数据分区Spark 生成一个单一任务这个任务在靠近你拥有的数据的位置Hadoop 块位置Spark 缓存的分区位置的任务槽中执行。 参考 https://spark.apache.org/docs/3.2.1/cluster-overview.htmlshuffle可以在这里找到新内存管理模型可以在这里找到