如何在 Azure Cosmos DB 中编写存储过程、触发器和用户定义的函数

适用范围: NoSQL

Azure Cosmos DB 提供 JavaScript 的语言集成式事务执行用于编写存储过程触发器用户定义的函数 (UDF) 。 在 Azure Cosmos DB 中使用 API for NoSQL 时,可以采用 JavaScript 来定义存储过程、触发器和 UDF。 可在 JavaScript 中编写逻辑,并在数据库引擎内部执行该逻辑。 可以使用 Azure 门户Azure Cosmos DB 中的 JavaScript 查询 APIAzure Cosmos DB for NoSQL SDK 来创建及执行触发器、存储过程和 UDF。

要调用存储过程、触发器或 UDF,需将其注册。 有关详细信息,请参阅如何在 Azure Cosmos DB 中使用存储过程、触发器和用户定义的函数

注意

对于已分区的容器,在执行存储过程时,必须在请求选项中提供分区键值。 存储过程的范围始终限定为分区键。 存储过程看不到具有不同分区键值的项。 这同样适用于触发器。

注意

服务器端 JavaScript 功能(包括存储过程、触发器和 UDF)不支持导入模块。

提示

Azure Cosmos DB 支持使用存储过程、触发器和 UDF 部署容器。 有关详细信息,请参阅使用服务器端功能创建 Azure Cosmos DB 容器

如何编写存储过程

存储过程是使用 JavaScript 编写的,它们可以在 Azure Cosmos DB 容器中创建、更新、读取、查询和删除项。 存储过程按集合注册,可以针对该集合中的任何文档或附件运行。

注意

Azure Cosmos DB 对存储过程有不同的收费策略。 由于存储过程可以执行代码并使用任意数量的请求单位 (RU),因此每次执行都需要预先收取费用。 这可确保存储过程脚本不会影响后端服务。 预先收取的金额是脚本在以前的调用中产生的平均费用。 在执行前保留每个操作的平均 RU 数。 如果调用的 RU 数差异很大,预算利用率可能会受到影响。 作为替代方法,应使用批处理或批量请求而不是存储过程,以避免产生 RU 收费差异。

下面是一个可以返回“Hello World”响应的简单存储过程。

var helloWorldStoredProc = {
    id: "helloWorld",
    serverScript: function () {
        var context = getContext();
        var response = context.getResponse();

        response.setBody("Hello, World");
    }
}

上下文对象提供对所有可在 Azure Cosmos DB 中执行的操作的访问,以及对请求和响应对象的访问。 在本例中,我们将使用响应对象来设置要发回到客户端的响应正文。

编写存储过程后,必须将其注册到集合。 有关详细信息,请参阅如何在 Azure Cosmos DB 中使用存储过程

使用存储过程创建项

使用存储过程创建某个项时,该项会插入到 Azure Cosmos DB 容器,并返回新建项的 ID。 创建项是一种异步操作,依赖于 JavaScript 回调函数。 回调函数包含两个参数:一个参数用于操作失败时返回的错误对象,另一个参数用于返回值(在本例中为创建的对象)。 在回调内部,可以处理异常或引发错误。 如果未提供回调而又存在错误,则 Azure Cosmos DB 运行时会引发错误。

存储过程还包含一个用于设置说明的参数(一个布尔值)。 如果该参数设置为 true,同时缺少说明,则存储过程会引发异常。 否则,存储过程的剩余部分将继续运行。

以下示例存储过程采用新 Azure Cosmos DB 项的数组作为输入,将其插入 Azure Cosmos DB 容器,然后返回插入项的计数。 此示例利用 .NET API for NoSQL 快速入门中的 ToDoList 示例。

function createToDoItems(items) {
    var collection = getContext().getCollection();
    var collectionLink = collection.getSelfLink();
    var count = 0;

    if (!items) throw new Error("The array is undefined or null.");

    var numItems = items.length;

    if (numItems == 0) {
        getContext().getResponse().setBody(0);
        return;
    }

    tryCreate(items[count], callback);

    function tryCreate(item, callback) {
        var options = { disableAutomaticIdGeneration: false };

        var isAccepted = collection.createDocument(collectionLink, item, options, callback);

        if (!isAccepted) getContext().getResponse().setBody(count);
    }

    function callback(err, item, options) {
        if (err) throw err;
        count++;
        if (count >= numItems) {
            getContext().getResponse().setBody(count);
        } else {
            tryCreate(items[count], callback);
        }
    }
}

将数组用作存储过程的输入参数

在 Azure 门户中定义存储过程时,输入参数始终以字符串的形式发送到存储过程。 即使将字符串数组作为输入传递,该数组也会转换为字符串发送到存储过程。 若要解决此问题,可在存储过程中定义一个函数以将字符串作为数组进行分析。 下面的代码演示如何将字符串输入参数解析为数组:

function sample(arr) {
    if (typeof arr === "string") arr = JSON.parse(arr);

    arr.forEach(function(a) {
        // do something here
        console.log(a);
    });
}

存储过程中的事务

可以使用存储过程对容器中的项实现事务。 以下示例使用梦幻足球游戏应用中的事务,通过单个操作在两支球队之间交易球员。 该存储过程尝试读取两个 Azure Cosmos DB 项,其中每个项对应于作为参数传递的球员 ID。 如果找到了这两个球员,则存储过程将通过交换其所在球队来更新这些项。 如果在此过程中遇到了任何错误,则存储过程将引发 JavaScript 异常,从而隐式终止事务。

function tradePlayers(playerId1, playerId2) {
    var context = getContext();
    var container = context.getCollection();
    var response = context.getResponse();

    var player1Item, player2Item;

    // query for players
    var filterQuery =
    {
        'query' : 'SELECT * FROM Players p where p.id = @playerId1',
        'parameters' : [{'name':'@playerId1', 'value':playerId1}] 
    };

    var accept = container.queryDocuments(container.getSelfLink(), filterQuery, {},
        function (err, items, responseOptions) {
            if (err) throw new Error("Error" + err.message);

            if (items.length != 1) throw "Unable to find player 1";
            player1Item = items[0];

            var filterQuery2 =
            {
                'query' : 'SELECT * FROM Players p where p.id = @playerId2',
                'parameters' : [{'name':'@playerId2', 'value':playerId2}]
            };
            var accept2 = container.queryDocuments(container.getSelfLink(), filterQuery2, {},
                function (err2, items2, responseOptions2) {
                    if (err2) throw new Error("Error " + err2.message);
                    if (items2.length != 1) throw "Unable to find player 2";
                    player2Item = items2[0];
                    swapTeams(player1Item, player2Item);
                    return;
                });
            if (!accept2) throw "Unable to read player details, abort ";
        });

    if (!accept) throw "Unable to read player details, abort ";

    // swap the two players� teams
    function swapTeams(player1, player2) {
        var player2NewTeam = player1.team;
        player1.team = player2.team;
        player2.team = player2NewTeam;

        var accept = container.replaceDocument(player1._self, player1,
            function (err, itemReplaced) {
                if (err) throw "Unable to update player 1, abort ";

                var accept2 = container.replaceDocument(player2._self, player2,
                    function (err2, itemReplaced2) {
                        if (err) throw "Unable to update player 2, abort"
                    });

                if (!accept2) throw "Unable to update player 2, abort";
            });

        if (!accept) throw "Unable to update player 1, abort";
    }
}

存储过程中的绑定执行

下面是将项批量导入到 Azure Cosmos DB 容器的存储过程示例。 存储过程通过检查来自 createDocument 的布尔返回值来处理绑定执行,然后使用每次调用存储过程时插入的项计数,来跟踪不同的批及恢复其进度。

function bulkImport(items) {
    var container = getContext().getCollection();
    var containerLink = container.getSelfLink();

    // The count of imported items, also used as the current item index.
    var count = 0;

    // Validate input.
    if (!items) throw new Error("The array is undefined or null.");

    var itemsLength = items.length;
    if (itemsLength == 0) {
        getContext().getResponse().setBody(0);
    }

    // Call the create API to create an item.
    tryCreate(items[count], callback);

    // Note that there are 2 exit conditions:
    // 1) The createDocument request was not accepted.
    //    In this case the callback will not be called, we just call setBody and we are done.
    // 2) The callback was called items.length times.
    //    In this case all items were created and we don�t need to call tryCreate anymore. Just call setBody and we are done.
    function tryCreate(item, callback) {
        var isAccepted = container.createDocument(containerLink, item, callback);

        // If the request was accepted, the callback will be called.
        // Otherwise report the current count back to the client,
        // which will call the script again with the remaining set of items.
        if (!isAccepted) getContext().getResponse().setBody(count);
    }

    // This is called when container.createDocument is done in order to process the result.
    function callback(err, item, options) {
        if (err) throw err;

        // One more item has been inserted, increment the count.
        count++;

        if (count >= itemsLength) {
            // If we created all items, we are done. Just set the response.
            getContext().getResponse().setBody(count);
        } else {
            // Create the next document.
            tryCreate(items[count], callback);
        }
    }
}

使用存储过程的 async/await

下面的存储过程示例使用 helper 函数将 async/await 与 Promises 结合使用。 存储过程会查询项并将其替换。

function async_sample() {
    const ERROR_CODE = {
        NotAccepted: 429
    };

    const asyncHelper = {
        queryDocuments(sqlQuery, options) {
            return new Promise((resolve, reject) => {
                const isAccepted = __.queryDocuments(__.getSelfLink(), sqlQuery, options, (err, feed, options) => {
                    if (err) reject(err);
                    resolve({ feed, options });
                });
                if (!isAccepted) reject(new Error(ERROR_CODE.NotAccepted, "queryDocuments was not accepted."));
            });
        },

        replaceDocument(doc) {
            return new Promise((resolve, reject) => {
                const isAccepted = __.replaceDocument(doc._self, doc, (err, result, options) => {
                    if (err) reject(err);
                    resolve({ result, options });
                });
                if (!isAccepted) reject(new Error(ERROR_CODE.NotAccepted, "replaceDocument was not accepted."));
            });
        }
    };

    async function main() {
        let continuation;
        do {
            let { feed, options } = await asyncHelper.queryDocuments("SELECT * from c", { continuation });

            for (let doc of feed) {
                doc.newProp = 1;
                await asyncHelper.replaceDocument(doc);
            }

            continuation = options.continuation;
        } while (continuation);
    }

    main().catch(err => getContext().abort(err));
}

如何编写触发器

Azure Cosmos DB 支持前触发器和后触发器。 前触发器是在修改数据库项之前执行的,后触发器是在修改数据库项之后执行的。 触发器不会自动执行。 必须为要执行触发器的每个数据库操作指定它们。 定义触发器后,应使用 Azure Cosmos DB SDK 注册和调用前触发器

前触发器

以下示例演示如何使用前触发器来验证正在创建的 Azure Cosmos DB 项的属性。 此示例利用 .NET API for NoSQL 快速入门中的 ToDoList 示例,将时间戳属性添加到新添加的项(如果其中不包含此属性)。

function validateToDoItemTimestamp() {
    var context = getContext();
    var request = context.getRequest();

    // item to be created in the current operation
    var itemToCreate = request.getBody();

    // validate properties
    if (!("timestamp" in itemToCreate)) {
        var ts = new Date();
        itemToCreate["timestamp"] = ts.getTime();
    }

    // update the item that will be created
    request.setBody(itemToCreate);
}

前预触发器不能有任何输入参数。 使用触发器中的请求对象来处理与操作关联的请求消息。 在前面的示例中,创建 Azure Cosmos DB 项时将运行前触发器,请求消息正文包含要以 JSON 格式创建的项。

注册触发器后,可以指定可对哪些操作运行该触发器。 应使用 TriggerOperationTriggerOperation.Create 值创建此触发器,这意味着,不允许在 replace 操作中使用此触发器。

有关如何注册和调用前触发器的示例,请参阅前触发器后触发器

后触发器

以下示例演示了一个后触发器。 此触发器查询元数据项,并在其中更新有关新建项的详细信息。

function updateMetadata() {
    var context = getContext();
    var container = context.getCollection();
    var response = context.getResponse();

    // item that was created
    var createdItem = response.getBody();

    // query for metadata document
    var filterQuery = 'SELECT * FROM root r WHERE r.id = "_metadata"';
    var accept = container.queryDocuments(container.getSelfLink(), filterQuery,
        updateMetadataCallback);
    if(!accept) throw "Unable to update metadata, abort";

    function updateMetadataCallback(err, items, responseOptions) {
        if(err) throw new Error("Error" + err.message);

        if(items.length != 1) throw 'Unable to find metadata document';

        var metadataItem = items[0];

        // update metadata
        metadataItem.createdItems += 1;
        metadataItem.createdNames += " " + createdItem.id;
        var accept = container.replaceDocument(metadataItem._self,
            metadataItem, function(err, itemReplaced) {
                    if(err) throw "Unable to update metadata, abort";
            });

        if(!accept) throw "Unable to update metadata, abort";
        return;
    }
}

必须注意的一个要点是 Azure Cosmos DB 中触发器的事务执行。 后触发器作为基础项本身的同一事务的一部分运行。 后触发器执行期间的异常会导致整个事务失败。 提交的任何内容都会回退并返回异常。

有关如何注册和调用前触发器的示例,请参阅前触发器后触发器

如何编写用户定义的函数

以下示例创建一个 UDF 用于计算各个收入阶层的所得税。 然后,在查询中使用此用户定义的函数。 此示例假设有一个名为“Incomes”的容器,其中包含以下属性:

{
   "name": "Daniel Elfyn",
   "country": "USA",
   "income": 70000
}

下面的函数定义用于计算各个收入阶层的所得税:

function tax(income) {
    if (income == undefined)
        throw 'no input';

    if (income < 1000)
        return income * 0.1;
    else if (income < 10000)
        return income * 0.2;
    else
        return income * 0.4;
}

有关如何注册和使用 UDF 的示例,请参阅如何在 Azure Cosmos DB 中使用用户定义的函数

日志记录

使用存储过程、触发器或 UDF 时,可启用脚本日志记录来记录这些步骤。 当 EnableScriptLogging 设置为 true 时,会生成用于调试的字符串,如以下示例所示:

let requestOptions = { enableScriptLogging: true };
const { resource: result, headers: responseHeaders} = await container.scripts
      .storedProcedure(Sproc.id)
      .execute(undefined, [], requestOptions);
console.log(responseHeaders[Constants.HttpHeaders.ScriptLogResults]);

后续步骤

详细了解概念以及如何在 Azure Cosmos DB 中编写或使用存储过程、触发器和 UDF: