学堂 学堂 学堂公众号手机端

如何进行DAGScheduler源码解读

lewis 1年前 (2024-04-15) 阅读数 14 #技术

如何进行DAGScheduler源码解读,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

当构建完TaskScheduler之后,我们需要构建DAGScheduler这个核心对象:



进入其构造函数中:



可以看出构建DAGScheduler实例的时候需要把TaskScheduler实例对象作为参数传入。

LiveListenerBus:


BlockManagerMaster:


通过阅读代码,我们可以发现DAGScheduler实例化的时候,调用了initializeEventProcessActor()方法

privatedefinitializeEventProcessActor(){//blockingthethreaduntilsupervisorisstarted,whichensureseventProcessActoris//notnullbeforeanyjobissubmitted//阻塞当前线程,等待supervisor启动,这样可以确保Job提交时,eventProcessActornotnullimplicitvaltimeout=Timeout(30seconds)valinitEventActorReply=dagSchedulerActorSupervisor?Props(newDAGSchedulerEventProcessActor(this))eventProcessActor=Await.result(initEventActorReply,timeout.duration).
asInstanceOf[ActorRef]
}

initializeEventProcessActor()

DAGSchedulerEventProcessActor:

private[scheduler]classDAGSchedulerEventProcessActor(dagScheduler:DAGScheduler)extendsActorwithLogging{overridedefpreStart(){//setDAGSchedulerfortaskSchedulertoensureeventProcessActorisalways//validwhenthemessagesarrive//设置taskScheduler对DAGScheduler的引用句柄。在此处设置保证了Job提交时候//eventProcessActor已经准备就绪dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
}/***ThemaineventloopoftheDAGscheduler.*/defreceive={caseJobSubmitted(jobId,rdd,func,partitions,allowLocal,callSite,listener,properties)=>
dagScheduler.handleJobSubmitted(jobId,rdd,func,partitions,allowLocal,callSite,listener,properties)caseStageCancelled(stageId)=>
dagScheduler.handleStageCancellation(stageId)caseJobCancelled(jobId)=>
dagScheduler.handleJobCancellation(jobId)caseJobGroupCancelled(groupId)=>
dagScheduler.handleJobGroupCancelled(groupId)caseAllJobsCancelled=>
dagScheduler.doCancelAllJobs()caseExecutorAdded(execId,host)=>
dagScheduler.handleExecutorAdded(execId,host)caseExecutorLost(execId)=>
dagScheduler.handleExecutorLost(execId,fetchFailed=false)caseBeginEvent(task,taskInfo)=>
dagScheduler.handleBeginEvent(task,taskInfo)caseGettingResultEvent(taskInfo)=>
dagScheduler.handleGetTaskResult(taskInfo)casecompletion@CompletionEvent(task,reason,_,_,taskInfo,taskMetrics)=>
dagScheduler.handleTaskCompletion(completion)caseTaskSetFailed(taskSet,reason)=>
dagScheduler.handleTaskSetFailed(taskSet,reason)caseResubmitFailedStages=>
dagScheduler.resubmitFailedStages()
}overridedefpostStop(){//CancelanyactivejobsinpostStophookdagScheduler.cleanUpAfterSchedulerStop()
}
}

可以看出核心在于实例化eventProcessActor对象,eventProcessActor会负责接收和发送DAGScheduler的消息,是DAGScheduler的通信载体。

关于如何进行DAGScheduler源码解读问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注博信行业资讯频道了解更多相关知识。

版权声明

本文仅代表作者观点,不代表博信信息网立场。

热门