Python 插件

Python 插件使用 Python 脚本运行用户定义函数 (UDF)。 此 Python 脚本获取表格数据作为其输入,并生成表格输出。 插件的运行时托管在沙盒中,运行在群集的节点上。

语法

T|evaluate [hint.distribution= (single | per_node)] [hint.remote= (auto | local)] python(output_schema,script [,script_parameters] [,external_artifacts][,spill_to_disk])

详细了解语法约定

参数

客户 类型​​ 必需 说明
output_schema string type 文本,定义由 Python 代码返回的表格数据的输出模式。 格式为:typeof(ColumnName:ColumnType[, ...])。例如,typeof(col1:string, col2:long)。 若要扩展输入架构,请使用以下语法:typeof(*, col1:string, col2:long)
脚本 string 要执行的有效 Python 脚本。 若要生成多行字符串,请参阅用法提示
script_parameters dynamic 一个名称/值对的属性包,将作为保留的 kargs 字典传递给 Python 脚本。 有关详细信息,请参阅保留 Python 变量
hint.distribution string 一个提示,用于在多个群集节点上分布插件的执行。 默认值为 singlesingle 意味着脚本的单个实例将针对整个查询数据运行。 per_node 意味着,如果在分发 Python 块之前执行查询,则脚本的一个实例将在每个节点上对其包含的数据运行。
hint.remote string 此提示仅适用于跨群集查询。 默认值为 autoauto 意味着服务器会自动决定执行 Python 代码的群集。 将值设置为 local 会强制在本地群集上执行 Python 代码。 在远程群集上禁用了 Python 插件时使用它。
external_artifacts dynamic 名称和 URL 对的属性包,用于可从云存储访问的项目。 有关详细信息,请参阅使用外部项目
spill_to_disk bool 指定一个用于将输入表序列化为 Python 沙盒的替代方法。 若要序列化大表,请将其设置为 true 以加速序列化,并大大减少沙盒内存消耗。 默认值为 true

保留 Python 变量

以下变量是为 Kusto 查询语言和 Python 代码之间的交互而保留的。

  • df:作为 pandas 数据帧的输入表格数据(上述 T 的值)。
  • kargs:作为 Python 字典的 script_parameters 参数的值。
  • result:由 Python 脚本创建的 pandas 数据帧,其值成为发送到插件后面的 Kusto 查询操作符的表格数据。

启用插件

默认禁用该插件。 在开始之前,请查看先决条件列表。 若要启用插件并选择 Python 映像的版本,请参阅在群集上启用语言扩展

Python 沙盒映像

若要更改 Python 映像的版本,请参阅更改群集上的 Python 语言扩展映像

若要查看不同 Python 映像的包列表,请参阅 Python 包参考

注意

  • 默认情况下,插件将 numpy 作为 np 导入,将 pandas 作为 pd 导入。 (可选)可以根据需要导入其他模块。
  • 有一些包可能与运行插件的沙盒所实施的限制不兼容。

使用来自查询和更新策略的引入

  • 在以下查询中使用插件:
    • 定义为更新策略的一部分,其源表引入为使用非流式处理引入。
    • 作为从查询引入(例如 .set-or-append)的命令的一部分运行。
  • 不能在定义为更新策略一部分的查询中使用插件,该策略的源表是使用流式处理引入引入的。

示例

range x from 1 to 360 step 1
| evaluate python(
//
typeof(*, fx:double),               //  Output schema: append a new fx column to original table 
```
result = df
n = df.shape[0]
g = kargs["gain"]
f = kargs["cycles"]
result["fx"] = g * np.sin(df["x"]/n*2*np.pi*f)
```
, bag_pack('gain', 100, 'cycles', 4)    //  dictionary of parameters
)
| render linechart 

Screenshot of sine demo showing query result.

print "This is an example for using 'external_artifacts'"
| evaluate python(
    typeof(File:string, Size:string), ```if 1:
    import os
    result = pd.DataFrame(columns=['File','Size'])
    sizes = []
    path = '.\\\\Temp'
    files = os.listdir(path)
    result['File']=files
    for file in files:
        sizes.append(os.path.getsize(path + '\\\\' + file))
    result['Size'] = sizes
    ```,
    external_artifacts = 
        dynamic({"this_is_my_first_file":"https://kustoscriptsamples.blob.core.chinacloudapi.cn/samples/R/sample_script.r",
                 "this_is_a_script":"https://kustoscriptsamples.blob.core.chinacloudapi.cn/samples/python/sample_script.py"})
)
文件 大小
this_is_a_script 120
this_is_my_first_file 105

性能提示

  • 将插件的输入数据集减少到所需的最小量(列/行)。
    • 在可能的情况下,通过 Kusto 的查询语言对源数据集使用筛选器。
    • 若要对源列的子集进行计算,请在调用插件之前仅投影这些列。
  • 如果脚本中的逻辑可分发,请使用 hint.distribution = per_node
  • 尽可能使用 Kusto 的查询语言来实现 Python 脚本的逻辑。

使用提示

  • 若要在查询编辑器中生成包含 Python 脚本的多行字符串,请从常用的 Python 编辑器(JupyterVisual Studio CodePyCharm 等)复制 Python 脚本,将其粘贴到查询编辑器中,然后在包含三个连续反引号的行之间括起完整的脚本。 例如:

    ```
    python code
    ```

  • 使用 externaldata 运算符获取存储在外部位置(例如 Azure Blob 存储)中的脚本内容。

示例

    let script = 
        externaldata(script:string)
        [h'https://kustoscriptsamples.blob.core.chinacloudapi.cn/samples/python/sample_script.py']
        with(format = raw);
    range x from 1 to 360 step 1
    | evaluate python(
        typeof(*, fx:double),
        toscalar(script), 
        bag_pack('gain', 100, 'cycles', 4))
    | render linechart 

使用外部项目

云存储中的外部项目可用于脚本并在运行时使用。

外部项目属性引用的 URL 必须是:

注意

使用托管标识对外部项目进行身份验证时,必须在群集级别托管标识策略上定义 SandboxArtifacts 用法。

这些项目可供脚本从本地临时目录 .\Temp 使用。 属性包中提供的名称用作本地文件名。 请参阅示例

有关引用外部包的信息,请参阅安装 Python 插件的包

刷新外部项目缓存

查询中使用的外部项目文件缓存在群集上。 如果对云存储中的文件进行更新,并要求立即与群集同步,可以使用 .clear cluster cache external-artifacts 命令。 此命令会清除缓存的文件,并确保后续查询使用最新版本的项目运行。

安装 Python 插件的包

由于以下原因,你可能需要自行安装包:

  • 包是专用的,是你自己的。
  • 包是公共的,但不包括在插件的基本映像中。

安装包,如下所示:

先决条件

  1. 创建 blob 容器来承载包,最好与群集位于同一个位置。 例如 https://artifactschinaeast2.blob.core.chinacloudapi.cn/python,假设群集位于中国东部 2。

  2. 更改群集的标注策略以允许访问该位置。

    • 此更改需要 AllDatabasesAdmin 权限。

    • 例如,若要启用对位于 https://artifactschinaeast2.blob.core.chinacloudapi.cn/python 中的 blob 的访问,请运行以下命令:

    .alter-merge cluster policy callout @'[ { "CalloutType": "sandbox_artifacts", "CalloutUriRegex": "artifactschinaeast2\\.blob\\.core\\.chinacloudapi\\.cn/python/","CanCall": true } ]'
    

安装包

  1. 对于 PyPi 或其他通道中的公共包,请下载包及其依赖项。

    • 从本地 Windows Python 环境中的 cmd 窗口运行:
    pip wheel [-w download-dir] package-name.
    
  2. 创建一个 zip 文件,其中包含所需的包及其依赖项。

    • 对于专用包:压缩包的文件夹及其依赖项的文件夹。
    • 对于公共包,对在上一步中下载的文件进行压缩。

    注意

    • 确保下载与 Python 引擎和沙盒运行时(当前在 Windows 上为 3.6.5 版本)平台兼容的包
    • 请确保压缩 .whl 文件本身,而不是其父文件夹。
    • 对于基本沙箱映像中已经存在的具有相同版本的包,你可以跳过 .whl 文件。
  3. 将压缩文件上传到项目位置中的 blob(从步骤 1 开始)。

  4. 调用 python 插件。

    • 使用一个包含名称和对 zip 文件(blob 的 URL,包括一个 SAS 令牌)的引用的属性包指定 external_artifacts 参数。
    • 在内联 python 代码中,从 sandbox_utils 导入 Zipackage,并使用 zip 文件的名称调用其 install() 方法。

示例

安装生成伪数据的 Faker 包。

range ID from 1 to 3 step 1 
| extend Name=''
| evaluate python(typeof(*), ```if 1:
    from sandbox_utils import Zipackage
    Zipackage.install("Faker.zip")
    from faker import Faker
    fake = Faker()
    result = df
    for i in range(df.shape[0]):
        result.loc[i, "Name"] = fake.name()
    ```,
    external_artifacts=bag_pack('faker.zip', 'https://artifacts.blob.core.chinacloudapi.cn/kusto/Faker.zip?*** REPLACE WITH YOUR SAS TOKEN ***'))
ID 名称
1 Gary Tapia
2 Emma Evans
3 Ashley Bowen

有关使用 Python 插件的 UDF 函数的更多示例,请参阅函数库

Python 插件使用 Python 脚本运行用户定义函数 (UDF)。 此 Python 脚本获取表格数据作为其输入,并生成表格输出。

语法

T|evaluate [hint.distribution= (single | per_node)] [hint.remote= (auto | local)] python(output_schema,script [,script_parameters] [,spill_to_disk])

详细了解语法约定

参数

客户 类型​​ 必需 说明
output_schema string type 文本,定义由 Python 代码返回的表格数据的输出模式。 格式为:typeof(ColumnName:ColumnType[, ...])。例如,typeof(col1:string, col2:long)。 若要扩展输入架构,请使用以下语法:typeof(*, col1:string, col2:long)
脚本 string 要执行的有效 Python 脚本。 若要生成多行字符串,请参阅用法提示
script_parameters dynamic 一个名称/值对的属性包,将作为保留的 kargs 字典传递给 Python 脚本。 有关详细信息,请参阅保留 Python 变量
hint.distribution string 一个提示,用于在多个群集节点上分布插件的执行。 默认值为 singlesingle 意味着脚本的单个实例将针对整个查询数据运行。 per_node 意味着,如果在分发 Python 块之前执行查询,则脚本的一个实例将在每个节点上对其包含的数据运行。
hint.remote string 此提示仅适用于跨群集查询。 默认值为 autoauto 意味着服务器会自动决定执行 Python 代码的群集。 将值设置为 local 会强制在本地群集上执行 Python 代码。 在远程群集上禁用了 Python 插件时使用它。
spill_to_disk bool 指定一个用于将输入表序列化为 Python 沙盒的替代方法。 若要序列化大表,请将其设置为 true 以加速序列化,并大大减少沙盒内存消耗。 默认值为 true

保留 Python 变量

以下变量是为 Kusto 查询语言和 Python 代码之间的交互而保留的。

  • df:作为 pandas 数据帧的输入表格数据(上述 T 的值)。
  • kargs:作为 Python 字典的 script_parameters 参数的值。
  • result:由 Python 脚本创建的 pandas 数据帧,其值成为发送到插件后面的 Kusto 查询操作符的表格数据。

启用插件

默认禁用该插件。 在开始之前,请在 KQL 数据库中启用 Python 插件。

Python 沙盒映像

若要查看不同 Python 映像的包列表,请参阅 Python 包参考

注意

  • 默认情况下,插件将 numpy 作为 np 导入,将 pandas 作为 pd 导入。 (可选)可以根据需要导入其他模块。
  • 有一些包可能与运行插件的沙盒所实施的限制不兼容。

使用来自查询和更新策略的引入

  • 在以下查询中使用插件:
    • 定义为更新策略的一部分,其源表引入为使用非流式处理引入。
    • 作为从查询引入(例如 .set-or-append)的命令的一部分运行。
  • 不能在定义为更新策略一部分的查询中使用插件,该策略的源表是使用流式处理引入引入的。

示例

range x from 1 to 360 step 1
| evaluate python(
//
typeof(*, fx:double),               //  Output schema: append a new fx column to original table 
```
result = df
n = df.shape[0]
g = kargs["gain"]
f = kargs["cycles"]
result["fx"] = g * np.sin(df["x"]/n*2*np.pi*f)
```
, bag_pack('gain', 100, 'cycles', 4)    //  dictionary of parameters
)
| render linechart 

Screenshot of sine demo showing query result.

性能提示

  • 将插件的输入数据集减少到所需的最小量(列/行)。
    • 在可能的情况下,通过 Kusto 的查询语言对源数据集使用筛选器。
    • 若要对源列的子集进行计算,请在调用插件之前仅投影这些列。
  • 如果脚本中的逻辑可分发,请使用 hint.distribution = per_node
  • 尽可能使用 Kusto 的查询语言来实现 Python 脚本的逻辑。

使用提示

  • 若要在查询编辑器中生成包含 Python 脚本的多行字符串,请从常用的 Python 编辑器(JupyterVisual Studio CodePyCharm 等)复制 Python 脚本,将其粘贴到查询编辑器中,然后在包含三个连续反引号的行之间括起完整的脚本。 例如:

    ```
    python code
    ```

  • 使用 externaldata 运算符获取存储在外部位置(例如 Azure Blob 存储)中的脚本内容。

示例

    let script = 
        externaldata(script:string)
        [h'https://kustoscriptsamples.blob.core.chinacloudapi.cn/samples/python/sample_script.py']
        with(format = raw);
    range x from 1 to 360 step 1
    | evaluate python(
        typeof(*, fx:double),
        toscalar(script), 
        bag_pack('gain', 100, 'cycles', 4))
    | render linechart 

有关使用 Python 插件的 UDF 函数的更多示例,请参阅函数库

不支持此功能。