导航:首页 > 源码编译 > spark源码导读

spark源码导读

发布时间:2023-05-11 12:46:03

‘壹’ 《深入理解SPARK核心思想与源码分析》epub下载在线阅读,求百度网盘云资源

《深入理解SPARK》(耿嘉安)电子书网盘下载免费在线阅读

资源链接:

链接:

提取码:oeso

书名:深入理解SPARK

作者:耿嘉安

豆瓣评分:7.2

出版社:机械工业出版社

出版年份:2016-1-1

页数:469

内容简介:

《深入理解SPARK:核心思想与源码分析》结合大量图和示例,对Spark的架构、部署模式和工作模块的设计理念、实现源码与使用技巧进行了深入的剖析与解读。

《深入理解SPARK:核心思想与源码分析》一书对Spark1.2.0版本的源代码进行了全面而深入的分析,旨在为Spark的优化、定制和扩展提供原理性的指导。阿里巴巴集团专家鼎力推荐、阿里巴巴资深Java开发和大数据专家撰写。

本书分为三篇:

准备篇(第1~2章),介绍了Spark的环境搭建、设计理念与基本架构,帮助读者了解一些背景知识。

核心设计篇(第3~7章),着重讲解SparkContext的初始化、存储体系、任务提交与执行、计算引擎及部署模式的原理和源码分析。通过这部分的内容,读者可以通过源码剖析更加深入理解Spark的核心设计与实现,以便在实际使用中能够快速解决线上问题并对性能进行调优。

扩展篇(第8~11章),主要讲解基于Spark核心的各种扩展及应用,包括SQL处理引擎、Hive处理、流式计算框架Spark Streaming、图计算框架GraphX、机器学习库MLlib等内容。通过阅读这部分内容,读者可以扩展实际项目中对Spark的应用场景,让Spark焕发活力。

作者简介:

耿嘉安,10年IT行业相关经验。就职于阿里巴巴商家业务事业部,任资深Java工程师,专注于开源和大数据领域,目前与小伙伴们基于ODPS构建阿里的大数据商业解决方案——御膳房。在大量的工作实践中,对J2EE、JVM、Tomcat、Spring、Hadoop、Spark、MySQL、Redis都有深入研究,尤其喜欢剖析开源项目的源码实现。早期从事J2EE企业级应用开发,对Java相关技术有独到见解。业余时间喜欢研究中国古代历史,古诗词,旅游,足球等。

‘贰’ 怎么在eclipse里正确导入spark2.0.0 的源码

应该说这个和是不是Spark项目没什么关系。

建议你使用intellij idea,在spark目录下执行"sbt/sbt gen-idea",会自动生成.idea项目,导入即可。
idea我不熟,还需要做一些其他的插件配置(python, sbt等)和环境设置。

你也可以使用Eclipse看,Eclipse有scala IDE,把Spark项目当maven工程导入。但是子项目之间的依赖会有点问题,会报错。

推荐使用前者,向Databricks的开发者看齐;我使用的是后者,我直接依赖了编译好的包就不会报错了,纯读源码的话也勉强可以跟踪和调试。

另外,我也看有的Committer用vim看spark

‘叁’ spark1.6的源码怎么导入idea或者eclipseIDE

下载源颂拿坦码导入 点击file->open 选野桐择以及下载好并解敏陪压过的spark-1.6.0包 点击ok,idea会自动安装下载文件 完成后 打开源码!

‘肆’ Spark Sql 源码剖析(二): TreeNode

使用 object CurrentOrigin 为 TreeNodes 提供一个可以查找上下文的地方,比如当前正在解析哪行 code。

object CurrentOrigin 主燃颤含要包含一个 private val value = new ThreadLocal[Origin]() ,目前 CurrentOrigin 仅在 parser 中使用,在 visit 每个节点的时候都会使用,记录当前 parse 的节点是哪行哪列

另外,从 value 是 ThreadLocal 类型可以看出,在 Spark SQL 中,parse sql 时都是在单独的 thread 里进行的(不同的 sql 不同的 thread)

返回该节点的 seq of children,children 是不可变的。有三种情况:

查找第一个符合 f 条件(比如某个类型的)的 TreeNode,先序遍历。

将函数 f 递归应用洞猛于节点及其子节点

与 foreach 不同的是,foreach 先应用于 parent,再应用与 child;而 foreachUp 是先应用于 child 再应用与 parent

调用 foreach,foreach 中应用的函数是 ret += f(_) ,最终返回一个 seq,包含将 f 通过 foreach 方式应用于所有节点并 add 到 ret。其中 f 本身是 BaseType => A 类型

原理与 map 一致,只是 f 变成了 BaseType => TraversableOnce[A]

PartialFunction#lift :将 partial func 转换为一个返回 Option 结果的函数。将 pf 函数应用于符合 pf 定义的节点(即 pf.lift(node)返回的 Option 不是 None )并都皮笑 add 到 ret = new collection.mutable.ArrayBuffer[B] 以 Seq 形式返回

以 Seq 的形式返回 tree 的所有叶子节点

def collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B] :注意,因为可能没有符合 pf 定义的节点,所有返回的 Option 可能是 None

相当于 proctIterator.map(f).toArray ,即对于 proctIterator 每个元素执行 f 然后将 ret 组成一个 arr 返回

注意:TreeNode 没有实现 Proct 相关方法,都由其子类自行实现

使用 new children 替换并返回该节点的拷贝。该方法会对 proctElement 每个元素进行模式匹配,根据节点类型及一定规则进行替换。

调用 transformDown

rule: PartialFunction[BaseType, BaseType]

返回 f 应用于所有子节点(非递归,一般将递归操作放在调用该函数的地方)后该节点的 。其内部的原理是调用 mapProctIterator,对每一个 proctElement(i) 进行各种模式匹配,若能匹配上某个再根据一定规则进行转换,核心匹配转换如下:

以上都是适用于有 children 的 node,如果是 children 为 null 的 node 直接返回

反射生成节点副本

返回该类型 TreeNode 的 name,默认为 class name;注意,会移除物理操作的 Exec$ 前缀

所有应该以该节点内嵌套树表示的 nodes,比如,可以被用来表示 sub-queries

(children ++ innerChildren).toSet[TreeNode[_]]

主要用于交互式 debug,返回该 tree 指定下标的节点,num 可以在 numberedTreeString 找到。最终调用的

我的博客即将搬运同步至腾讯云+社区,邀请大家一同入驻: https://cloud.tencent.com/developer/support-plan?invite_code=x2lzoxh4s5hi

‘伍’ 可能是全网最详细的 Spark Sql Aggregate 源码剖析

纵观 Spark Sql 源码,聚合的实现是其中较为复杂的部分,本文希望能以例子结合流程图的方式来说清楚整个过程。这里仅关注 Aggregate 在物理执行计划相关的内容,之前的 parse、analyze 及 optimize 阶段暂不做分析。在 Spark Sql 中,有一个专门的 Aggregation strategy 用来处理聚合,我们先来看看这个策略。

本文暂不讨论 distinct Aggregate 的实现(有兴趣的可以看看另一篇博文 https://www.jianshu.com/p/77e0a70db8cd ),我们来看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理执行计划的

创建聚合分为两个阶段:

AggregateExpression 共有以下几种 mode:

Q:是否支持使用 hash based agg 是如何判断的?

摘自我另一篇文章: https://www.jianshu.com/p/77e0a70db8cd

为了说明最常用也是最复杂的的 hash based agg,本小节暂时将示例 sql 改为

这样就能进入 HashAggregateExec 的分支

构造函数主要工作就是对 groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 进行了初始化

在 enable code gen 的情况下,会调用 HashAggregateExec#inputRDDs 来生成 RDD,为了分析 HashAggregateExec 是如何生成 RDD 的,我们设置 spark.sql.codegen.wholeStage 为 false 来 disable code gen,这样就会调用 HashAggregateExec#doExecute 来生成 RDD,如下:

可以看到,关键的部分就是根据 child.execute() 生成的 RDD 的每一个 partition 的迭代器转化生成一个新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各个 partition。由于 TungstenAggregationIterator 涉及内容非常多,我们单开一大节来进行介绍。

此迭代器:

注:UnsafeKVExternalSorter 的实现可以参考:

UnsafeRow 是 InternalRow(表示一行记录) 的 unsafe 实现,由原始内存(byte array)而不是 Java 对象支持,由三个区域组成:

使用 UnsafeRow 的收益:

构造函数的主要流程已在上图中说明,需要注意的是:当内存不液宴足时(毕竟每销慎个 grouping 对应的 agg buffer 直接占用内存,如果 grouping 非常多,或者 agg buffer 较大,容易出现内存用尽)会从 hash based aggregate 切换为 sort based aggregate(会 spill 数据到磁盘),后文会进行详述。先来闹斗银看看最关键的 processInputs 方法的实现

上图中,需要注意的是:hashMap 中 get 一个 groupingKey 对应的 agg buffer 时,若已经存在该 buffer 则直接返回;若不存在,尝试申请内存新建一个:

上图中,用于真正处理一条 row 的 AggregationIterator#processRow 还需进一步展开分析。在此之前,我们先来看看 AggregateFunction 的分类

AggregateFunction 可以分为 DeclarativeAggregate 和 ImperativeAggregate 两大类,具体的聚合函数均为这两类的子类。

DeclarativeAggregate 是一类直接由 Catalyst 中的 Expressions 构成的聚合函数,主要逻辑通过调用 4 个表达式完成,分别是:

我们再次以容易理解的 Count 来举例说明:

通常来讲,实现一个基于 Expressions 的 DeclarativeAggregate 函数包含以下几个重要的组成部分:

再来看看 AggregationIterator#processRow

AggregationIterator#processRow 会调用

生成用于处理一行数据(row)的函数

说白了 processRow 生成了函数才是直接用来接受一条 input row 来更新对应的 agg buffer,具体是根据 mode 及 aggExpression 中的 aggFunction 的类型调用其 updateExpressions 或 mergeExpressions 方法:

比如,对于 aggFunction 为 DeclarativeAggregate 类型的 Partial 下的 Count 来说就是调用其 updateExpressions 方法,即:

对于 Final 的 Count 来说就是调用其 mergeExpressions 方法,即:

对于 aggFunction 为 ImperativeAggregate 类型的 Partial 下的 Collect 来说就是调用其 update 方法,即:

对于 Final 的 Collect 来说就是调用其 merge 方法,即:

我们都知道,读取一个迭代器的数据,是要不断调用 hasNext 方法进行 check 是否还有数据,当该方法返回 true 的时候再调用 next 方法取得下一条数据。所以要知道如何读取 TungstenAggregationIterator 的数据,就得分析其这两个方法。

分为两种情况,分别是:

Agg 的实现确实复杂,本文虽然篇幅已经很长,但还有很多方面没有 cover 到,但基本最核心、最复杂的点都详细介绍了,如果对于未 cover 的部分有兴趣,请自行阅读源码进行分析~

‘陆’ 源码级解读如何解决Spark-sql读取hive分区表执行效率低问题

问题描述

在开发过程中使用spark去读取hive分区表的过程中(或者使用hive on spark、nodepad开发工具),部分开发人员未注意添加分区属性过滤导致在执行过程中加载了全量数据,引起任务执行效率低、首腊磁盘IO大量禅芹键损耗等问题。

解决办法

1、自定义规则CheckPartitionTable类,实现Rule,通过以下方式创建SparkSession。

2、自定义规则CheckPartitionTable类,实现Rule,将规则类追加至Optimizer.batches: Seq[Batch]中,如下。

规则内容实现

1、CheckPartitionTable规则执行类,需要通过引入sparkSession从而获取到引入conf;需要继承Rule[LogicalPlan];

2、通过splitPredicates方法,分离分区谓词,得到分区贺巧谓词表达式。在sql解析过程中将谓词解析为TreeNode,此处采用递归的方式获取分区谓词。

3、判断是否是分区表,且是否添加分区字段。

4、实现Rule的apply方法

大数据和云计算的关系

大数据JUC面试题

大数据之Kafka集群部署

大数据logstsh架构

大数据技术kafka的零拷贝

‘柒’ Spark源码分析之SparkSubmit的流程

本文主要对SparkSubmit的任务提交流程源码进行分析。 Spark源码版本为2.3.1。

首先阅读一下启动脚本,看看首先加载的是哪个类,我们看一下 spark-submit 启动脚本中的具体内容。

可以看到这里加载的类是org.apache.spark.deploy.SparkSubmit,并且把启动相关的参数也带过去了。下面我们跟一下源码看看整个流程是如何运作的...

SparkSubmit的main方法如下

这里我们由于我们是提交作业,所有会走上面的submit(appArgs, uninitLog)方法

可以看到submit方法首先会准备任务提交的环境,调用了prepareSubmitEnvironment,该方法会返回四元组,该方法中会调用doPrepareSubmitEnvironment,这里我们重点注意 childMainClass类具体是什么 ,因为这里涉及到后面启动我们主类的过程。

以下是doPrepareSubmitEnvironment方法的源码...

可以看到该方法首先是解析相关的参数,如jar包,mainClass的全限定名,系统配置,校验一些参数,等等,之后的关键点就是根据我们 deploy-mode 参数来判断是如何运行我们的mainClass,这里主要是通过childMainClass这个参数来决定下一步首先启动哪个类。

childMainClass根据部署模型有不同的值:

之后该方法会把准备好的四元组返回,我们接着看之前的submit方法

可以看到这里最终会调用doRunMain()方法去进行下一步。

doRunMain的实现如下...

doRunMain方法中会判断是否需要一个代理用户,然后无论需不需要都会执行runMain方法,我们接下来看看runMain方法是如何实现的。

这里我们只假设以集群模式启动,首先会加载类,将我们的childMainClass加载为字节码对象mainClass ,然后将mainClass 映射成SparkApplication对象,因为我们以集群模式启动,那么上一步返回四元组中的childMainClass的参数为ClientApp的全限定名,而这里会调用app实例的start方法因此,这里最终调用的是ClientApp的start方法。

ClientApp的start方法如下...

可以看到这里和之前我们的master启动流程有些相似。
可以参考我上一篇文章 Spark源码分析之Master的启动流程 对这一流程加深理解。

首先是准备rpcEnv环境,之后通过master的地址获取masterEndpoints端点相关信息,因为这里运行start方法时会将之前配置的相关参数都传进来,之后就会通过rpcEnv注册相关clientEndPoint端点信息,同时需要注意,这里会把masterEndpoints端点信息也作为构造ClientEndpoint端点的参数,也就是说这个ClientEndpoint会和masterEndpoints通信。

而在我上一篇文章中说过,只要是setupEndpoint方法被调用,一定会调用相关端点的的onStart方法,而这会调用clientEndPoint的onStart方法。

ClientEndPoint类中的onStart方法会匹配launch事件。源码如下

onStart中匹配我们的launch的过程,这个过程是启动driverWrapper的过程,可以看到上面源码中封装了mainClass ,该参数对应DriverWrapper类的全限定名,之后将mainClass封装到command中,然后封装到driverDescription中,向Master申请启动Driver。

这个过程会向Mster发送消息,是通过rpcEnv来实现发射消息的,而这里就涉及到outbox信箱,会调用postToOutbox方法,向outbox信箱中添加消息,然后通过TransportClient的send或sendRpc方法发送消息。发件箱以及发送过程是在同一个线程中进行。

而细心的同学会注意到这里调用的方法名为SendToMasterAndForwardReply,见名之意,发送消息到master并且期待回应。

下面是rpcEnv来实现向远端发送消息的一个调用流程,最终会通过netty中的TransportClient来写出。

之后,Master端会触发receiveAndReply函数,匹配RequestSubmitDriver样例类,完成模式匹配执行后续流程。

可以看到这里首先将Driver信息封装成DriverInfo,然后添加待调度列表waitingDrivers中,然后调用通用的schele函数。

由于waitingDrivers不为空,则会走LaunchDriver的流程,当前的application申请资源,这时会向worker发送消息,触发Worker的receive方法。

Worker的receive方法中,当Worker遇到LaunchDriver指令时,创建并启动一个DriverRunner,DriverRunner启动一个线程,异步的处理Driver启动工作。这里说启动的Driver就是刚才说的org.apache.spark.deploy.worker.DriverWrapper

可以看到上面在DriverRunner中是开辟线程异步的处理Driver启动工作,不会阻塞主进程的执行,而prepareAndRunDriver方法中最终调用 runDriver..

runDriver中主要先做了一些初始化工作,接着就开始启动driver了。

上述Driver启动工作主要分为以下几步:

下面我们直接看DriverWrapper的实现

DriverWrapper,会创建了一个RpcEndpoint与RpcEnv,RpcEndpoint为WorkerWatcher,主要目的为监控Worker节点是否正常,如果出现异常就直接退出,然后当前的ClassLoader加载userJar,同时执行userMainClass,在执行用户的main方法后关闭workerWatcher。

以上就是SparkSubmit的流程,下一篇我会对SparkContext的源码进行解析。

欢迎关注...

‘捌’ 怎么用Eclipse搭建Spark源码阅读环境

第一部分、软件安装

1、 安装JDK (版本为1.7.0_11)

2、 安装Scala (版本为2.11.2)

3、 安装ScalaIDE(版本为3.0.4)

第二部分:加压缩官网下载的源代码包或者找到通过Git抽取的Spark源文件:

我用的是spark-1.1.1版本(最新版本),由于idea 13已经原生支持sbt,所以无须为idea安装sbt插件。

源码下载(用git工具):

# Masterdevelopment branch

gitclone git://github.com/apache/spark.git

# 1.1 maintenancebranch with stability fixes on top of Spark 1.1.1

gitclone git://github.com/apache/spark.git -b branch-1.1

源码更新(用git工具同步跟新源码):

gitclone https://github.com/apache/spark.git

第三部分:通过sbt工具,构建Scala的Eclipse工程,详细步骤如下所示

1、通过cmd命令进入DOS界面,之后通过cd命令进入源代码项目中,我下载的Spark.1.1.1版本的源代码放在(E:\Spark计算框架的研究\spark_1_1_1_eclipse)文件夹中,之后运行sbt命令,如下所示:

2、运行sbt命令之后,解析编译相关的jar包,并出现sbt命令界面窗口,出现的效果图如下所示,之后运行eclipse命令,sbt对这个工程进行编译,构建Eclipse项目,效果图如下所示:

4、 打开ScalaIDE工具,File à Import à Existing Projects into Workspace à
Next à
选择刚好用sbt工具编译好的Eclipse工程(E:\Spark计算框架的研究\spark_1_1_1_eclipse),如下图所示。

5、 通过上面的操作,就可以将通过sbt工具编译生成的Eclipse项目导入到EclipseIDE开发环境中,效果图如下所示:

错误提示如下所示:我导入的包为,如下文件夹中所示。

(E:\Spark计算框架的研究\spark_1_1_1_eclipse\lib_managed\bundles)

Description Resource Path Location Type

akka-remote_2.10-2.2.3-shaded-protobuf.jar is cross-compiled

with an incompatible version of Scala (2.10).

In case of errorneous report, this check can be disabled

in the compiler preference page.

spark-core Unknown Scala Classpath Problem

Description Resource Path Location Type

akka-slf4j_2.10-2.2.3-shaded-protobuf.jar is cross-compiled with

an incompatible version of Scala (2.10). In case of errorneous report,

this check can be disabled in the compiler preference page.

spark-core Unknown Scala Classpath Problem

Description Resource Path Location Type

akka-testkit_2.10-2.2.3-shaded-protobuf.jar is cross-compiled

with an incompatible version of Scala (2.10).

In case of errorneous report, this check can be disabled in the compiler preference page.

spark-core Unknown Scala Classpath Problem

Description Resource Path Location Type

akka-zeromq_2.10-2.2.3-shaded-protobuf.jar is cross-compiled

with an incompatible version of Scala (2.10).

In case of errorneous report, this check can be disabled in the compiler preference page.

spark-core Unknown Scala Classpath Problem

上面这些包兼容性问题还没有解决,修改相应的jar包就可以解决。

‘玖’ spark源码的checkpoint部分在哪

Spark源码是有Scala语言写好尘成的,目前,IDEA对Scala的支持要比eclipse要好,大多数人会选在在IDEA上完成Spark平台应用的开发。因此神团,Spark源码阅读的IDE理友瞎禅所当然的选择了IDEA。

‘拾’ spark sql 2.3 源码解读 - Execute (7)

终于到了最后一步执行了:

最关键的两个函数便是 doPrepare和 doExecute了。

还是以上一章的sql语句为例,其最终生成的sparkplan为:

看一下SortExec的doPrepare 和 doExecute方法:

下面看child也就是ShuffleExchangeExec:

先看没有exchangeCoordinator的情况,

首先执行:

上面的方法会返回一个ShuffleDependency,ShuffleDependency中最重要的是rddWithPartitionIds,它决定了每一条InternalRow shuffle后的partition id:

接下来:

返回结果是ShuffledRowRDD:

CoalescedPartitioner的逻辑:

再看有exchangeCoordinator的情况:

同样返回的是ShuffledRowRDD:

再看doEstimationIfNecessary:

estimatePartitionStartIndices 函数得到了 partitionStartIndices:

有exchangeCoordinator的情况就生成了partitionStartIndices,从而对分区进行了调整。

最后来一个例子:

未开启exchangeCoordinator的plan:

开启exchangeCoordinator的plan:

不同之处是 两个Exchange都带了coordinator,且都是同一个coordinator。

执行withExchangeCoordinator前:

执行withExchangeCoordinator后:

生成了coordinator,且执行了 doPrepare后,可以看到两个exchange都向其注册了。

doExecute后:

原先的numPartitions是200,经过执行后,生成的partitionStartIndices为[1],也就是只有1个partition,显然在测试数据量很小的情况下,1个partition是更为合理的。这就是ExchangeCoordinator的功劳。

execute 最终的输出是rdd,剩下的结果便是spark对rdd的运算了。其实 spark sql 最终的目标便也是生成rdd,交给spark core来运算。

spark sql的介绍到这里就结束了。

阅读全文

与spark源码导读相关的资料

热点内容
图解政治pdf 浏览:162
自制可编程计算机 浏览:132
每个文件夹放入一张图片 浏览:453
cgzip压缩文件夹 浏览:591
如何找下载的app 浏览:470
程序员之死最新进展 浏览:261
台电u盘可以下载加密吗 浏览:697
java培训机构上海 浏览:616
长城服务器怎么装机 浏览:355
pm和hm哪个软件编程好 浏览:205
咋样把文件夹强力去除 浏览:989
jvmgc算法 浏览:599
我的世界怎样控制命令方块发文字 浏览:863
期货翻番计算法 浏览:495
华为底部app如何增加 浏览:115
为什么用了几个星期的安卓充电器就坏了 浏览:358
服务器风扇响怎么回事 浏览:16
django打包编译 浏览:196
u盘加密码视频 浏览:165
cmake创建文件夹 浏览:935