教程 6:功能存储的网络隔离

使用 Azure 机器学习托管特征存储可以发现、创建和操作特征。 特征在机器学习生命周期中充当连接组织,该生命周期始于原型制作阶段,在此阶段你可以试验各种特征。 该生命周期会持续到操作化阶段,在此阶段你可以部署模型,并且推理步骤会查找特征数据。 有关功能存储的详细信息,请参阅功能存储概念文档。

本教程介绍如何通过专用终结点配置安全入口,以及如何通过托管虚拟网络保护出口。

本教程系列的第 1 部分介绍了如何使用自定义转换创建特征集规范,以及如何使用特征集生成训练数据。 本系列的第 2 部分演示了如何启用具体化和如何执行回填。 此外,第 2 部分演示了如何试验特征以提高模型性能。 第 3 部分演示了特征存储如何提高试验和训练流的敏捷性。 第 3 部分还介绍了如何运行批处理推理。 教程 4 介绍了如何使用功能存储进行联机/实时推理用例。 教程 5 演示了如何使用自定义数据源开发功能集。 教程 6 演示如何

  • 为托管功能存储的网络隔离设置必要的资源。
  • 创建新的功能存储资源。
  • 设置功能存储以支持网络隔离方案。
  • 更新项目工作区(当前工作区),以支持网络隔离方案。

先决条件

注意

本教程将 Azure 机器学习笔记本与无服务器 Spark 计算配合使用

  • 请确保完成本系列教程的第 1 至 4 部分

  • 一个 Azure 机器学习工作区,通过托管虚拟网络为无服务器 Spark 作业启用

  • 配置项目工作区:

    1. 创建名为 network.yml的 YAML 文件:

      managed_network:
      isolation_mode: allow_internet_outbound
      
    2. 执行以下命令以更新工作区并为无服务器 Spark 作业预配托管虚拟网络:

      az ml workspace update --file network.yml --resource-group my_resource_group --name
      my_workspace_name
      az ml workspace provision-network --resource-group my_resource_group --name my_workspace_name
      --include-spark
      

    有关详细信息,请访问配置无服务器 spark 作业

  • 用户帐户必须将 OwnerContributor 角色分配给在其中创建功能存储的资源组。 用户帐户还需要 User Access Administrator 角色。

重要

对于 Azure 机器学习工作区,请将 isolation_mode 设置为 allow_internet_outbound。 这是唯一支持的网络隔离模式。 本教程将介绍如何通过专用终结点安全地连接到源、具体化存储和观察数据。

设置

本教程使用 Python 功能存储核心 SDK(azureml-featurestore)。 Python SDK 仅用于功能集开发和测试。 CLI 用于在功能存储、功能集和功能存储实体上创建、读取、更新和删除 (CRUD) 操作。 这对于持续集成和持续交付 (CI/CD) 或首选 CLI/YAML 的 GitOps 方案非常有用。

无需为本教程显式安装这些资源,因为在此处所示的设置说明中,conda.yaml 文件涵盖它们。

若要准备用于开发的笔记本环境,请执行以下操作:

  1. 使用以下命令将 azureml-examples 存储库克隆到本地 GitHub 资源:

    git clone --depth 1 https://github.com/Azure/azureml-examples

    还可以从 azureml-examples 存储库下载 zip 文件。 在此页中,首先选择 code 下拉列表,然后选择 Download ZIP。 然后,将内容解压缩到本地设备上的文件夹中。

  2. 将特征存储示例目录上传到项目工作区

    1. 在 Azure 机器学习工作区中,打开 Azure 机器学习工作室 UI
    2. 在左侧导航面板中选择“笔记本”
    3. 在目录列表中选择用户名
    4. 选择省略号 (...),然后选择“上传文件夹”
    5. 从克隆的目录路径中选择功能商店示例文件夹:azureml-examples/sdk/python/featurestore-sample
  3. 运行教程

    • 选项 1:创建新的笔记本,并逐步执行本文档中的说明

    • 选项 2:打开现有笔记本 featurestore_sample/notebooks/sdk_and_cli/network_isolation/Network-isolation-feature-store.ipynb。 可以将此文档保持打开状态,并参阅此文档以获取更多说明和文档链接

      1. 在顶部的导航“计算”下拉列表中选择“无服务器 Spark 计算”。 此操作可能需要一到两分钟。 等待顶部的状态栏显示“配置会话”
      2. 在顶部状态栏中选择“配置会话”
      3. 选择“Python 包”
      4. 选择“上传 conda 文件”
      5. 选择位于本地设备上的文件azureml-examples/sdk/python/featurestore-sample/project/env/conda.yml
      6. (可选)增加会话超时(空闲时间,以分钟为单位)以减少无服务器 Spark 群集启动时间
  4. 此代码单元启动 Spark 会话。 安装所有依赖项并启动 Spark 会话大约需要 10 分钟。

       # Run this cell to start the spark session (any code block will start the session ). This can take around 10 mins.
       print("start spark session")
    
  5. 为示例设置根目录

       import os
    
       # Please update your alias below (or any custom directory you have uploaded the samples to).
       # You can find the name from the directory structure in the left navigation.
       root_dir = "./Users/<your user alias>/featurestore_sample"
    
       if os.path.isdir(root_dir):
    	  print("The folder exists.")
       else:
    	  print("The folder does not exist. Please create or fix the path")
    
  6. 设置 Azure 机器学习 CLI:

    • 安装 Azure 机器学习 CLI 扩展

      # install azure ml cli extension
      !az extension add --name ml
      
    • Authenticate

      # authenticate
      !az login
      
    • 设置默认订阅

      # Set default subscription
      import os
      
      subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
      
      !az account set -s $subscription_id
      

    注意

    特征存储工作区支持跨项目重复使用特征。 项目工作区(正在使用的当前工作区)利用特定特征存储中的特征来训练和推理模型。 许多项目工作区可以共享和重复使用相同的功能存储工作区。

预配必要的资源

可以创建新的 Azure Data Lake Storage (ADLS) Gen2 存储帐户和容器,或重复使用功能存储的现有存储帐户和容器资源。 在实际情况下,不同的存储帐户可以托管 ADLS Gen2 容器。 这两个选项都起作用,具体取决于你的特定要求。

在本教程中,在同一 ADLS Gen2 存储帐户中创建三个单独的存储容器:

  • 源数据
  • 脱机存储
  • 观察数据
  1. 为源数据、脱机存储和观察数据创建 ADLS Gen2 存储帐户。

    1. 在以下代码示例中提供 Azure Data Lake Storage Gen2 存储帐户的名称。 可以使用提供的默认设置执行以下代码单元。 (可选)可以替代默认设置。

      ## Default Setting
      # We use the subscription, resource group, region of this active project workspace,
      # We hard-coded default resource names for creating new resources
      
      ## Overwrite
      # You can replace them if you want to create the resources in a different subsciprtion/resourceGroup, or use existing resources
      # At the minimum, provide an ADLS Gen2 storage account name for `storage_account_name`
      
      storage_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
      storage_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]
      storage_account_name = "<STORAGE_ACCOUNT_NAME>"
      
      storage_location = "eastus"
      storage_file_system_name_offline_store = "offline-store"
      storage_file_system_name_source_data = "source-data"
      storage_file_system_name_observation_data = "observation-data"
      
    2. 此代码单元创建上述代码单元中定义的 ADLS Gen2 存储帐户。

      # Create new storage account
      !az storage account create --name $storage_account_name --enable-hierarchical-namespace true --resource-group $storage_resource_group_name --location $storage_location --subscription $storage_subscription_id
      
    3. 此代码单元为脱机存储创建新的存储容器。

      # Create a new storage container for offline store
      !az storage fs create --name $storage_file_system_name_offline_store --account-name $storage_account_name --subscription $storage_subscription_id
      
    4. 此代码单元为源数据创建新的存储容器。

      # Create a new storage container for source data
      !az storage fs create --name $storage_file_system_name_source_data --account-name $storage_account_name --subscription $storage_subscription_id
      
    5. 此代码单元为观察数据创建新的存储容器。

      # Create a new storage container for observation data
      !az storage fs create --name $storage_file_system_name_observation_data --account-name $storage_account_name --subscription $storage_subscription_id
      
  2. 将本教程系列所需的示例数据复制到新创建的存储容器中。

    1. 若要将数据写入存储容器,请确保参与者和存储 Blob 数据参与者角色分配给 Azure 门户中创建的 ADLS Gen2 存储帐户上的用户标识,执行以下步骤

      重要

      一旦确保将参与者和存储 Blob 数据参与者角色分配给用户标识后,请等待几分钟后的角色分配让权限传播,然后再继续执行后续步骤。 若要详细了解访问控制,请访问 Azure 存储帐户基于角色的访问控制 (RBAC)

      以下代码单元将本教程中使用的事务功能集的示例源数据从公共存储帐户复制到新创建的存储帐户。

      # Copy sample source data for transactions feature set used in this tutorial series from the public storage account to the newly created storage account
      transactions_source_data_path = "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source/*.parquet"
      transactions_src_df = spark.read.parquet(transactions_source_data_path)
      
      transactions_src_df.write.parquet(
          f"abfss://{storage_file_system_name_source_data}@{storage_account_name}.dfs.core.windows.net/transactions-source/"
      )
      
    2. 对于本教程中使用的帐户功能集,请将帐户功能集的示例源数据复制到新创建的存储帐户。

      # Copy sample source data for account feature set used in this tutorial series from the public storage account to the newly created storage account
      accounts_data_path = "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/accounts-precalculated/*.parquet"
      accounts_data_df = spark.read.parquet(accounts_data_path)
      
      accounts_data_df.write.parquet(
          f"abfss://{storage_file_system_name_source_data}@{storage_account_name}.dfs.core.windows.net/accounts-precalculated/"
      )
      
    3. 将用于训练的示例观察数据从公共存储帐户复制到新创建的存储帐户。

      # Copy sample observation data used for training from the public storage account to the newly created storage account
      observation_data_train_path = "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/observation_data/train/*.parquet"
      observation_data_train_df = spark.read.parquet(observation_data_train_path)
      
      observation_data_train_df.write.parquet(
          f"abfss://{storage_file_system_name_observation_data}@{storage_account_name}.dfs.core.windows.net/train/"
      )
      
    4. 将用于批量推理的示例观察数据从公共存储帐户复制到新创建的存储帐户。

      # Copy sample observation data used for batch inference from a public storage account to the newly created storage account
      observation_data_inference_path = "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/observation_data/batch_inference/*.parquet"
      observation_data_inference_df = spark.read.parquet(observation_data_inference_path)
      
      observation_data_inference_df.write.parquet(
          f"abfss://{storage_file_system_name_observation_data}@{storage_account_name}.dfs.core.windows.net/batch_inference/"
      )
      
  3. 在新建的存储帐户上禁用公用网络访问。

    1. 此代码单元禁用前面创建的 ADLS Gen2 存储帐户的公共网络访问。

      # Disable the public network access for the above created ADLS Gen2 storage account
      !az storage account update --name $storage_account_name --resource-group $storage_resource_group_name --subscription $storage_subscription_id --public-network-access disabled
      
    2. 为脱机存储、源数据和观察数据容器设置 ARM ID。

      # set the container arm id
      offline_store_gen2_container_arm_id = "/subscriptions/{sub_id}/resourceGroups/{rg}/providers/Microsoft.Storage/storageAccounts/{account}/blobServices/default/containers/{container}".format(
          sub_id=storage_subscription_id,
          rg=storage_resource_group_name,
          account=storage_account_name,
          container=storage_file_system_name_offline_store,
      )
      
      print(offline_store_gen2_container_arm_id)
      
      source_data_gen2_container_arm_id = "/subscriptions/{sub_id}/resourceGroups/{rg}/providers/Microsoft.Storage/storageAccounts/{account}/blobServices/default/containers/{container}".format(
          sub_id=storage_subscription_id,
          rg=storage_resource_group_name,
          account=storage_account_name,
          container=storage_file_system_name_source_data,
      )
      
      print(source_data_gen2_container_arm_id)
      
      observation_data_gen2_container_arm_id = "/subscriptions/{sub_id}/resourceGroups/{rg}/providers/Microsoft.Storage/storageAccounts/{account}/blobServices/default/containers/{container}".format(
          sub_id=storage_subscription_id,
          rg=storage_resource_group_name,
          account=storage_account_name,
          container=storage_file_system_name_observation_data,
      )
      
      print(observation_data_gen2_container_arm_id)
      

预配用户分配的托管标识 (UAI)

  1. 创建新的用户分配的托管标识。

    1. 在以下代码单元中,提供要创建的用户分配的托管标识的名称。

      # User assigned managed identity values. Optionally you may change the values.
      uai_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
      uai_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]
      uai_name = "<UAI_NAME>"
      # feature store location is used by default. You can change it.
      uai_location = storage_location
      
    2. 此代码单元创建 UAI。

      !az identity create --subscription $uai_subscription_id --resource-group $uai_resource_group_name --location $uai_location --name $uai_name
      
    3. 此代码单元检索所创建的 UAI 的主体 ID、客户端 ID 和 ARM ID 属性值。

      from azure.mgmt.msi import ManagedServiceIdentityClient
      from azure.mgmt.msi.models import Identity
      from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
      
      msi_client = ManagedServiceIdentityClient(
          AzureMLOnBehalfOfCredential(), uai_subscription_id
      )
      managed_identity = msi_client.user_assigned_identities.get(
          resource_name=uai_name, resource_group_name=uai_resource_group_name
      )
      
      uai_principal_id = managed_identity.principal_id
      uai_client_id = managed_identity.client_id
      uai_arm_id = managed_identity.id
      

    向用户分配的托管标识授予 RBAC 权限 (UAI)

    UAI 分配给功能存储,需要以下权限:

    范围 行动/角色
    特征存储 Azure 机器学习数据科学家角色
    特征存储脱机存储的存储帐户 存储 Blob 数据参与者角色
    源数据的存储帐户 存储 Blob 数据参与者角色

    下一个 CLI 命令会将存储 Blob 数据参与者角色分配给 UAI。 在此示例中,“源数据的存储帐户”不适用,因为从公共访问 Blob 存储中读取示例数据。 要使用自己的数据源,必须将所需角色分配给 UAI。 若要了解有关访问控制的详细信息,请参阅 Azure 存储帐户Azure 机器学习工作区的基于角色的访问控制。

       !az role assignment create --role "Storage Blob Data Contributor" --assignee-object-id $uai_principal_id --assignee-principal-type ServicePrincipal --scope $offline_store_gen2_container_arm_id
    
       !az role assignment create --role "Storage Blob Data Contributor" --assignee-object-id $uai_principal_id --assignee-principal-type ServicePrincipal --scope $source_data_gen2_container_arm_id
    
       !az role assignment create --role "Storage Blob Data Contributor" --assignee-object-id $uai_principal_id --assignee-principal-type ServicePrincipal --scope $observation_data_gen2_container_arm_id
    

创建启用了具体化的功能存储

设置功能存储参数

设置功能存储名称、位置、订阅 ID、组名称和 ARM ID 值,如以下代码单元示例所示:

# We use the subscription, resource group, region of this active project workspace.
# Optionally, you can replace them to create the resources in a different subsciprtion/resourceGroup, or use existing resources
import os

# At the minimum, define a name for the feature store
featurestore_name = "<YOUR_FEATURE_STORE_NAME>"
# It is recommended to create featurestore in the same location as the storage
featurestore_location = storage_location
featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]

feature_store_arm_id = "/subscriptions/{sub_id}/resourceGroups/{rg}/providers/Microsoft.MachineLearningServices/workspaces/{ws_name}".format(
    sub_id=featurestore_subscription_id,
    rg=featurestore_resource_group_name,
    ws_name=featurestore_name,
)

该代码单元为启用了具体化的功能存储生成 YAML 规范文件。

# The below code creates a feature store with enabled materialization
import yaml

config = {
    "$schema": "http://azureml/sdk-2-0/FeatureStore.json",
    "name": featurestore_name,
    "location": featurestore_location,
    "compute_runtime": {"spark_runtime_version": "3.2"},
    "offline_store": {
        "type": "azure_data_lake_gen2",
        "target": offline_store_gen2_container_arm_id,
    },
    "materialization_identity": {"client_id": uai_client_id, "resource_id": uai_arm_id},
}

feature_store_yaml = root_dir + "/featurestore/featurestore_with_offline_setting.yaml"

with open(feature_store_yaml, "w") as outfile:
    yaml.dump(config, outfile, default_flow_style=False)

创建功能商店

此代码单元使用上一步中生成的 YAML 规范文件来创建启用了具体化的功能存储。

!az ml feature-store create --file $feature_store_yaml --subscription $featurestore_subscription_id --resource-group $featurestore_resource_group_name

初始化 Azure 机器学习功能存储核心 SDK 客户端

在此单元中初始化的 SDK 客户端有助于开发和使用功能:

# feature store client
from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

featurestore = FeatureStoreClient(
    credential=AzureMLOnBehalfOfCredential(),
    subscription_id=featurestore_subscription_id,
    resource_group_name=featurestore_resource_group_name,
    name=featurestore_name,
)

将角色分配给功能存储上的用户标识

此代码单元将 AzureML 数据科学家角色分配给创建的功能存储上的 UAI。 若要了解有关访问控制的详细信息,请参阅 Azure 存储帐户Azure 机器学习工作区的基于角色的访问控制。

!az role assignment create --role "AzureML Data Scientist" --assignee-object-id $uai_principal_id --assignee-principal-type ServicePrincipal --scope $feature_store_arm_id

按照以下说明获取用户标识的 Microsoft Entra 对象 ID。 然后,使用下一个命令中的 Microsoft Entra 对象 ID 将 AzureML 数据科学家角色分配给所创建功能存储上的用户标识。

your_aad_objectid = "<YOUR_AAD_OBJECT_ID>"

!az role assignment create --role "AzureML Data Scientist" --assignee-object-id $your_aad_objectid --assignee-principal-type User --scope $feature_store_arm_id

获取功能存储的默认存储帐户和密钥保管库,并禁用对相应资源的公用网络访问

下一个代码单元返回以下步骤的功能存储对象。

fs = featurestore.feature_stores.get()

此代码单元返回功能存储的默认存储帐户和密钥保管库的名称。

# Copy the properties storage_account and key_vault from the response returned in feature store show command respectively
default_fs_storage_account_name = fs.storage_account.rsplit("/", 1)[-1]
default_key_vault_name = fs.key_vault.rsplit("/", 1)[-1]

此代码单元禁用对功能存储的默认存储帐户的公共网络访问。

# Disable the public network access for the above created default ADLS Gen2 storage account for the feature store
!az storage account update --name $default_fs_storage_account_name --resource-group $featurestore_resource_group_name --subscription $featurestore_subscription_id --public-network-access disabled

下一个单元格打印功能存储的默认密钥保管库的名称。

print(default_key_vault_name)

为之前创建的默认功能存储密钥保管库禁用公用网络访问

  • 在 Azure 门户中,打开在上一个单元格中创建的默认密钥保管库。
  • 选择“网络”选项卡。
  • 选择“禁用公共访问”,然后选择页面左下角的“应用”

为功能存储工作区启用托管虚拟网络

使用必要的出站规则更新功能存储

下一个代码单元为功能存储定义的出站规则创建 YAML 规范文件。

# The below code creates a configuration for managed virtual network for the feature store
import yaml

config = {
    "public_network_access": "disabled",
    "managed_network": {
        "isolation_mode": "allow_internet_outbound",
        "outbound_rules": [
            # You need to add multiple rules here if you have separate storage account for source, observation data and offline store.
            {
                "name": "sourcerulefs",
                "destination": {
                    "spark_enabled": "true",
                    "subresource_target": "dfs",
                    "service_resource_id": f"/subscriptions/{storage_subscription_id}/resourcegroups/{storage_resource_group_name}/providers/Microsoft.Storage/storageAccounts/{storage_account_name}",
                },
                "type": "private_endpoint",
            },
            # This rule is added currently because serverless Spark doesn't automatically create a private endpoint to default key vault.
            {
                "name": "defaultkeyvault",
                "destination": {
                    "spark_enabled": "true",
                    "subresource_target": "vault",
                    "service_resource_id": f"/subscriptions/{featurestore_subscription_id}/resourcegroups/{featurestore_resource_group_name}/providers/Microsoft.Keyvault/vaults/{default_key_vault_name}",
                },
                "type": "private_endpoint",
            },
        ],
    },
}

feature_store_managed_vnet_yaml = (
    root_dir + "/featurestore/feature_store_managed_vnet_config.yaml"
)

with open(feature_store_managed_vnet_yaml, "w") as outfile:
    yaml.dump(config, outfile, default_flow_style=False)

此代码单元使用生成的 YAML 规范文件更新功能存储。

# This command will change to `az ml featurestore update` in future for parity.
!az ml workspace update --file $feature_store_managed_vnet_yaml --name $featurestore_name --resource-group $featurestore_resource_group_name

为定义的出站规则创建专用终结点

provision-network 命令从托管虚拟网络创建专用终结点,其中具体化作业将执行到源、脱机存储、观察数据、默认存储帐户和功能存储的默认密钥保管库。 此命令可能需要大约 20 分钟才能完成。

#### Provision network to create necessary private endpoints (it may take approximately 20 minutes)
!az ml workspace provision-network --name $featurestore_name --resource-group $featurestore_resource_group_name --include-spark

此代码单元确认创建由出站规则定义的专用终结点。

### Check that managed virtual network is correctly enabled
### After provisioning the network, all the outbound rules should become active
### For this tutorial, you will see 5 outbound rules
!az ml workspace show --name $featurestore_name --resource-group $featurestore_resource_group_name

更新项目工作区的托管虚拟网络

接下来,更新项目工作区的托管虚拟网络。 首先,获取项目工作区的订阅 ID、资源组和工作区名称。

# lookup the subscription id, resource group and workspace name of the current workspace
project_ws_sub_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
project_ws_rg = os.environ["AZUREML_ARM_RESOURCEGROUP"]
project_ws_name = os.environ["AZUREML_ARM_WORKSPACE_NAME"]

使用必要的出站规则更新项目工作区

项目工作区需要访问以下资源:

  • 源数据
  • 脱机存储
  • 观察数据
  • 特征存储
  • 功能存储的默认存储帐户

此代码单元使用生成的 YAML 规范文件以及所需的出站规则更新项目工作区。

# The below code creates a configuration for managed virtual network for the project workspace
import yaml

config = {
    "managed_network": {
        "isolation_mode": "allow_internet_outbound",
        "outbound_rules": [
            # Incase you have separate storage accounts for source, observation data and offline store, you need to add multiple rules here. No action needed otherwise.
            {
                "name": "projectsourcerule",
                "destination": {
                    "spark_enabled": "true",
                    "subresource_target": "dfs",
                    "service_resource_id": f"/subscriptions/{storage_subscription_id}/resourcegroups/{storage_resource_group_name}/providers/Microsoft.Storage/storageAccounts/{storage_account_name}",
                },
                "type": "private_endpoint",
            },
            # Rule to create private endpoint to default storage of feature store
            {
                "name": "defaultfsstoragerule",
                "destination": {
                    "spark_enabled": "true",
                    "subresource_target": "blob",
                    "service_resource_id": f"/subscriptions/{featurestore_subscription_id}/resourcegroups/{featurestore_resource_group_name}/providers/Microsoft.Storage/storageAccounts/{default_fs_storage_account_name}",
                },
                "type": "private_endpoint",
            },
            # Rule to create private endpoint to default key vault of feature store
            {
                "name": "defaultfskeyvaultrule",
                "destination": {
                    "spark_enabled": "true",
                    "subresource_target": "vault",
                    "service_resource_id": f"/subscriptions/{featurestore_subscription_id}/resourcegroups/{featurestore_resource_group_name}/providers/Microsoft.Keyvault/vaults/{default_key_vault_name}",
                },
                "type": "private_endpoint",
            },
            # Rule to create private endpoint to feature store
            {
                "name": "featurestorerule",
                "destination": {
                    "spark_enabled": "true",
                    "subresource_target": "amlworkspace",
                    "service_resource_id": f"/subscriptions/{featurestore_subscription_id}/resourcegroups/{featurestore_resource_group_name}/providers/Microsoft.MachineLearningServices/workspaces/{featurestore_name}",
                },
                "type": "private_endpoint",
            },
        ],
    }
}

project_ws_managed_vnet_yaml = (
    root_dir + "/featurestore/project_ws_managed_vnet_config.yaml"
)

with open(project_ws_managed_vnet_yaml, "w") as outfile:
    yaml.dump(config, outfile, default_flow_style=False)

此代码单元使用生成的 YAML 规范文件以及出站规则更新项目工作区。

#### Update project workspace to create private endpoints for the defined outbound rules (it may take approximately 15 minutes)
!az ml workspace update --file $project_ws_managed_vnet_yaml --name $project_ws_name --resource-group $project_ws_rg

此代码单元确认创建由出站规则定义的专用终结点。

!az ml workspace show --name $project_ws_name --resource-group $project_ws_rg

还可以通过导航到项目工作区左侧导航面板中的网络,然后打开工作区托管的出站访问选项卡来验证 Azure 门户中的出站规则。

此屏幕截图显示 Azure 门户中项目工作区的出站规则。

原型并开发事务滚动聚合功能集

探索事务源数据

注意

可公开访问的 Blob 容器托管本教程中使用的示例数据。 它只能通过 wasbs 驱动程序在 Spark 中读取。 使用自己的源数据创建功能集时,请在 ADLS Gen2 帐户中托管这些功能集,并在数据路径中使用 abfss 驱动程序。

# remove the "." in the root directory path as we need to generate absolute path to read from Spark
transactions_source_data_path = f"abfss://{storage_file_system_name_source_data}@{storage_account_name}.dfs.core.windows.net/transactions-source/*.parquet"
transactions_src_df = spark.read.parquet(transactions_source_data_path)

display(transactions_src_df.head(5))
# Note: display(training_df.head(5)) displays the timestamp column in a different format. You can can call transactions_src_df.show() to see correctly formatted value

在本地开发事务功能集

功能集规范是可在本地开发和测试的自包含功能集定义。

创建以下滚动窗口聚合功能:

  • 事务三天计数
  • 事务量三天总和
  • 事务量三天平均值
  • 事务七天计数
  • 事务量七天总和
  • 事务量七天平均值

检查功能转换代码文件 featurestore/featuresets/transactions/spec/transformation_code/transaction_transform.py。 此 spark 转换器执行为特征定义的滚动聚合。

有关功能集和转换的详细信息,请访问“功能存储概念”。

from azureml.featurestore import create_feature_set_spec, FeatureSetSpec
from azureml.featurestore.contracts import (
    DateTimeOffset,
    FeatureSource,
    TransformationCode,
    Column,
    ColumnType,
    SourceType,
    TimestampColumn,
)


transactions_featureset_code_path = (
    root_dir + "/featurestore/featuresets/transactions/transformation_code"
)

transactions_featureset_spec = create_feature_set_spec(
    source=FeatureSource(
        type=SourceType.parquet,
        path=f"abfss://{storage_file_system_name_source_data}@{storage_account_name}.dfs.core.windows.net/transactions-source/*.parquet",
        timestamp_column=TimestampColumn(name="timestamp"),
        source_delay=DateTimeOffset(days=0, hours=0, minutes=20),
    ),
    transformation_code=TransformationCode(
        path=transactions_featureset_code_path,
        transformer_class="transaction_transform.TransactionFeatureTransformer",
    ),
    index_columns=[Column(name="accountID", type=ColumnType.string)],
    source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
    temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
    infer_schema=True,
)
# Generate a spark dataframe from the feature set specification
transactions_fset_df = transactions_featureset_spec.to_spark_dataframe()
# display few records
display(transactions_fset_df.head(5))

导出功能集规范

若要向功能存储注册功能集规范,必须以特定格式保存该规范。

若要检查生成的事务功能集规范,请从文件树打开此文件以查看规范:

featurestore/featuresets/accounts/spec/FeaturesetSpec.yaml

该规范包含以下元素:

  • source:对存储资源的引用 - 在本例中,blob 存储资源中的 parquet 文件
  • features:功能及其数据类型的列表。 如果你提供转换代码
  • index_columns:访问功能集中的值所需的联接键

作为将功能集规范保留为 YAML 文件的另一个好处,可以控制规范的版本控制。 详细了解顶级功能存储实体实体文档功能集规范 YAML 参考

import os

# create a new folder to dump the feature set spec
transactions_featureset_spec_folder = (
    root_dir + "/featurestore/featuresets/transactions/spec"
)

# check if the folder exists, create one if not
if not os.path.exists(transactions_featureset_spec_folder):
    os.makedirs(transactions_featureset_spec_folder)

transactions_featureset_spec.dump(transactions_featureset_spec_folder)

注册功能商店实体

实体有助于跨使用相同逻辑实体的功能集强制使用相同的联接键定义。 实体示例可能包括帐户实体、客户实体等。实体通常创建一次,然后跨功能集重复使用。 有关详细信息,请访问顶级功能存储实体文档

此代码单元为功能存储创建帐户实体。

account_entity_path = root_dir + "/featurestore/entities/account.yaml"
!az ml feature-store-entity create --file $account_entity_path --resource-group $featurestore_resource_group_name --workspace-name $featurestore_name

向功能存储注册事务功能集,并提交具体化作业

若要共享和重用功能集资产,必须先将该资产注册到功能存储中。 功能集资产注册提供托管功能,包括版本控制与具体化。 本教程系列介绍了这些主题。

功能集资产引用之前创建的功能集规格,例如版本和具体化设置等其他属性。

创建特征集

下一个代码单元使用预定义的 YAML 规范文件来创建功能集。

transactions_featureset_path = (
    root_dir
    + "/featurestore/featuresets/transactions/featureset_asset_offline_enabled.yaml"
)
!az ml feature-set create --file $transactions_featureset_path --resource-group $featurestore_resource_group_name --workspace-name $featurestore_name

此代码单元预览新创建的功能集。

# Preview the newly created feature set

!az ml feature-set show --resource-group $featurestore_resource_group_name --workspace-name $featurestore_name -n transactions -v 1

提交回填具体化作业

下一个代码单元定义特征具体化窗口的开始和结束时间值,并提交回填具体化作业。


此代码单元提供 <JOB_ID_FROM_PREVIOUS_COMMAND>,用于检查回填具体化作业的状态。


此代码单元列出当前功能集的所有具体化作业。


将 Azure Cache for Redis 附加为联机存储

创建用于 Redis 的 Azure 缓存

在下一个代码单元中,定义要创建或重复使用的 Azure Cache for Redis 的名称。 (可选)可以替代其他默认设置。

redis_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
redis_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]
redis_name = "my-redis"
redis_location = storage_location

可以选择 Redis 缓存层(基本层、标准层或高级层)。 应选择可用于所选缓存层的 SKU 系列。 有关选择不同层如何影响缓存性能的详细信息,请访问此文档资源。 有关 Azure Cache for Redis 的不同 SKU 层和系列定价的详细信息,请访问此文档资源

执行以下代码单元以创建具有高级层、SKU 系列 P 和缓存容量 2 的 Azure Cache for Redis。 预配 Redis 实例可能需要大约 5-10 分钟。

# Create new redis cache
from azure.mgmt.redis import RedisManagementClient
from azure.mgmt.redis.models import RedisCreateParameters, Sku, SkuFamily, SkuName

management_client = RedisManagementClient(
    AzureMLOnBehalfOfCredential(), redis_subscription_id
)

# It usually takes about 5 - 10 min to finish the provision of the Redis instance.
# If the following begin_create() call still hangs for longer than that,
# please check the status of the Redis instance on the Azure portal and cancel the cell if the provision has completed.
# This sample uses a PREMIUM tier Redis SKU from family P, which may cost more than a STANDARD tier SKU from family C.
# Please choose the SKU tier and family according to your performance and pricing requirements.

redis_arm_id = (
    management_client.redis.begin_create(
	       resource_group_name=redis_resource_group_name,
	       name=redis_name,
	       parameters=RedisCreateParameters(
	           location=redis_location,
	           sku=Sku(name=SkuName.PREMIUM, family=SkuFamily.P, capacity=2),
	           public_network_access="Disabled",  # can only disable PNA to redis cache during creation
	       ),
	   )
	   .result()
	   .id
)
print(redis_arm_id)

使用联机存储更新功能存储

将 Azure Cache for Redis 附加到功能存储,以将其用作联机具体化存储。 下一个代码单元创建一个 YAML 规范文件,其中包含为功能存储定义的联机存储出站规则。

# The following code cell creates a YAML specification file for outbound rules that are defined for the feature store.
## rule 1: PE to online store (redis cache): this is optional if online store is not used

import yaml

config = {
    "public_network_access": "disabled",
    "managed_network": {
        "isolation_mode": "allow_internet_outbound",
        "outbound_rules": [
            {
                "name": "sourceruleredis",
                "destination": {
                    "spark_enabled": "true",
                    "subresource_target": "redisCache",
                    "service_resource_id": f"/subscriptions/{storage_subscription_id}/resourcegroups/{storage_resource_group_name}/providers/Microsoft.Cache/Redis/{redis_name}",
                },
                "type": "private_endpoint",
            },
        ],
    },
    "online_store": {"target": f"{redis_arm_id}", "type": "redis"},
}

feature_store_managed_vnet_yaml = (
    root_dir + "/featurestore/feature_store_managed_vnet_config.yaml"
)

with open(feature_store_managed_vnet_yaml, "w") as outfile:
    yaml.dump(config, outfile, default_flow_style=False)

下一个代码单元使用生成的 YAML 规范文件更新功能存储,其中包含联机存储的出站规则。

!az ml feature-store update --file $feature_store_managed_vnet_yaml --name $featurestore_name --resource-group $featurestore_resource_group_name

更新项目工作区出站规则

项目工作区需要访问联机存储。 以下代码单元创建一个 YAML 规范文件,其中包含项目工作区所需的出站规则。

import yaml

config = {
    "managed_network": {
        "isolation_mode": "allow_internet_outbound",
        "outbound_rules": [
            {
                "name": "onlineruleredis",
                "destination": {
                    "spark_enabled": "true",
                    "subresource_target": "redisCache",
                    "service_resource_id": f"/subscriptions/{storage_subscription_id}/resourcegroups/{storage_resource_group_name}/providers/Microsoft.Cache/Redis/{redis_name}",
                },
                "type": "private_endpoint",
            },
        ],
    }
}

project_ws_managed_vnet_yaml = (
    root_dir + "/featurestore/project_ws_managed_vnet_config.yaml"
)

with open(project_ws_managed_vnet_yaml, "w") as outfile:
    yaml.dump(config, outfile, default_flow_style=False)

执行下一个代码单元,使用生成的 YAML 规范文件更新项目工作区,其中包含联机存储的出站规则。

#### Update project workspace to create private endpoints for the defined outbound rules (it may take approximately 15 minutes)
!az ml workspace update --file $project_ws_managed_vnet_yaml --name $project_ws_name --resource-group $project_ws_rg

具体化交易功能设置为联机存储

下一个代码单元为 transactions 功能集启用联机具体化。

# Update featureset to enable online materialization
transactions_featureset_path = (
    root_dir
    + "/featurestore/featuresets/transactions/featureset_asset_online_enabled.yaml"
)
!az ml feature-set update --file $transactions_featureset_path --resource-group $featurestore_resource_group_name --workspace-name $featurestore_name

下一个代码单元定义特征具体化窗口的开始和结束时间,并提交回填具体化作业。

feature_window_start_time = "2024-01-24T00:00.000Z"
feature_window_end_time = "2024-01-25T00:00.000Z"

!az ml feature-set backfill --name transactions --version 1 --by-data-status "['None']" --feature-window-start-time $feature_window_start_time --feature-window-end-time $feature_window_end_time --feature-store-name $featurestore_name --resource-group $featurestore_resource_group_name

使用注册的功能生成训练数据

加载观察数据

首先,探索观察数据。 用于训练和推理的核心数据通常涉及观察数据。 该核心数据与特征数据联接,以创建完整的训练数据资源。 观察数据是在事件发生期间捕获的数据。 在这种情况下,它具有核心事务数据,包括事务 ID、帐户 ID 和事务金额值。 在这里,由于观测数据用于训练,因此它还追加了目标变量(is_fraud)。

observation_data_path = f"abfss://{storage_file_system_name_observation_data}@{storage_account_name}.dfs.core.windows.net/train/*.parquet"
observation_data_df = spark.read.parquet(observation_data_path)
obs_data_timestamp_column = "timestamp"

display(observation_data_df)
# Note: the timestamp column is displayed in a different format. Optionally, you can can call training_df.show() to see correctly formatted value

获取已注册的功能集并列出其功能

接下来,通过提供其名称和版本来获取功能集,然后列出此功能集中的功能。 此外,打印一些示例特征值。

# look up the featureset by providing name and version
transactions_featureset = featurestore.feature_sets.get("transactions", "1")
# list its features
transactions_featureset.features
# print sample values
display(transactions_featureset.to_spark_dataframe().head(5))

选择功能并生成训练数据

选择训练数据的功能,并使用功能存储 SDK 生成训练数据。

from azureml.featurestore import get_offline_features

# you can select features in pythonic way
features = [
    transactions_featureset.get_feature("transaction_amount_7d_sum"),
    transactions_featureset.get_feature("transaction_amount_7d_avg"),
]

# you can also specify features in string form: featurestore:featureset:version:feature
more_features = [
    "transactions:1:transaction_3d_count",
    "transactions:1:transaction_amount_3d_avg",
]

more_features = featurestore.resolve_feature_uri(more_features)
features.extend(more_features)

# generate training dataframe by using feature data and observation data
training_df = get_offline_features(
    features=features,
    observation_data=observation_data_df,
    timestamp_column=obs_data_timestamp_column,
)

# Ignore the message that says feature set is not materialized (materialization is optional). We will enable materialization in the next part of the tutorial.
display(training_df)
# Note: the timestamp column is displayed in a different format. Optionally, you can can call training_df.show() to see correctly formatted value

时间点联接将功能追加到训练数据。

可选的后续步骤

现在,你已成功创建安全功能存储并提交了成功的具体化运行,接下来可以浏览教程系列来构建对功能存储的理解。

本教程包含本系列教程 1 和 2 中的步骤组合。 请记住,将其他教程笔记本中使用的必要公共存储容器替换为在本教程笔记本中创建的容器,以便进行网络隔离。

本教程到此结束。 训练数据使用功能存储中的功能。 可以将其保存到存储中供以后使用,也可以直接在其上运行模型训练。

后续步骤