/**
 * Entry point. Parses `args` into a SparkSubmitArguments and dispatches on its `action`
 * (SUBMIT, KILL or REQUEST_STATUS).
 *
 * NOTE(review): this listing is abridged; the body of the `verbose` branch and the closing
 * braces are not shown.
 */
override def main(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  val uninitLog = initializeLogIfNecessary(true, silent = true)
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    // scalastyle:off println
    // scalastyle:on println
  // Only the SUBMIT action receives the uninitLog flag.
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)

主函数中会创建 SparkSubmitArguments 对象,这个类的主要作用是校验用户使用 spark-submit 提交的参数,并最终将其封装到对象中。进入该类后,会执行 parse(args.asJava) 方法,解析 spark-submit 后面跟着的参数:首先进行正则匹配,如果参数以 -- 开头并且带有 =,则会解析成 key-value 键值对;如果参数中没有 =,则会进行关键字匹配,例如 --driver-memory。代码中已经枚举了所有的关键 key,循环校验参数,正则没匹配上的参数再进行循环匹配;如果正则全都没匹配上,时间复杂度就是 O(n²)。这段代码是 Java 写的,可能是多个开发者协同开发的缘故,个人感觉使用 Scala 的 match 匹配会更好些。内层循环中如果匹配上了关键词,则会将下一个参数也取进来当成 value;如果没匹配上,则会执行 handleUnknown 方法,判断当前参数是否为提交的 jar 资源。如果是 jar 资源,则跳出参数解析循环,剩下的参数被传递进 handleExtraArgs 方法,最后当成用户提交的 jar 的 main class 的参数。


// If no main class was given and the primary resource is neither Python nor R, try to read
// the main class from the primary resource's JAR manifest.
// NOTE(review): this listing is abridged; several closing braces are not shown.
if (mainClass == null && !isPython && !isR && primaryResource != null) {
  val uri = new URI(primaryResource)
  val uriScheme = uri.getScheme()
  uriScheme match {
    // Only a "file" URI is opened as a JAR here.
    case "file" =>
      try {
        Utils.tryWithResource(new JarFile(uri.getPath)) { jar =>
          // Note that this might still return null if no main-class is set; we catch that later
          mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
      } catch {
        // Any exception raised while reading the JAR is reported with the message below.
        case _: Exception =>
          SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")
    // Every other URI scheme: the manifest is not read.
    // NOTE(review): the call that receives the message below is not shown in this excerpt.
    case _ =>
        s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
        "Please specify a class through --class.")

如果提交时没有指定 jar 的主类参数,则会从 jar 包的清单文件(MANIFEST.MF,打包时由 pom 配置生成)的 Main-Class 属性中获取 main class;最后还要执行 validateArguments 方法再验证一遍参数。


/**
 * Submits the application described by `args`.
 *
 * The child arguments, classpath, configuration and main class are obtained from
 * prepareSubmitEnvironment; the nested doRunMain() then calls runMain with them, either inside
 * `proxyUser.doAs` when `args.proxyUser` is set or directly otherwise.
 *
 * NOTE(review): this listing is abridged; the remaining arguments of createProxyUser, the body
 * of the `uninitLog` check, the bodies of the REST and fallback branches, and several closing
 * braces are not shown.
 *
 * @param args parsed spark-submit arguments
 * @param uninitLog flag tested before the main class is run (see the logging comment below)
 */
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
    try {
      // Run the child main class as the proxy user.
      proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
        override def run(): Unit = {
          runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
    } catch {
      case e: Exception =>
        // Hadoop's AuthorizationException suppresses the exception's stack trace, which
        // makes the message printed to the output by the JVM not very helpful. Instead,
        // detect exceptions with empty stack traces here, and treat them differently.
        if (e.getStackTrace().length == 0) {
          // scalastyle:off println
          printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
          // scalastyle:on println
        } else {
          throw e
  } else {
      // No proxy user: run the child main class directly.
      runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
  // Let the main class re-initialize the logging system once it starts.
  if (uninitLog) {
  // In standalone cluster mode, there are two submission gateways:
  //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
  //   (2) The new REST-based gateway introduced in Spark 1.3
  // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
  // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      // scalastyle:off println
      printStream.println("Running Spark using the REST application submission protocol.")
      // scalastyle:on println
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        printWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        // Retry with REST disabled; uninitLog is passed as false on this second attempt.
        args.useRest = false
        submit(args, false)
  // In all other modes, just run the main class as prepared
  } else {


// Artifact-name fragments used by addExclusionRules: each entry `comp` is expanded into an
// exclusion pattern of the form "org.apache.spark:spark-<comp>*:*".
val IVY_DEFAULT_EXCLUDES = Seq("catalyst_", "core_", "graphx_", "kvstore_", "launcher_", "mllib_",
    "mllib-local_", "network-common_", "network-shuffle_", "repl_", "sketch_", "sql_", "streaming_",
    "tags_", "unsafe_")
/**
 * Add exclusion rules for dependencies already included in the spark-assembly.
 *
 * Registers one rule for the pattern "*:scala-library:*" and one rule per entry of
 * IVY_DEFAULT_EXCLUDES on `md`.
 *
 * NOTE(review): this listing is abridged; the end of the second createExclusion call and the
 * closing braces are not shown.
 *
 * @param ivySettings passed through to createExclusion
 * @param ivyConfName passed through to createExclusion
 * @param md module descriptor that receives the exclude rules
 */
def addExclusionRules(
  ivySettings: IvySettings,
  ivyConfName: String,
  md: DefaultModuleDescriptor): Unit = {
// Add scala exclusion rule
md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, ivyConfName))
// Add one exclusion rule per Spark module fragment
IVY_DEFAULT_EXCLUDES.foreach { comp =>
  md.addExcludeRule(createExclusion(s"org.apache.spark:spark-$comp*:*", ivySettings,


// Choose the SparkApplication to run. When mainClass is not itself a SparkApplication it is
// wrapped in a JavaMainApplication.
// NOTE(review): the body of the first branch (mainClass is a SparkApplication) and the closing
// braces are not shown in this excerpt.
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
} else {
  // SPARK-4170
  // A scala.App subclass only triggers a warning; it is still wrapped and run below.
  if (classOf[scala.App].isAssignableFrom(mainClass)) {
    printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
  new JavaMainApplication(mainClass)


/**
 * Recursively unwraps UndeclaredThrowableException and InvocationTargetException to reach the
 * underlying cause. A wrapper whose cause is null is returned as-is.
 *
 * NOTE(review): the body of the final `Throwable` case is not shown in this excerpt.
 */
def findCause(t: Throwable): Throwable = t match {
  case e: UndeclaredThrowableException =>
    if (e.getCause() != null) findCause(e.getCause()) else e
  case e: InvocationTargetException =>
    if (e.getCause() != null) findCause(e.getCause()) else e
  case e: Throwable =>
// Start the application; on any failure, unwrap the throwable with findCause before deciding
// how to react.
// NOTE(review): the handling of SparkUserAppException and the closing braces are not shown in
// this excerpt.
try {
  app.start(childArgs.toArray, sparkConf)
} catch {
  case t: Throwable =>
    findCause(t) match {
      case SparkUserAppException(exitCode) =>
      // This inner `t` shadows the caught one, so the unwrapped cause is what gets rethrown.
      case t: Throwable =>
        throw t


/**
 * [[SparkApplication]] implementation that wraps a class exposing a static
 * `main(Array[String])` method and invokes that method reflectively.
 *
 * @param klass class whose static `main` method is invoked by `start`
 */
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

  /**
   * Copies every entry of `conf` into the JVM system properties and then invokes
   * `klass.main(args)`.
   *
   * @param args arguments handed to the main method unchanged
   * @param conf configuration whose entries are exported as system properties
   * @throws IllegalStateException if the resolved `main` method is not static
   */
  override def start(args: Array[String], conf: SparkConf): Unit = {
    // Look up main(String[]); the zero-length array is only used to obtain the parameter class.
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    // System properties are populated before the main method is invoked.
    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }

    // Static method, so there is no receiver instance (null).
    mainMethod.invoke(null, args)
  }
}