❶ 大家对spark的源码了解多少,sparkshuffle,调度,sparkstreaming的源码
流(Streaming),在大数据时代为数据流处理,就像水流一样,是数据流;既然是数据流处理,就会想到数据的流入、数据的加工、数据的流出。
日常工作、生活中数据来源很多不同的地方。例如:工业时代的汽车制造、监控设备、工业设备会产生很多源数据;信息时代的电商网站、日志服务器、社交网络、金融交易系统、黑客攻击、垃圾邮件、交通监控等;通信时代的手机、平板、智能设备、物联网等会产生很多实时数据,数据流无处不在。
在大数据时代SparkStreaming能做什么?
平时用户都有网上购物的经历,用户在网站上进行的各种操作通过Spark Streaming流处理技术可以被监控,用户的购买爱好、关注度、交易等可以进行行为分析。在金融领域,通过Spark Streaming流处理技术可以对交易量很大的账号进行监控,防止罪犯洗钱、财产转移、防欺诈等。在网络安全性方面,黑客攻击时有发生,通过Spark Streaming流处理技术可以将某类可疑IP进行监控并结合机器学习训练模型匹配出当前请求是否属于黑客攻击。其他方面,如:垃圾邮件监控过滤、交通监控、网络监控、工业设备监控的背后都是Spark Streaming发挥强大流处理的地方。
大数据时代,数据价值一般怎么定义?
所有没经过流处理的数据都是无效数据或没有价值的数据;数据产生之后立即处理产生的价值是最大的,数据放置越久或越滞后其使用价值越低。以前绝大多数电商网站盈利走的是网络流量(即用户的访问量),如今,电商网站不仅仅需要关注流量、交易量,更重要的是要通过数据流技术让电商网站的各种数据流动起来,通过实时流动的数据及时分析、挖掘出各种有价值的数据;比如:对不同交易量的用户指定用户画像,从而提供不同服务质量;准对用户访问电商网站板块爱好及时推荐相关的信息。
SparkStreaming VSHadoopMR:
Spark Streaming是一个准实时流处理框架,而Hadoop MR是一个离线、批处理框架;很显然,在数据的价值性角度,Spark Streaming完胜于Hadoop MR。
SparkStreaming VS Storm:
Spark Streaming是一个准实时流处理框架,处理响应时间一般以分钟为单位,也就是说处理实时数据的延迟时间是秒级别的;Storm是一个实时流处理框架,处理响应是毫秒级的。所以在流框架选型方面要看具体业务场景。需要澄清的是现在很多人认为Spark Streaming流处理运行不稳定、数据丢失、事务性支持不好等等,那是因为很多人不会驾驭Spark Streaming及Spark本身。在Spark Streaming流处理的延迟时间方面,Spark定制版本,会将Spark Streaming的延迟从秒级别推进到100毫秒之内甚至更少。
SparkStreaming优点:
1、提供了丰富的API,企业中能快速实现各种复杂的业务逻辑。
2、流入Spark Streaming的数据流通过和机器学习算法结合,完成机器模拟和图计算。
3、Spark Streaming基于Spark优秀的血统。
SparkStreaming能不能像Storm一样,一条一条处理数据?
Storm处理数据的方式是以条为单位来一条一条处理的,而Spark Streaming基于单位时间处理数据的,SparkStreaming能不能像Storm一样呢?答案是:可以的。
业界一般的做法是Spark Streaming和Kafka搭档即可达到这种效果,入下图:
总结:
使用Spark Streaming可以处理各种数据来源类型,如:数据库、HDFS,服务器log日志、网络流,其强大超越了你想象不到的场景,只是很多时候大家不会用,其真正原因是对Spark、spark streaming本身不了解。
❷ spark sql 2.3 源码解读 - Execute (7)
终于到了最后一步执行了:
最关键的两个函数便是 doPrepare和 doExecute了。
还是以上一章的sql语句为例,其最终生成的sparkplan为:
看一下SortExec的doPrepare 和 doExecute方法:
下面看child也就是ShuffleExchangeExec:
先看没有exchangeCoordinator的情况,
首先执行:
上面的方法会返回一个ShuffleDependency,ShuffleDependency中最重要的是rddWithPartitionIds,它决定了每一条InternalRow shuffle后的partition id:
接下来:
返回结果是ShuffledRowRDD:
CoalescedPartitioner的逻辑:
再看有exchangeCoordinator的情况:
同样返回的是ShuffledRowRDD:
再看doEstimationIfNecessary:
estimatePartitionStartIndices 函数得到了 partitionStartIndices:
有exchangeCoordinator的情况就生成了partitionStartIndices,从而对分区进行了调整。
最后来一个例子:
未开启exchangeCoordinator的plan:
开启exchangeCoordinator的plan:
不同之处是 两个Exchange都带了coordinator,且都是同一个coordinator。
执行withExchangeCoordinator前:
执行withExchangeCoordinator后:
生成了coordinator,且执行了 doPrepare后,可以看到两个exchange都向其注册了。
doExecute后:
原先的numPartitions是200,经过执行后,生成的partitionStartIndices为[1],也就是只有1个partition,显然在测试数据量很小的情况下,1个partition是更为合理的。这就是ExchangeCoordinator的功劳。
execute 最终的输出是rdd,剩下的结果便是spark对rdd的运算了。其实 spark sql 最终的目标便也是生成rdd,交给spark core来运算。
spark sql的介绍到这里就结束了。
❸ Spark源码分析之SparkSubmit的流程
本文主要对SparkSubmit的任务提交流程源码进行分析。 Spark源码版本为2.3.1。
首先阅读一下启动脚本,看看首先加载的是哪个类,我们看一下 spark-submit 启动脚本中的具体内容。
可以看到这里加载的类是org.apache.spark.deploy.SparkSubmit,并且把启动相关的参数也带过去了。下面我们跟一下源码看看整个流程是如何运作的...
SparkSubmit的main方法如下
这里我们由于我们是提交作业,所有会走上面的submit(appArgs, uninitLog)方法
可以看到submit方法首先会准备任务提交的环境,调用了prepareSubmitEnvironment,该方法会返回四元组,该方法中会调用doPrepareSubmitEnvironment,这里我们重点注意 childMainClass类具体是什么 ,因为这里涉及到后面启动我们主类的过程。
以下是doPrepareSubmitEnvironment方法的源码...
可以看到该方法首先是解析相关的参数,如jar包,mainClass的全限定名,系统配置,校验一些参数,等等,之后的关键点就是根据我们 deploy-mode 参数来判断是如何运行我们的mainClass,这里主要是通过childMainClass这个参数来决定下一步首先启动哪个类。
childMainClass根据部署模型有不同的值:
之后该方法会把准备好的四元组返回,我们接着看之前的submit方法
可以看到这里最终会调用doRunMain()方法去进行下一步。
doRunMain的实现如下...
doRunMain方法中会判断是否需要一个代理用户,然后无论需不需要都会执行runMain方法,我们接下来看看runMain方法是如何实现的。
这里我们只假设以集群模式启动,首先会加载类,将我们的childMainClass加载为字节码对象mainClass ,然后将mainClass 映射成SparkApplication对象,因为我们以集群模式启动,那么上一步返回四元组中的childMainClass的参数为ClientApp的全限定名,而这里会调用app实例的start方法因此,这里最终调用的是ClientApp的start方法。
ClientApp的start方法如下...
可以看到这里和之前我们的master启动流程有些相似。
可以参考我上一篇文章 Spark源码分析之Master的启动流程 对这一流程加深理解。
首先是准备rpcEnv环境,之后通过master的地址获取masterEndpoints端点相关信息,因为这里运行start方法时会将之前配置的相关参数都传进来,之后就会通过rpcEnv注册相关clientEndPoint端点信息,同时需要注意,这里会把masterEndpoints端点信息也作为构造ClientEndpoint端点的参数,也就是说这个ClientEndpoint会和masterEndpoints通信。
而在我上一篇文章中说过,只要是setupEndpoint方法被调用,一定会调用相关端点的的onStart方法,而这会调用clientEndPoint的onStart方法。
ClientEndPoint类中的onStart方法会匹配launch事件。源码如下
onStart中匹配我们的launch的过程,这个过程是启动driverWrapper的过程,可以看到上面源码中封装了mainClass ,该参数对应DriverWrapper类的全限定名,之后将mainClass封装到command中,然后封装到driverDescription中,向Master申请启动Driver。
这个过程会向Mster发送消息,是通过rpcEnv来实现发射消息的,而这里就涉及到outbox信箱,会调用postToOutbox方法,向outbox信箱中添加消息,然后通过TransportClient的send或sendRpc方法发送消息。发件箱以及发送过程是在同一个线程中进行。
而细心的同学会注意到这里调用的方法名为SendToMasterAndForwardReply,见名之意,发送消息到master并且期待回应。
下面是rpcEnv来实现向远端发送消息的一个调用流程,最终会通过netty中的TransportClient来写出。
之后,Master端会触发receiveAndReply函数,匹配RequestSubmitDriver样例类,完成模式匹配执行后续流程。
可以看到这里首先将Driver信息封装成DriverInfo,然后添加待调度列表waitingDrivers中,然后调用通用的schele函数。
由于waitingDrivers不为空,则会走LaunchDriver的流程,当前的application申请资源,这时会向worker发送消息,触发Worker的receive方法。
Worker的receive方法中,当Worker遇到LaunchDriver指令时,创建并启动一个DriverRunner,DriverRunner启动一个线程,异步的处理Driver启动工作。这里说启动的Driver就是刚才说的org.apache.spark.deploy.worker.DriverWrapper
可以看到上面在DriverRunner中是开辟线程异步的处理Driver启动工作,不会阻塞主进程的执行,而prepareAndRunDriver方法中最终调用 runDriver..
runDriver中主要先做了一些初始化工作,接着就开始启动driver了。
上述Driver启动工作主要分为以下几步:
下面我们直接看DriverWrapper的实现
DriverWrapper,会创建了一个RpcEndpoint与RpcEnv,RpcEndpoint为WorkerWatcher,主要目的为监控Worker节点是否正常,如果出现异常就直接退出,然后当前的ClassLoader加载userJar,同时执行userMainClass,在执行用户的main方法后关闭workerWatcher。
以上就是SparkSubmit的流程,下一篇我会对SparkContext的源码进行解析。
欢迎关注...