Spark Analysis: The SparkContext Startup Process
Published: 2019-06-23


SparkContext is the entry point for all of Spark: whether you use Spark Core, Spark Streaming, or Spark SQL, you first create a SparkContext object and then run your RDD operations on top of it. It is therefore well worth understanding what SparkContext actually does while it is being initialized.
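To make the role of this entry point concrete, here is a minimal driver-side sketch in the Spark 1.x style; the application name and master URL are placeholder values, not taken from the article:

import org.apache.spark.{SparkConf, SparkContext}

// Minimal driver program: build a SparkConf, create the SparkContext,
// then run RDD operations against it.
val conf = new SparkConf()
  .setAppName("SparkContextDemo")        // placeholder application name
  .setMaster("spark://master:7077")      // placeholder master URL; local[*] also works
val sc = new SparkContext(conf)

val rdd = sc.parallelize(1 to 100)
println(rdd.map(_ * 2).reduce(_ + _))    // triggers a job through the schedulers created below

sc.stop()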

 

SparkContext initialization mainly does the following (summarized in the sketch after this list):

1. Create a SparkEnv from the SparkConf passed into SparkContext's constructor;

2. Initialize the SparkUI;

3. Create the TaskScheduler;

4. Create the DAGScheduler;

5. Start the TaskScheduler.
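The following is a simplified sketch of how these five steps line up inside the SparkContext constructor (Spark 1.x era; not the literal source, and each step is shown with real code in the sections below):

// Simplified ordering inside the SparkContext constructor
private[spark] val env = SparkEnv.create(conf, ...)                  // 1. create SparkEnv
private[spark] val ui = new SparkUI(this)                            // 2. initialize SparkUI
ui.bind()
private[spark] var taskScheduler =
  SparkContext.createTaskScheduler(this, master)                     // 3. create TaskScheduler
@volatile private[spark] var dagScheduler = new DAGScheduler(this)   // 4. create DAGScheduler
taskScheduler.start()                                                // 5. start the TaskScheduler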

 

Walking through SparkContext initialization in the source code

1. Creating the SparkEnv

private[spark] val env = SparkEnv.create(
  conf,
  "<driver>",                              // executor ID used for the driver
  conf.get("spark.driver.host"),
  conf.get("spark.driver.port").toInt,
  isDriver = true,
  isLocal = isLocal,
  listenerBus = listenerBus)
SparkEnv.set(env)

 

2. Initializing the SparkUI

private[spark] val ui = new SparkUI(this)
ui.bind()
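ui.bind() starts the Jetty server behind the web UI, which by default listens on port 4040. A hedged example of overriding the port through the standard spark.ui.port setting (the app name is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

// Change the web UI port before the SparkContext (and thus the SparkUI) is created
val conf = new SparkConf()
  .setAppName("UiPortDemo")
  .set("spark.ui.port", "4141")   // default is 4040
val sc = new SparkContext(conf)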

 

3. Creating the TaskScheduler: a different SchedulerBackend is created depending on Spark's run mode

private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)

private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
  val SPARK_REGEX = """spark://(.*)""".r
  master match {
    case SPARK_REGEX(sparkUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)  // initialize the backend field of TaskSchedulerImpl
      scheduler
  }
}

class TaskSchedulerImpl extends TaskScheduler {
  var backend: SchedulerBackend = null

  def initialize(backend: SchedulerBackend) {
    this.backend = backend  // store the SparkDeploySchedulerBackend in the backend field
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>   // first-in, first-out scheduling
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>   // fair scheduling
          new FairSchedulableBuilder(rootPool, conf)
      }
    }
    schedulableBuilder.buildPools()
  }
}

private[spark] class SparkDeploySchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    masters: Array[String])
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
  with AppClientListener with Logging {
}
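Besides the spark:// standalone branch shown above, createTaskScheduler matches other master strings as well. A simplified sketch of the local-mode branches, assuming the Spark 1.x LocalBackend (details trimmed, not the full source):

// Local-mode branches of createTaskScheduler (simplified)
val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
master match {
  case "local" =>                      // single worker thread
    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    val backend = new LocalBackend(scheduler, 1)
    scheduler.initialize(backend)
    scheduler
  case LOCAL_N_REGEX(threads) =>       // local[N]: N worker threads
    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    val backend = new LocalBackend(scheduler, threads.toInt)
    scheduler.initialize(backend)
    scheduler
  // ... yarn and mesos masters are resolved similarly, each with its own backend
}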

 

4. Creating the DAGScheduler: the DAGScheduler is built on top of the TaskScheduler and receives the jobs that are submitted

 

 

// Build the DAGScheduler on top of the TaskScheduler; this creates eventProcessActor,
// the actor through which the DAGScheduler sends and receives most of its messages.
@volatile private[spark] var dagScheduler: DAGScheduler = new DAGScheduler(this)

class DAGScheduler {
  def this(sc: SparkContext) = this(sc, sc.taskScheduler)

  private def initializeEventProcessActor() {
    implicit val timeout = Timeout(30 seconds)
    val initEventActorReply =
      dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
    eventProcessActor = Await.result(initEventActorReply, timeout.duration)
      .asInstanceOf[ActorRef]
  }

  initializeEventProcessActor()
}

// A detailed analysis is in the DAGScheduler article.
private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override def preStart() {
    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
  }

  def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)
    // ......
  }
}
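To connect this actor to an actual job: when an RDD action fires, SparkContext.runJob calls into DAGScheduler.runJob/submitJob, and submitJob is what posts the JobSubmitted event that the receive block above handles. A rough sketch of that posting step, simplified from the Spark 1.x source:

// Inside DAGScheduler.submitJob (simplified): hand the job to eventProcessActor
val jobId = nextJobId.getAndIncrement()
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessActor ! JobSubmitted(
  jobId, rdd, func, partitions.toArray, allowLocal, callSite, waiter, properties)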

 

5. Starting the TaskScheduler

Starting the TaskScheduler mainly starts the corresponding SchedulerBackend and decides whether speculative task execution should be enabled.

While the TaskScheduler starts, an Application is created and a registration request is sent to the Master.

taskScheduler.start()

class TaskSchedulerImpl extends TaskScheduler {
  var backend: SchedulerBackend = null

  override def start() {
    backend.start()
    // spark.speculation...
  }
}

private[spark] class SparkDeploySchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    masters: Array[String])
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
  with AppClientListener with Logging {

  var client: AppClient = null
  val maxCores = conf.getOption("spark.cores.max").map(_.toInt)

  override def start() {
    super.start()  // calls CoarseGrainedSchedulerBackend.start()
    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
      conf.get("spark.driver.host"), conf.get("spark.driver.port"),
      CoarseGrainedSchedulerBackend.ACTOR_NAME)
    val command = Command(
      "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs,
      classPathEntries, libraryPathEntries, extraJavaOpts)
    val sparkHome = sc.getSparkHome()
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    client.start()
  }
}

class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
  extends SchedulerBackend with Logging {

  var driverActor: ActorRef = null

  override def start() {
    driverActor = actorSystem.actorOf(
      Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
  }
}

class ClientActor extends Actor with Logging {
  override def preStart() {
    registerWithMaster()  // register the Application with the Master
  }
}
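The spark.speculation check hinted at above is driven by ordinary configuration. A hedged example of turning speculative execution on (these are standard Spark configuration keys; the app name and the non-default values are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Enable speculative re-launching of slow tasks before creating the SparkContext
val conf = new SparkConf()
  .setAppName("SpeculationDemo")
  .set("spark.speculation", "true")           // off by default
  .set("spark.speculation.interval", "100")   // how often to check for slow tasks (ms in Spark 1.x)
  .set("spark.speculation.quantile", "0.75")  // fraction of tasks that must finish before checking
val sc = new SparkContext(conf)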

 

Communication between CoarseGrainedSchedulerBackend and CoarseGrainedExecutorBackend

private[spark] class CoarseGrainedExecutorBackend(
    driverUrl: String, executorId: String, hostPort: String, cores: Int)
  extends Actor with ExecutorBackend with Logging {

  var executor: Executor = null
  var driver: ActorSelection = null

  override def preStart() {
    logInfo("Connecting to driver: " + driverUrl)
    driver = context.actorSelection(driverUrl)
    // Register this Executor; the receiver is CoarseGrainedSchedulerBackend
    driver ! RegisterExecutor(executorId, hostPort, cores)
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  }

  override def receive = {
    case RegisteredExecutor(sparkProperties) => // ...
    case LaunchTask(taskDesc) => // ...
    case KillTask(taskId, _, interruptThread) => // ...
    case StopExecutor => // ...
  }
}
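On the driver side, the DriverActor inside CoarseGrainedSchedulerBackend answers this RegisterExecutor message. A simplified sketch of that handler, following the Spark 1.x shape (bookkeeping details trimmed):

// DriverActor handling executor registration (simplified)
def receive = {
  case RegisterExecutor(executorId, hostPort, cores) =>
    if (executorActor.contains(executorId)) {
      sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
    } else {
      sender ! RegisteredExecutor(sparkProperties)  // ack; the executor then creates its Executor
      executorActor(executorId) = sender
      totalCores(executorId) = cores
      freeCores(executorId) = cores
      makeOffers()                                  // immediately try to offer tasks to the new executor
    }
}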

 


