入门:引入和插入额外数据

本入门文章将逐步讲解如何使用 Azure Databricks 笔记本将包含其他婴儿名称数据的 CSV 文件引入 Unity Catalog 卷,然后使用 Python、Scala 和 R 将新的婴儿名称数据导入到现有表中。

重要

本入门文章以“入门:从笔记本导入和可视化 CSV 数据”为基础。 必须完成本文中的步骤才能完成本文。 有关该入门文章的完整笔记本,请参阅导入和可视化数据笔记本

要求

要完成本文中的任务,必须满足以下要求:

提示

有关本文的完整笔记本,请参阅引入额外数据笔记本

步骤 1:创建新笔记本

若要在工作区中创建笔记本,请单击边栏中的““新建”图标 新建”,然后单击“笔记本”。 将在工作区中打开一个空白笔记本。

若要了解有关创建和管理笔记本的详细信息,请参阅管理笔记本

步骤 2:定义变量

在此步骤中,定义要在本文中创建的示例笔记本中使用的变量。

  1. 将以下代码复制并粘贴到新的空笔记本单元格中: 将 <catalog-name><schema-name><volume-name> 替换为 Unity 目录卷的目录、架构和卷名称。 请将 <table_name> 替换为你选择的表名称。 本文稍后会将婴儿姓名数据保存到此表中。

  2. Shift+Enter 以运行单元格并创建新的空白单元格。

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    file_name = "new_baby_names.csv"
    table_name = "baby_names"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val fileName = "new_baby_names.csv"
    val tableName = "baby_names"
    val pathVolume = s"/Volumes/${catalog}/${schema}/${volume}"
    val pathTable = s"${catalog}.${schema}"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    file_name <- "new_baby_names.csv"
    table_name <- "baby_names"
    path_volume <- paste0("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste0(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    

步骤 3:向 Unity Catalog 卷添加新的 CSV 数据文件

此步骤创建一个名为 df 且包含 2022 年新婴儿姓名的数据帧,然后将该数据保存到 Unity Catalog 卷中的新 CSV 文件中。

注意

此步骤模拟向已加载的往年现有数据添加新的年度数据。 在生产环境中,此增量数据将存储在云存储中。

  1. 将以下代码复制并粘贴到新的空笔记本单元格中: 此代码创建包含额外婴儿姓名数据的数据帧,然后将该数据写入 Unity Catalog 卷中的 CSV 文件。

    Python

    data = [[2022, "CARL", "Albany", "M", 42]]
    
    df = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    # display(df)
    (df.coalesce(1)
        .write
        .option("header", "true")
        .mode("overwrite")
        .csv(f"{path_volume}/{file_name}"))
    

    Scala

    val data = Seq((2022, "CARL", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df = data.toDF(columns: _*)
    
    // display(df)
    df.coalesce(1)
        .write
        .option("header", "true")
        .mode("overwrite")
        .csv(f"{pathVolume}/{fileName}")
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(Year = 2022,
        First_Name = "CARL",
        County = "Albany",
        Sex = "M",
        Count = 42)
    
    df <- createDataFrame(data)
    # display(df)
    write.df(df, path = paste0(path_volume, "/", file_name),
        source = "csv",
        mode = "overwrite",
        header = "true")
    
  2. Shift+Enter 以运行单元格,然后移动到下一个单元格。

步骤 4:将数据从 CSV 文件加载到数据帧

注意

此步骤模拟从云存储加载数据。

  1. 将以下代码复制并粘贴到空的笔记本单元格中。 此代码将新的婴儿姓名数据从 CSV 文件加载到新的数据帧。

    Python

    df1 = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df1)
    

    Scala

    val df1 = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    display(df1)
    

    R

    df1 <- read.df(paste0(path_volume, "/", file_name),
        source = "csv",
        header = TRUE,
        inferSchema = TRUE)
    display(df1)
    
  2. Shift+Enter 以运行单元格,然后移动到下一个单元格。

步骤 5:插入到现有表中

  1. 将以下代码复制并粘贴到空的笔记本单元格中。 此代码将数据帧中新的婴儿姓名数据追加到现有表中。

    Python

    df.write.mode("append").insertInto(f"{path_table}.{table_name}")
    display(spark.sql(f"SELECT * FROM {path_table}.{table_name} WHERE Year = 2022"))
    

    Scala

    df1.write.mode("append").insertInto(s"${pathTable}.${tableName}")
    display(spark.sql(s"SELECT * FROM ${pathTable}.${tableName} WHERE Year = 2022"))
    

    R

    # The write.df function in R, as provided by the SparkR package, does not directly support writing to Unity Catalog.
    # In this example, you write the DataFrame into a temporary view and then use the SQL command to insert data from the temporary view to the Unity Catalog table
    createOrReplaceTempView(df1, "temp_view")
    sql(paste0("INSERT INTO ", path_table, ".", table_name, " SELECT * FROM temp_view"))
    display(sql(paste0("SELECT * FROM ", path_table, ".", table_name, " WHERE Year = 2022")))
    
  2. Ctrl+Enter 运行该单元格。

引入额外数据笔记本

使用以下笔记本之一执行本文中的步骤。

Python

使用 Python 引入和插入额外数据

获取笔记本

Scala

使用 Scala 引入和插入额外数据

获取笔记本

R

使用 R 引入和插入额外数据

获取笔记本

后续步骤

若要了解清理和增强数据,请参阅入门:增强和清理数据

其他资源