如何进行DAGScheduler源码解读
如何进行DAGScheduler源码解读,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
当构建完TaskScheduler之后,我们需要构建DAGScheduler这个核心对象:
进入其构造函数中:
可以看出构建DAGScheduler实例的时候需要把TaskScheduler实例对象作为参数传入。
LiveListenerBus:
BlockManagerMaster:
通过阅读代码,我们可以发现DAGScheduler实例化的时候,调用了initializeEventProcessActor()方法
privatedefinitializeEventProcessActor(){//blockingthethreaduntilsupervisorisstarted,whichensureseventProcessActoris//notnullbeforeanyjobissubmitted//阻塞当前线程,等待supervisor启动,这样可以确保Job提交时,eventProcessActornotnullimplicitvaltimeout=Timeout(30seconds)valinitEventActorReply=dagSchedulerActorSupervisor?Props(newDAGSchedulerEventProcessActor(this))eventProcessActor=Await.result(initEventActorReply,timeout.duration). asInstanceOf[ActorRef] } initializeEventProcessActor()
DAGSchedulerEventProcessActor:
private[scheduler]classDAGSchedulerEventProcessActor(dagScheduler:DAGScheduler)extendsActorwithLogging{overridedefpreStart(){//setDAGSchedulerfortaskSchedulertoensureeventProcessActorisalways//validwhenthemessagesarrive//设置taskScheduler对DAGScheduler的引用句柄。在此处设置保证了Job提交时候//eventProcessActor已经准备就绪dagScheduler.taskScheduler.setDAGScheduler(dagScheduler) }/***ThemaineventloopoftheDAGscheduler.*/defreceive={caseJobSubmitted(jobId,rdd,func,partitions,allowLocal,callSite,listener,properties)=> dagScheduler.handleJobSubmitted(jobId,rdd,func,partitions,allowLocal,callSite,listener,properties)caseStageCancelled(stageId)=> dagScheduler.handleStageCancellation(stageId)caseJobCancelled(jobId)=> dagScheduler.handleJobCancellation(jobId)caseJobGroupCancelled(groupId)=> dagScheduler.handleJobGroupCancelled(groupId)caseAllJobsCancelled=> dagScheduler.doCancelAllJobs()caseExecutorAdded(execId,host)=> dagScheduler.handleExecutorAdded(execId,host)caseExecutorLost(execId)=> dagScheduler.handleExecutorLost(execId,fetchFailed=false)caseBeginEvent(task,taskInfo)=> dagScheduler.handleBeginEvent(task,taskInfo)caseGettingResultEvent(taskInfo)=> dagScheduler.handleGetTaskResult(taskInfo)casecompletion@CompletionEvent(task,reason,_,_,taskInfo,taskMetrics)=> dagScheduler.handleTaskCompletion(completion)caseTaskSetFailed(taskSet,reason)=> dagScheduler.handleTaskSetFailed(taskSet,reason)caseResubmitFailedStages=> dagScheduler.resubmitFailedStages() }overridedefpostStop(){//CancelanyactivejobsinpostStophookdagScheduler.cleanUpAfterSchedulerStop() } }
可以看出核心在于实例化eventProcessActor对象,eventProcessActor会负责接收和发送DAGScheduler的消息,是DAGScheduler的通信载体。
关于如何进行DAGScheduler源码解读问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注博信行业资讯频道了解更多相关知识。
版权声明
本文仅代表作者观点,不代表博信信息网立场。