❶ 怎么编译spark的源代码 windows
1.maven的安装
到maven官网下载maven,maven依赖于jdk,所以胡烂先确保以安装jdk( http://maven.apache.org/download.cgi )
解压到一个目裤肢漏录下,我选择的是D:\learning_soft\maven
然后配置环境变量
%M2_HOME%=D:\learning_soft\maven\apache-maven-3.3.9
添加到path路径 %M2_HOME%bin
验证安装成功
修改maven的内存配置
在mvn.cmd或者mvn.bat中找到:
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 11
在其后添加
-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m 11
2.spark编译
到spark官网下载spark的源码饥仔,这里选择的spark-1.3.1
,解压文件后,修改pom.xml文件
<java.version>1.7</java.version>
<hadoop.version>2.4.1</hadoop.version>
<protobuf.version>2.5.0</protobuf.version>
<hbase.version>0.98.9-hadoop2</hbase.version>
<zookeeper.version>3.4.6</zookeeper.version>
<derby.version>10.11.1.1</derby.version>123456123456
主要是指定hadoop,hbase的版本
然后在将目录切换到spark的目录下
输入
mvn -Pyarn -Dhadoop.version=2.4.1 -Dyarn.version=2.4.1 -DskipTests clean package
❷ Spark Sql 源码剖析(二): TreeNode
使用 object CurrentOrigin 为 TreeNodes 提供一个可以查找上下文的地方,比如当前正在解析哪行 code。
object CurrentOrigin 主燃颤含要包含一个 private val value = new ThreadLocal[Origin]() ,目前 CurrentOrigin 仅在 parser 中使用,在 visit 每个节点的时候都会使用,记录当前 parse 的节点是哪行哪列
另外,从 value 是 ThreadLocal 类型可以看出,在 Spark SQL 中,parse sql 时都是在单独的 thread 里进行的(不同的 sql 不同的 thread)
返回该节点的 seq of children,children 是不可变的。有三种情况:
查找第一个符合 f 条件(比如某个类型的)的 TreeNode,先序遍历。
将函数 f 递归应用洞猛于节点及其子节点
与 foreach 不同的是,foreach 先应用于 parent,再应用与 child;而 foreachUp 是先应用于 child 再应用与 parent
调用 foreach,foreach 中应用的函数是 ret += f(_) ,最终返回一个 seq,包含将 f 通过 foreach 方式应用于所有节点并 add 到 ret。其中 f 本身是 BaseType => A 类型
原理与 map 一致,只是 f 变成了 BaseType => TraversableOnce[A]
PartialFunction#lift :将 partial func 转换为一个返回 Option 结果的函数。将 pf 函数应用于符合 pf 定义的节点(即 pf.lift(node)返回的 Option 不是 None )并都皮笑 add 到 ret = new collection.mutable.ArrayBuffer[B] 以 Seq 形式返回
以 Seq 的形式返回 tree 的所有叶子节点
def collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B] :注意,因为可能没有符合 pf 定义的节点,所有返回的 Option 可能是 None
相当于 proctIterator.map(f).toArray ,即对于 proctIterator 每个元素执行 f 然后将 ret 组成一个 arr 返回
注意:TreeNode 没有实现 Proct 相关方法,都由其子类自行实现
使用 new children 替换并返回该节点的拷贝。该方法会对 proctElement 每个元素进行模式匹配,根据节点类型及一定规则进行替换。
调用 transformDown
rule: PartialFunction[BaseType, BaseType]
返回 f 应用于所有子节点(非递归,一般将递归操作放在调用该函数的地方)后该节点的 。其内部的原理是调用 mapProctIterator,对每一个 proctElement(i) 进行各种模式匹配,若能匹配上某个再根据一定规则进行转换,核心匹配转换如下:
以上都是适用于有 children 的 node,如果是 children 为 null 的 node 直接返回
反射生成节点副本
返回该类型 TreeNode 的 name,默认为 class name;注意,会移除物理操作的 Exec$ 前缀
所有应该以该节点内嵌套树表示的 nodes,比如,可以被用来表示 sub-queries
(children ++ innerChildren).toSet[TreeNode[_]]
主要用于交互式 debug,返回该 tree 指定下标的节点,num 可以在 numberedTreeString 找到。最终调用的
我的博客即将搬运同步至腾讯云+社区,邀请大家一同入驻: https://cloud.tencent.com/developer/support-plan?invite_code=x2lzoxh4s5hi
❸ spark和java的关系
通常大家只瞎胡是说Spark是基于内存计算的,速度比MapRece要快。或者说内存中迭代计算。其实我们要抓住问题的本质。总结有以下几点:
1、Spark vs MapRece ≠ 内存 vs 磁盘
其实Spark和MapRece的计算都发生在内存中,区别在于:
MapRece通常需要将计算的中间结果写入磁盘,然后还要读取磁盘,从而导致了频繁的磁盘IO。
Spark则不需要将计算的中间结果写入磁盘,这得益于Spark的RDD(弹性局高分布式数据集,很强大)和DAG(有向无环图),其中DAG记录了job的stage以及在job执行过程中父RDD和子RDD之间的依赖关系。中间结果能够以RDD的形式存放在内存中,且能够从DAG中恢复,大大减少了磁盘IO。
2、Spark vs MapRece Shuffle的不同
Spark和MapRece在计算过程中通常都不可避免的会进行Shuffle,两者至少有一点不同:
MapRece在Shuffle时需要花费大量时间进行排序,排序在MapRece的Shuffle中似乎是不可避免的;
Spark在Shuffle时则只有部分场景才需要排序,支持基于Hash的分布式聚合,更加省时;
3、多进程模型 vs 多线程模型的区别
MapRece采用了多进程模型,而Spark采用了多磨腊拦线程模型。多进程模型的好处是便于细粒度控制每个任务占用的资源,但每次任务的启动都会消耗一定的启动时间。就是说MapRece的Map Task和Rece Task是进程级别的,而Spark Task则是基于线程模型的,就是说maprece 中的 map 和 rece 都是 jvm 进程,每次启动都需要重新申请资源,消耗了不必要的时间(假设容器启动时间大概1s,如果有1200个block,那么单独启动map进程事件就需要20分钟)
Spark则是通过复用线程池中的线程来减少启动、关闭task所需要的开销。(多线程模型也有缺点,由于同节点上所有任务运行在一个进程中,因此,会出现严重的资源争用,难以细粒度控制每个任务占用资源)
总结:关于Spark为什么比MapRece快,或者Spark速度快于MapRece的原因,总结至少有这几点不同之处吧。
❹ Spark 中用 Scala 和 java 开发有什么区别
Scala相对于Java的优势是巨大的。熟悉Scala之后再看Java代码,有种读汇编的感觉…… 如果仅仅是写Spark应用,并非一定要学Scala,可以直接用Spark的Java API或Python API。但因为语言上的差异,用Java开发Spark应用要罗嗦许多。好在带lambda的Java 8出来之后有所改善。 在Spark应用开发上,学Scala主要好处有二: 开发效率更高,代码更精简; 使用Spark过程中出现异常情况,在排查时如果对Spark源码比较熟悉,可以事半功倍
❺ Spark源码分析之SparkSubmit的流程
本文主要对SparkSubmit的任务提交流程源码进行分析。 Spark源码版本为2.3.1。
首先阅读一下启动脚本,看看首先加载的是哪个类,我们看一下 spark-submit 启动脚本中的具体内容。
可以看到这里加载的类是org.apache.spark.deploy.SparkSubmit,并且把启动相关的参数也带过去了。下面我们跟一下源码看看整个流程是如何运作的...
SparkSubmit的main方法如下
这里我们由于我们是提交作业,所有会走上面的submit(appArgs, uninitLog)方法
可以看到submit方法首先会准备任务提交的环境,调用了prepareSubmitEnvironment,该方法会返回四元组,该方法中会调用doPrepareSubmitEnvironment,这里我们重点注意 childMainClass类具体是什么 ,因为这里涉及到后面启动我们主类的过程。
以下是doPrepareSubmitEnvironment方法的源码...
可以看到该方法首先是解析相关的参数,如jar包,mainClass的全限定名,系统配置,校验一些参数,等等,之后的关键点就是根据我们 deploy-mode 参数来判断是如何运行我们的mainClass,这里主要是通过childMainClass这个参数来决定下一步首先启动哪个类。
childMainClass根据部署模型有不同的值:
之后该方法会把准备好的四元组返回,我们接着看之前的submit方法
可以看到这里最终会调用doRunMain()方法去进行下一步。
doRunMain的实现如下...
doRunMain方法中会判断是否需要一个代理用户,然后无论需不需要都会执行runMain方法,我们接下来看看runMain方法是如何实现的。
这里我们只假设以集群模式启动,首先会加载类,将我们的childMainClass加载为字节码对象mainClass ,然后将mainClass 映射成SparkApplication对象,因为我们以集群模式启动,那么上一步返回四元组中的childMainClass的参数为ClientApp的全限定名,而这里会调用app实例的start方法因此,这里最终调用的是ClientApp的start方法。
ClientApp的start方法如下...
可以看到这里和之前我们的master启动流程有些相似。
可以参考我上一篇文章 Spark源码分析之Master的启动流程 对这一流程加深理解。
首先是准备rpcEnv环境,之后通过master的地址获取masterEndpoints端点相关信息,因为这里运行start方法时会将之前配置的相关参数都传进来,之后就会通过rpcEnv注册相关clientEndPoint端点信息,同时需要注意,这里会把masterEndpoints端点信息也作为构造ClientEndpoint端点的参数,也就是说这个ClientEndpoint会和masterEndpoints通信。
而在我上一篇文章中说过,只要是setupEndpoint方法被调用,一定会调用相关端点的的onStart方法,而这会调用clientEndPoint的onStart方法。
ClientEndPoint类中的onStart方法会匹配launch事件。源码如下
onStart中匹配我们的launch的过程,这个过程是启动driverWrapper的过程,可以看到上面源码中封装了mainClass ,该参数对应DriverWrapper类的全限定名,之后将mainClass封装到command中,然后封装到driverDescription中,向Master申请启动Driver。
这个过程会向Mster发送消息,是通过rpcEnv来实现发射消息的,而这里就涉及到outbox信箱,会调用postToOutbox方法,向outbox信箱中添加消息,然后通过TransportClient的send或sendRpc方法发送消息。发件箱以及发送过程是在同一个线程中进行。
而细心的同学会注意到这里调用的方法名为SendToMasterAndForwardReply,见名之意,发送消息到master并且期待回应。
下面是rpcEnv来实现向远端发送消息的一个调用流程,最终会通过netty中的TransportClient来写出。
之后,Master端会触发receiveAndReply函数,匹配RequestSubmitDriver样例类,完成模式匹配执行后续流程。
可以看到这里首先将Driver信息封装成DriverInfo,然后添加待调度列表waitingDrivers中,然后调用通用的schele函数。
由于waitingDrivers不为空,则会走LaunchDriver的流程,当前的application申请资源,这时会向worker发送消息,触发Worker的receive方法。
Worker的receive方法中,当Worker遇到LaunchDriver指令时,创建并启动一个DriverRunner,DriverRunner启动一个线程,异步的处理Driver启动工作。这里说启动的Driver就是刚才说的org.apache.spark.deploy.worker.DriverWrapper
可以看到上面在DriverRunner中是开辟线程异步的处理Driver启动工作,不会阻塞主进程的执行,而prepareAndRunDriver方法中最终调用 runDriver..
runDriver中主要先做了一些初始化工作,接着就开始启动driver了。
上述Driver启动工作主要分为以下几步:
下面我们直接看DriverWrapper的实现
DriverWrapper,会创建了一个RpcEndpoint与RpcEnv,RpcEndpoint为WorkerWatcher,主要目的为监控Worker节点是否正常,如果出现异常就直接退出,然后当前的ClassLoader加载userJar,同时执行userMainClass,在执行用户的main方法后关闭workerWatcher。
以上就是SparkSubmit的流程,下一篇我会对SparkContext的源码进行解析。
欢迎关注...
❻ spark源码二次开发难吗
spark源码二次开发不难。掌握了源码编译,就具备了对Spark进行二次开发的基本条件了,要修改Spark源码,进行二次开发,那么就得从官网下载指定版本的源码,导入ide开发环境,进行源码的修改。接着修改完了。
❼ Linux里面spark作用是什么
Spark是通用数据处理引擎,适用于多种情况。 应用程序开发人员和数据科学家将Spark集成到他们的应用程序中,以快速地大规模查询,分析和转换数据。 与Spark最频繁相关的任务包括跨大型数据集的交互式查询,来自传感器或金融系统的流数据处理以及机器学习任务。
Spark于2009年开始运作,最初是加州大学伯克利分校AMPLab内部的一个项目。 更具体地说,它是出于证明Mesos概念的需要而诞生的,Mesos概念也是在AMPLab中创建的。 在Mesos白皮书《 Mesos:数据中心中的细粒度资源共享平台》中首次讨论了Spark,其中最着名的作者是Benjamin Hindman和Matei Zaharia。
2013年,Spark成为Apache Software Foundation的孵化项目,并于2014年初被提升为该基金会的顶级项目之一。 Spark是基金会管理的最活跃的项目之一,围绕该项目成长的社区包括多产的个人贡献者和资金雄厚的企业支持者,例如Databricks,IBM和中国的华为。
从一开始,Spark就被优化为在内存中运行。 它比Hadoop的MapRece等替代方法更快地处理数据,后者倾向于在处理的每个阶段之间向计算机硬盘写入数据或从计算机硬盘写入数据。 Spark的支持者声称,Spark在内存中的运行速度可以比Hadoop MapRece快100倍,并且在以类似于Hadoop MapRece本身的方式处理基于磁盘的数据时也可以快10倍。 这种比较并不完全公平,这不仅是因为原始速度对Spark的典型用例而言比对批处理更为重要,在这种情况下,类似于MapRece的解决方案仍然很出色。
❽ spark thrift server 与 网易 kyuubi thrift server
thrift server可以实现通过jdbc, beeline等工具,实现连接到spark集群,并提交sql查询的机制。
默认情况下,cdh安装的spark没有包含thrift server模块,因此我们需要重新编译spark。
另外,为了不影响cdh自带的spark,而且spark目前都是基于yarn运行的,本身也没有什么独立的服务部署(除了history sever)。
所以,在一个集群中,可以部署安装多个版本的spark。
我们使用源码编译的spark 2.4.0(其中hive的版本是1.2.1)
cdh集成的spark版本和Hive版本如下:
使用jdk1.8
修改spark提供的mvn,使用自行安装的maven 3.8.1
使用make-distribution.sh可以帮助与我们编译之后打包成tgz文件
修改pom.xml文件的配置如下。
最后,执行编译命令如下:
这样打出的包,就含有thrift server的jar包了。
最终打包文件,根目录下。
之后就是解压到其他目录下后即可。
将hive-site.xml的文件连接过来,这样spark就可以读取hive的表了。
为了确保spark提交到yarn上运行,需要配置
cp spark-defaults.conf.template spar-defaults.conf
另外,可以在spark-env.sh中设置环境变量。
HADOOP_CONF_DIR
环境变量,也可以在/etc/profile中设置
启动日志可以查看,注意下端口占用问题,如下。
启动时候,使用beeline工具连接上,主要这里不用使用cdh默认安装hive提供的beeline工具,应为版本太高。
使用编译后spark生成beeline工具
参考beeline使用教程。
https://github.com/apache/incubator-kyuubi
kyuubi是基于thrift sever二次开发,在系能和安全上优于thrift server。
鉴于目前hive的版本是2.1,而最新的kyuubi的hive是2.3,所以采用前天版本的kyuubi,采用0.7版本,保证hive的版本小于当前集群中的hive版本。
使用build目录下的dist脚本进行编译和打包。
编译成功后,会在更目录下出现tar.gz的压缩文件,如上图。
之后解压到目录下。
配置bin/kyuubi-env.sh脚本,设置spark路径
执行bin/start-kyuubi.sh命令即可。
访问的方式同样采用beelin,注意使用上面章节的beeline工具。
访问后,可以通过beeline访问到hive的表(在spark中已经配置了hive-site.xml)
!connect jdbc: hive2://xxxx:10009 即可。