㈠ 可能是全网最详细的 Spark Sql Aggregate 源码剖析
纵观 Spark Sql 源码,聚合的实现是其中较为复杂的部分,本文希望能以例子结合流程图的方式来说清楚整个过程。这里仅关注 Aggregate 在物理执行计划相关的内容,之前的 parse、analyze 及 optimize 阶段暂不做分析。在 Spark Sql 中,有一个专门的 Aggregation strategy 用来处理聚合,我们先来看看这个策略。
本文暂不讨论 distinct Aggregate 的实现(有兴趣的可以看看另一篇博文 https://www.jianshu.com/p/77e0a70db8cd ),我们来看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理执行计划的
创建聚合分为两个阶段:
AggregateExpression 共有以下几种 mode:
Q:是否支持使用 hash based agg 是如何判断的?
摘自我另一篇文章: https://www.jianshu.com/p/77e0a70db8cd
为了说明最常用也是最复杂的的 hash based agg,本小节暂时将示例 sql 改为
这样就能进入 HashAggregateExec 的分支
构造函数主要工作就是对 groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 进行了初始化
在 enable code gen 的情况下,会调用 HashAggregateExec#inputRDDs 来生成 RDD,为了分析 HashAggregateExec 是如何生成 RDD 的,我们设置 spark.sql.codegen.wholeStage 为 false 来 disable code gen,这样就会调用 HashAggregateExec#doExecute 来生成 RDD,如下:
可以看到,关键的部分就是根据 child.execute() 生成的 RDD 的每一个 partition 的迭代器转化生成一个新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各个 partition。由于 TungstenAggregationIterator 涉及内容非常多,我们单开一大节来进行介绍。
此迭代器:
注:UnsafeKVExternalSorter 的实现可以参考:
UnsafeRow 是 InternalRow(表示一行记录) 的 unsafe 实现,由原始内存(byte array)而不是 Java 对象支持,由三个区域组成:
使用 UnsafeRow 的收益:
构造函数的主要流程已在上图中说明,需要注意的是:当内存不液宴足时(毕竟每销慎个 grouping 对应的 agg buffer 直接占用内存,如果 grouping 非常多,或者 agg buffer 较大,容易出现内存用尽)会从 hash based aggregate 切换为 sort based aggregate(会 spill 数据到磁盘),后文会进行详述。先来闹斗银看看最关键的 processInputs 方法的实现
上图中,需要注意的是:hashMap 中 get 一个 groupingKey 对应的 agg buffer 时,若已经存在该 buffer 则直接返回;若不存在,尝试申请内存新建一个:
上图中,用于真正处理一条 row 的 AggregationIterator#processRow 还需进一步展开分析。在此之前,我们先来看看 AggregateFunction 的分类
AggregateFunction 可以分为 DeclarativeAggregate 和 ImperativeAggregate 两大类,具体的聚合函数均为这两类的子类。
DeclarativeAggregate 是一类直接由 Catalyst 中的 Expressions 构成的聚合函数,主要逻辑通过调用 4 个表达式完成,分别是:
我们再次以容易理解的 Count 来举例说明:
通常来讲,实现一个基于 Expressions 的 DeclarativeAggregate 函数包含以下几个重要的组成部分:
再来看看 AggregationIterator#processRow
AggregationIterator#processRow 会调用
生成用于处理一行数据(row)的函数
说白了 processRow 生成了函数才是直接用来接受一条 input row 来更新对应的 agg buffer,具体是根据 mode 及 aggExpression 中的 aggFunction 的类型调用其 updateExpressions 或 mergeExpressions 方法:
比如,对于 aggFunction 为 DeclarativeAggregate 类型的 Partial 下的 Count 来说就是调用其 updateExpressions 方法,即:
对于 Final 的 Count 来说就是调用其 mergeExpressions 方法,即:
对于 aggFunction 为 ImperativeAggregate 类型的 Partial 下的 Collect 来说就是调用其 update 方法,即:
对于 Final 的 Collect 来说就是调用其 merge 方法,即:
我们都知道,读取一个迭代器的数据,是要不断调用 hasNext 方法进行 check 是否还有数据,当该方法返回 true 的时候再调用 next 方法取得下一条数据。所以要知道如何读取 TungstenAggregationIterator 的数据,就得分析其这两个方法。
分为两种情况,分别是:
Agg 的实现确实复杂,本文虽然篇幅已经很长,但还有很多方面没有 cover 到,但基本最核心、最复杂的点都详细介绍了,如果对于未 cover 的部分有兴趣,请自行阅读源码进行分析~
㈡ 开源代码是什么意思
一句话来说,开源指的是那些源代码或源设计可以被大众使用、修改发行的软件或设计体。
大众最熟悉的开源软件就是安卓,相信用非苹果的智能手机用户,现在每天用的肯定都是安卓,它也是现在影响力最大的开源软件之一,如果没有安卓的开源开放,相信今天没有那么多手机厂商和移动互联网的兴起。
如果只是从生态的角度来说,苹果的生态也很开放,现在他们也推出了开源Swift。从这方面来说,其实两者都是一样的,只是开源、开放的方式方法不太一样。
Linux 无疑是开源软件里最最成功的一个,不管是从它目前的生态建设角度,还是从业界评价来看,包括今天云计算的基础也都倚赖Linux的贡献和基石。当然,像OpenStack、Hadoop 、Spark等也非常成功,这些开源项目都属于底层技术,在支撑今天整个大数据、云计算的发展。
开源并不意味着免费,开源只是说我们做了一个好东西,把它开放给大家使用,目的是希望大家更多地使用它,并反馈使用过程中的问题或者改进方式,使得整个开源项目进步得更快,能够更好地共享给更多有需要的人,目前像 Linux、Hadoop、Spark等等,都是这么做的。但很多时候开源背后还是带有很浓厚的商业背景。
做得比较大的开源项目背后都有商业公司在支撑,如果一个成功的开源项目背后没有商业公司,这是不健康的,我们需要开源和商业之间的互补对称来促进整个社区和技术的不断前进答。
㈢ 《深入理解SPARK核心思想与源码分析》epub下载在线阅读,求百度网盘云资源
《深入理解SPARK》(耿嘉安)电子书网盘下载免费在线阅读
资源链接:
链接:
书名:深入理解SPARK
作者:耿嘉安
豆瓣评分:7.2
出版社:机械工业出版社
出版年份:2016-1-1
页数:469
内容简介:
《深入理解SPARK:核心思想与源码分析》结合大量图和示例,对Spark的架构、部署模式和工作模块的设计理念、实现源码与使用技巧进行了深入的剖析与解读。
《深入理解SPARK:核心思想与源码分析》一书对Spark1.2.0版本的源代码进行了全面而深入的分析,旨在为Spark的优化、定制和扩展提供原理性的指导。阿里巴巴集团专家鼎力推荐、阿里巴巴资深Java开发和大数据专家撰写。
本书分为三篇:
准备篇(第1~2章),介绍了Spark的环境搭建、设计理念与基本架构,帮助读者了解一些背景知识。
核心设计篇(第3~7章),着重讲解SparkContext的初始化、存储体系、任务提交与执行、计算引擎及部署模式的原理和源码分析。通过这部分的内容,读者可以通过源码剖析更加深入理解Spark的核心设计与实现,以便在实际使用中能够快速解决线上问题并对性能进行调优。
扩展篇(第8~11章),主要讲解基于Spark核心的各种扩展及应用,包括SQL处理引擎、Hive处理、流式计算框架Spark Streaming、图计算框架GraphX、机器学习库MLlib等内容。通过阅读这部分内容,读者可以扩展实际项目中对Spark的应用场景,让Spark焕发活力。
作者简介:
耿嘉安,10年IT行业相关经验。就职于阿里巴巴商家业务事业部,任资深Java工程师,专注于开源和大数据领域,目前与小伙伴们基于ODPS构建阿里的大数据商业解决方案——御膳房。在大量的工作实践中,对J2EE、JVM、Tomcat、Spring、Hadoop、Spark、MySQL、Redis都有深入研究,尤其喜欢剖析开源项目的源码实现。早期从事J2EE企业级应用开发,对Java相关技术有独到见解。业余时间喜欢研究中国古代历史,古诗词,旅游,足球等。
㈣ spark源码二次开发难吗
spark源码二次开发不难。掌握了源码编译,就具备了对Spark进行二次开发的基本条件了,要修改Spark源码,进行二次开发,那么就得从官网下载指定版本的源码,导入ide开发环境,进行源码的修改。接着修改完了。
㈤ 《深入理解spark核心思想及源码分析》pdf下载在线阅读全文,求百度网盘云资源
《深入理解spark核心思想及源码分析》网络网盘pdf最新全集下载:
链接:https://pan..com/s/1iOq9-MrepVdWcIrbALPMPg
㈥ Spark源码分析之SparkSubmit的流程
本文主要对SparkSubmit的任务提交流程源码进行分析。 Spark源码版本为2.3.1。
首先阅读一下启动脚本,看看首先加载的是哪个类,我们看一下 spark-submit 启动脚本中的具体内容。
可以看到这里加载的类是org.apache.spark.deploy.SparkSubmit,并且把启动相关的参数也带过去了。下面我们跟一下源码看看整个流程是如何运作的...
SparkSubmit的main方法如下
这里我们由于我们是提交作业,所有会走上面的submit(appArgs, uninitLog)方法
可以看到submit方法首先会准备任务提交的环境,调用了prepareSubmitEnvironment,该方法会返回四元组,该方法中会调用doPrepareSubmitEnvironment,这里我们重点注意 childMainClass类具体是什么 ,因为这里涉及到后面启动我们主类的过程。
以下是doPrepareSubmitEnvironment方法的源码...
可以看到该方法首先是解析相关的参数,如jar包,mainClass的全限定名,系统配置,校验一些参数,等等,之后的关键点就是根据我们 deploy-mode 参数来判断是如何运行我们的mainClass,这里主要是通过childMainClass这个参数来决定下一步首先启动哪个类。
childMainClass根据部署模型有不同的值:
之后该方法会把准备好的四元组返回,我们接着看之前的submit方法
可以看到这里最终会调用doRunMain()方法去进行下一步。
doRunMain的实现如下...
doRunMain方法中会判断是否需要一个代理用户,然后无论需不需要都会执行runMain方法,我们接下来看看runMain方法是如何实现的。
这里我们只假设以集群模式启动,首先会加载类,将我们的childMainClass加载为字节码对象mainClass ,然后将mainClass 映射成SparkApplication对象,因为我们以集群模式启动,那么上一步返回四元组中的childMainClass的参数为ClientApp的全限定名,而这里会调用app实例的start方法因此,这里最终调用的是ClientApp的start方法。
ClientApp的start方法如下...
可以看到这里和之前我们的master启动流程有些相似。
可以参考我上一篇文章 Spark源码分析之Master的启动流程 对这一流程加深理解。
首先是准备rpcEnv环境,之后通过master的地址获取masterEndpoints端点相关信息,因为这里运行start方法时会将之前配置的相关参数都传进来,之后就会通过rpcEnv注册相关clientEndPoint端点信息,同时需要注意,这里会把masterEndpoints端点信息也作为构造ClientEndpoint端点的参数,也就是说这个ClientEndpoint会和masterEndpoints通信。
而在我上一篇文章中说过,只要是setupEndpoint方法被调用,一定会调用相关端点的的onStart方法,而这会调用clientEndPoint的onStart方法。
ClientEndPoint类中的onStart方法会匹配launch事件。源码如下
onStart中匹配我们的launch的过程,这个过程是启动driverWrapper的过程,可以看到上面源码中封装了mainClass ,该参数对应DriverWrapper类的全限定名,之后将mainClass封装到command中,然后封装到driverDescription中,向Master申请启动Driver。
这个过程会向Mster发送消息,是通过rpcEnv来实现发射消息的,而这里就涉及到outbox信箱,会调用postToOutbox方法,向outbox信箱中添加消息,然后通过TransportClient的send或sendRpc方法发送消息。发件箱以及发送过程是在同一个线程中进行。
而细心的同学会注意到这里调用的方法名为SendToMasterAndForwardReply,见名之意,发送消息到master并且期待回应。
下面是rpcEnv来实现向远端发送消息的一个调用流程,最终会通过netty中的TransportClient来写出。
之后,Master端会触发receiveAndReply函数,匹配RequestSubmitDriver样例类,完成模式匹配执行后续流程。
可以看到这里首先将Driver信息封装成DriverInfo,然后添加待调度列表waitingDrivers中,然后调用通用的schele函数。
由于waitingDrivers不为空,则会走LaunchDriver的流程,当前的application申请资源,这时会向worker发送消息,触发Worker的receive方法。
Worker的receive方法中,当Worker遇到LaunchDriver指令时,创建并启动一个DriverRunner,DriverRunner启动一个线程,异步的处理Driver启动工作。这里说启动的Driver就是刚才说的org.apache.spark.deploy.worker.DriverWrapper
可以看到上面在DriverRunner中是开辟线程异步的处理Driver启动工作,不会阻塞主进程的执行,而prepareAndRunDriver方法中最终调用 runDriver..
runDriver中主要先做了一些初始化工作,接着就开始启动driver了。
上述Driver启动工作主要分为以下几步:
下面我们直接看DriverWrapper的实现
DriverWrapper,会创建了一个RpcEndpoint与RpcEnv,RpcEndpoint为WorkerWatcher,主要目的为监控Worker节点是否正常,如果出现异常就直接退出,然后当前的ClassLoader加载userJar,同时执行userMainClass,在执行用户的main方法后关闭workerWatcher。
以上就是SparkSubmit的流程,下一篇我会对SparkContext的源码进行解析。
欢迎关注...