Spark 提交任务弃用通知和迁移指南

警告

Spark Submit任务已弃用并即将被移除。 对于新用例,不允许使用此任务类型,强烈建议不要对现有客户使用。 有关此任务类型的原始文档,请参阅 Spark Submit(旧 版)。 继续阅读迁移说明。

为什么 Spark Submit 已弃用?

由于 JARNotebookPython 脚本任务中不存在的技术限制和功能差距,Spark 提交任务类型已被弃用。 这些任务提供与 Databricks 功能更好的集成、改进的性能和可靠性。

弃用措施

Databricks 正在实施与即将淘汰相关的以下措施:

  • 受限创建:仅从 2025 年 11 月开始使用 Spark 提交 任务的用户才能创建新的 Spark 提交 任务。 如果您需要特殊豁免,请联系您的账户支持。
  • DBR 版本限制Spark 提交 使用仅限于现有 DBR 版本和维护版本。 使用 Spark Submit 的现有 DBR 版本将继续接收安全和 bugfix 维护版本,直到功能完全关闭。 DBR 17.3+ 和 18.x+ 不支持此任务类型。
  • UI 警告:整个 Databricks UI 中会出现警告,当 Spark 提交 任务在使用时,通信将发送给属于现有用户帐户的工作区管理员。

将 JVM 工作负荷迁移到 JAR 任务

对于 JVM 工作负荷,请将 Spark 提交 任务迁移到 JAR 任务。 JAR 任务提供更好的功能支持和与 Databricks 的集成。

按照以下步骤迁移:

  1. 在作业中创建新的 JAR 任务。
  2. Spark 提交 任务参数中,标识前三个参数。 它们通常遵循以下模式: ["--class", "org.apache.spark.mainClassName", "dbfs:/path/to/jar_file.jar"]
  3. 删除--class参数。
  4. 将主类名称(例如) org.apache.spark.mainClassName设置为 JAR 任务的 Main 类
  5. 在 JAR 任务配置中提供 JAR 文件的路径(例如 dbfs:/path/to/jar_file.jar)。
  6. Spark 提交 任务中的任何剩余参数复制到 JAR 任务参数。
  7. 运行 JAR 任务并验证它是否按预期工作。

有关配置 JAR 任务的详细信息,请参阅 JAR 任务

迁移 R 工作负荷

如果要直接从 Spark 提交 任务启动 R 脚本,则可以使用多个迁移路径。

选项 A:使用笔记本任务

将 R 脚本迁移到 Databricks 笔记本。 笔记本任务支持一组完整的功能,包括群集自动缩放,并更好地与 Databricks 平台集成。

选项 B:从笔记本任务启动 R 脚本

使用 Notebook 任务启动 R 脚本。 使用以下代码创建笔记本,并将 R 文件引用为作业参数。 根据需要修改以添加 R 脚本使用的参数:

dbutils.widgets.text("script_path", "", "Path to script")
script_path <- dbutils.widgets.get("script_path")
source(script_path)

查找使用 Spark Submit 任务的作业

可以使用以下 Python 脚本来标识工作区中包含 Spark 提交任务的作业。 需要有效的 个人访问或其他令牌 ,并且应使用 工作区 URL

选项 A: 快速扫描(优先运行此扫描,仅用于持久性作业)

此脚本仅扫描持久作业(通过 /jobs/create 或 Web 界面创建),不包括通过创建的 /runs/submit临时作业。 这是用于识别 Spark Submit 使用情况的推荐第一行方法,因为它要快得多。

#!/usr/bin/env python3
"""
Requirements:
    databricks-sdk>=0.20.0

Usage:
    export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com"
    export DATABRICKS_TOKEN="your-token"
    python3 list_spark_submit_jobs.py

Output:
    CSV format with columns: Job ID, Owner ID/Email, Job Name

Incorrect:
    export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com/?o=12345678910"
"""

import csv
import os
import sys
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import PermissionDenied

def main():
    # Get credentials from environment
    workspace_url = os.environ.get("DATABRICKS_HOST")
    token = os.environ.get("DATABRICKS_TOKEN")

    if not workspace_url or not token:
        print(
            "Error: Set DATABRICKS_HOST and DATABRICKS_TOKEN environment variables",
            file=sys.stderr,
        )
        sys.exit(1)

    # Initialize client
    client = WorkspaceClient(host=workspace_url, token=token)

    # Scan workspace for persistent jobs with Spark Submit tasks
    # Using list() to scan only persistent jobs (faster than list_runs())
    print(
        "Scanning workspace for persistent jobs with Spark Submit tasks...",
        file=sys.stderr,
    )
    jobs_with_spark_submit = []
    total_jobs = 0

    # Iterate through all jobs (pagination is handled automatically by the SDK)
    skipped_jobs = 0
    for job in client.jobs.list(expand_tasks=True, limit=25):
        try:
            total_jobs += 1
            if total_jobs % 1000 == 0:
                print(f"Scanned {total_jobs} jobs total", file=sys.stderr)

            # Check if job has any Spark Submit tasks
            if job.settings and job.settings.tasks:
                has_spark_submit = any(
                    task.spark_submit_task is not None for task in job.settings.tasks
                )

                if has_spark_submit:
                    # Extract job information
                    job_id = job.job_id
                    owner_email = job.creator_user_name or "Unknown"
                    job_name = job.settings.name or f"Job {job_id}"

                    jobs_with_spark_submit.append(
                        {"job_id": job_id, "owner_email": owner_email, "job_name": job_name}
                    )
        except PermissionDenied:
            # Skip jobs that the user doesn't have permission to access
            skipped_jobs += 1
            continue

    # Print summary to stderr
    print(f"Scanned {total_jobs} jobs total", file=sys.stderr)
    if skipped_jobs > 0:
        print(
            f"Skipped {skipped_jobs} jobs due to insufficient permissions",
            file=sys.stderr,
        )
    print(
        f"Found {len(jobs_with_spark_submit)} jobs with Spark Submit tasks",
        file=sys.stderr,
    )
    print("", file=sys.stderr)

    # Output CSV to stdout
    if jobs_with_spark_submit:
        writer = csv.DictWriter(
            sys.stdout,
            fieldnames=["job_id", "owner_email", "job_name"],
            quoting=csv.QUOTE_MINIMAL,
        )
        writer.writeheader()
        writer.writerows(jobs_with_spark_submit)
    else:
        print("No jobs with Spark Submit tasks found.", file=sys.stderr)

if __name__ == "__main__":
    main()

选项 B:全面扫描(较慢,包括过去 30 天内的临时作业)

如果需要标识通过/runs/submit创建的临时作业,请使用更为详尽的脚本。 此脚本会扫描您的工作区中过去 30 天内运行的所有作业,包括通过 /jobs/create 创建的持久作业和临时作业。 此脚本可能需要几个小时才能在大型工作区中运行。

#!/usr/bin/env python3
"""
Requirements:
    databricks-sdk>=0.20.0

Usage:
    export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com"
    export DATABRICKS_TOKEN="your-token"
    python3 list_spark_submit_runs.py

Output:
    CSV format with columns: Job ID, Run ID, Owner ID/Email, Job/Run Name

Incorrect:
    export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com/?o=12345678910"
"""

import csv
import os
import sys
import time
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import PermissionDenied

def main():
    # Get credentials from environment
    workspace_url = os.environ.get("DATABRICKS_HOST")
    token = os.environ.get("DATABRICKS_TOKEN")

    if not workspace_url or not token:
        print(
            "Error: Set DATABRICKS_HOST and DATABRICKS_TOKEN environment variables",
            file=sys.stderr,
        )
        sys.exit(1)

    # Initialize client
    client = WorkspaceClient(host=workspace_url, token=token)

    thirty_days_ago_ms = int((time.time() - 30 * 24 * 60 * 60) * 1000)

    # Scan workspace for runs with Spark Submit tasks
    # Using list_runs() instead of list() to include ephemeral jobs created via /runs/submit
    print(
        "Scanning workspace for runs with Spark Submit tasks from the last 30 days... (this will take more than an hour in large workspaces)",
        file=sys.stderr,
    )
    runs_with_spark_submit = []
    total_runs = 0
    seen_job_ids = set()

    # Iterate through all runs (pagination is handled automatically by the SDK)
    skipped_runs = 0
    for run in client.jobs.list_runs(
        expand_tasks=True,
        limit=25,
        completed_only=True,
        start_time_from=thirty_days_ago_ms,
    ):
        try:
            total_runs += 1
            if total_runs % 1000 == 0:
                print(f"Scanned {total_runs} runs total", file=sys.stderr)

            # Check if run has any Spark Submit tasks
            if run.tasks:
                has_spark_submit = any(
                    task.spark_submit_task is not None for task in run.tasks
                )

                if has_spark_submit:
                    # Extract job information from the run
                    job_id = run.job_id if run.job_id else "N/A"
                    run_id = run.run_id if run.run_id else "N/A"
                    owner_email = run.creator_user_name or "Unknown"
                    # Use run name if available, otherwise try to construct a name
                    run_name = run.run_name or (
                        f"Run {run_id}" if run_id != "N/A" else "Unnamed Run"
                    )

                    # Track unique job IDs to avoid duplicates for persistent jobs
                    # (ephemeral jobs may have the same job_id across multiple runs)
                    key = (job_id, run_id)
                    if key not in seen_job_ids:
                        seen_job_ids.add(key)
                        runs_with_spark_submit.append(
                            {
                                "job_id": job_id,
                                "run_id": run_id,
                                "owner_email": owner_email,
                                "job_name": run_name,
                            }
                        )
        except PermissionDenied:
            # Skip runs that the user doesn't have permission to access
            skipped_runs += 1
            continue

    # Print summary to stderr
    print(f"Scanned {total_runs} runs total", file=sys.stderr)
    if skipped_runs > 0:
        print(
            f"Skipped {skipped_runs} runs due to insufficient permissions",
            file=sys.stderr,
        )
    print(
        f"Found {len(runs_with_spark_submit)} runs with Spark Submit tasks",
        file=sys.stderr,
    )
    print("", file=sys.stderr)

    # Output CSV to stdout
    if runs_with_spark_submit:
        writer = csv.DictWriter(
            sys.stdout,
            fieldnames=["job_id", "run_id", "owner_email", "job_name"],
            quoting=csv.QUOTE_MINIMAL,
        )
        writer.writeheader()
        writer.writerows(runs_with_spark_submit)
    else:
        print("No runs with Spark Submit tasks found.", file=sys.stderr)

if __name__ == "__main__":
    main()

需要帮助?

如果需要其他帮助,请联系帐户支持人员。