Compartir a través de

在 Azure Synapse Analytics 中分析复杂数据类型

本文适用于 Azure Synapse Link for Azure Cosmos DB 中的 Parquet 文件和容器。 可以使用 Spark 或 SQL 来读取或转换包含复杂架构(例如数组或嵌套结构)的数据。 以下示例是使用单个文档完成的,但可以使用 Spark 或 SQL 将文档数轻松扩展到数十亿个。 本文中包含的代码使用 PySpark (Python)。

用例

复杂的数据类型越来越常见,为数据工程师带来了挑战。 分析嵌套的架构和数组可能涉及耗时且复杂的 SQL 查询。 此外,可能很难重命名或强制转换嵌套列数据类型。 而且,在使用深度嵌套的对象时,可能会遇到性能问题。

数据工程师需要了解如何有效地处理复杂的数据类型,使每个人都可以轻松地对其进行访问。 在下面的示例中,你将在 Azure Synapse Analytics 中使用 Spark,通过数据帧读取对象并将其转换为平面结构。 使用 Azure Synapse Analytics 中的 SQL 无服务器模型直接查询此类对象,并以常规表的形式返回这些结果。

什么是数组和嵌套结构?

以下对象来自 Application Insights。 在此对象中,有嵌套结构和包含嵌套结构的数组。

{
    "id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
    "context": {
        "data": {
            "eventTime": "2020-06-10T13:43:34.553Z",
            "samplingRate": "100.0",
            "isSynthetic": "false"
        },
        "session": {
            "isFirst": "false",
            "id": "38619c14-7a23-4687-8268-95862c5326b1"
        },
        "custom": {
            "dimensions": [
                {
                    "customerInfo": {
                        "ProfileType": "ExpertUser",
                        "RoomName": "",
                        "CustomerName": "diamond",
                        "UserName": "XXXX@yahoo.com"
                    }
                },
                {
                    "customerInfo": {
                        "ProfileType": "Novice",
                        "RoomName": "",
                        "CustomerName": "topaz",
                        "UserName": "XXXX@outlook.com"
                    }
                }
            ]
        }
    }
}

包含数组和嵌套结构的架构的示例

通过命令 df.printschema 输出对象的数据帧(简称 df)的架构时,会看到以下表示形式:

  • 黄色表示嵌套结构。
  • 绿色表示具有两个元素的数组。

Code with yellow and green highlighting, showing schema origin

已将 _rid_ts_etag 添加到系统,因为已将文档引入到 Azure Cosmos DB 的事务性存储中。

前面的数据帧只针对 5 个列和 1 个行计数。 转换后,策展数据帧将有 13 个列和 2 个行(表格格式)。

平展嵌套结构和分解数组

借助 Azure Synapse Analytics 中的 Spark,可以轻松地将嵌套结构转换为列,将数组元素转换为多个行。 使用以下步骤进行实施。

Flowchart showing steps for Spark transformations

定义用于平展嵌套架构的函数

无需更改即可使用此函数。 使用以下函数在 PySpark 笔记本中创建单元格:

from pyspark.sql.functions import col

def flatten_df(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

使用可平展嵌套架构的函数

在此步骤中,你将数据帧 (df) 的嵌套架构平展成新的数据帧 (df_flat):

from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))

display 函数应返回 10 个列和 1 个行。 数组及其嵌套元素仍然存在。

转换数组

这里,你将数据帧 df_flat 中的数组 context_custom_dimensions 转换成新数据帧 df_flat_explode。 在下面的代码中,你还定义要选择哪个列:

from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))

display 函数应返回 10 个列和 2 个行。 下一步是将嵌套架构与步骤 1 中定义的函数平展在一起。

使用可平展嵌套架构的函数

最后,使用函数将数据帧 df_flat_explode 的嵌套架构平展成新数据帧 df_flat_explode_flat

df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))

display 函数应显示 13 个列和 2 个行。

数据帧 df_flat_explode_flat 的函数 printSchema 返回以下结果:

Code showing the final schema

直接读取数组和嵌套结构

使用 SQL 的无服务器模型,你可以查询和创建基于此类对象的视图和表。

首先,用户应根据数据的存储方式使用以下分类。 以大写字母显示的所有内容都是特定于你的用例的:

批量 格式
'https://ACCOUNTNAME.dfs.core.chinacloudapi.cn/FILESYSTEM/PATH/FINENAME.parquet' 'Parquet' (ADLSg2)
N'endpoint=https://ACCOUNTNAME.documents.azure.cn:443/;account=ACCOUNTNAME;database=DATABASENAME;collection=COLLECTIONNAME;region=REGIONTOQUERY', SECRET='YOURSECRET' 'CosmosDB' (Azure Synapse Link)

替换每个字段,如下所示:

  • 'YOUR BULK ABOVE' 是你连接到的数据源的连接字符串。
  • 'YOUR TYPE ABOVE' 是用于连接到源的格式。
select *
FROM
openrowset(
    BULK 'YOUR BULK ABOVE',
    FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
        contextdataeventTime varchar(50) '$.context.data.eventTime',
        contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
        contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
        contextsessionisFirst varchar(50) '$.context.session.isFirst',
        contextsessionid varchar(50) '$.context.session.id',
        contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q 
cross apply openjson (contextcustomdimensions) 
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
            RoomName varchar(50) '$.customerInfo.RoomName',
            CustomerName varchar(50) '$.customerInfo.CustomerName',
            UserName varchar(50) '$.customerInfo.UserName'
    )

有两种不同的操作类型:

  • 下面的代码行中指示了第一种操作类型,该行定义名为 contextdataeventTime 的引用嵌套元素 Context.Data.eventTime 的列。

    contextdataeventTime varchar(50) '$.context.data.eventTime'
    

    该行定义名为 contextdataeventTime 的引用嵌套元素 Context>Data>eventTime 的列。

  • 第二种操作类型使用 cross apply 为数组下的每个元素创建新行。 然后,它定义每个嵌套对象。

    cross apply openjson (contextcustomdimensions) 
    with ( ProfileType varchar(50) '$.customerInfo.ProfileType', 
    

    如果数组有 5 个具有 4 个嵌套结构的元素,则 SQL 的无服务器模型会返回 5 个行和 4 个列。 SQL 的无服务器模型可以就地查询,在 2 个行中映射数组,并将所有嵌套结构显示成列。

后续步骤