快速入门:在 Azure Cosmos DB for MongoDB 中使用 Node.js 进行矢量搜索(vCore)

在 Azure Cosmos DB for MongoDB(vCore)中使用矢量搜索和 Node.js 客户端库。 高效存储和查询矢量数据。

本快速入门使用 JSON 文件中的示例酒店数据集,其中包含模型中的 text-embedding-ada-002 矢量。 数据集包括酒店名称、位置、说明和矢量嵌入。

在 GitHub 上查找 示例代码

先决条件

  • Azure 订阅服务

    • 如果没有 Azure 订阅,可在开始前创建一个试用帐户

创建 Node.js 项目

  1. 为项目创建新目录,并在 Visual Studio Code 中打开它:

    mkdir vector-search-quickstart
    code vector-search-quickstart
    
  2. 在终端中,初始化 Node.js 项目:

    npm init -y
    npm pkg set type="module"
    
  3. 安装所需的包:

    npm install mongodb @azure/identity openai @types/node
    
    • mongodb:MongoDB Node.js 驱动程序
    • @azure/identity:用于无密码身份验证的 Azure 标识库
    • openai:用于创建向量的 OpenAI 客户端库
    • @types/node:Node.js 的类型定义
  4. .env在项目根目录中为环境变量创建文件:

    # Azure OpenAI Embedding Settings
    AZURE_OPENAI_EMBEDDING_MODEL=text-embedding-ada-002
    AZURE_OPENAI_EMBEDDING_API_VERSION=2023-05-15
    AZURE_OPENAI_EMBEDDING_ENDPOINT=
    EMBEDDING_SIZE_BATCH=16
    
    # MongoDB configuration
    MONGO_CLUSTER_NAME=
    
    # Data file
    DATA_FILE_WITH_VECTORS=HotelsData_toCosmosDB_Vector.json
    EMBEDDED_FIELD=text_embedding_ada_002
    EMBEDDING_DIMENSIONS=1536
    LOAD_SIZE_BATCH=100
    

    将文件中的 .env 占位符值替换为你自己的信息:

    • AZURE_OPENAI_EMBEDDING_ENDPOINT:Azure OpenAI 资源终结点的 URL
    • MONGO_CLUSTER_NAME:MongoDB vCore 资源名称
  5. 添加文件 tsconfig.json 以配置 TypeScript:

{
    "compilerOptions": {
        "target": "ES2020",
        "module": "NodeNext",
        "moduleResolution": "nodenext",
        "declaration": true,
        "outDir": "./dist",
        "strict": true,
        "esModuleInterop": true,
        "skipLibCheck": true,
        "noImplicitAny": false,
        "forceConsistentCasingInFileNames": true,
        "sourceMap": true,
        "resolveJsonModule": true,
    },
    "include": [
        "src/**/*"
    ],
    "exclude": [
        "node_modules",
        "dist"
    ]
}
  1. HotelsData_toCosmosDB_Vector.json包含矢量的原始数据文件 复制到项目根目录。

创建 npm 脚本

编辑package.json文件,并且添加以下脚本:

使用这些脚本编译 TypeScript 文件并运行 DiskANN 索引实现。

"scripts": { 
    "build": "tsc",
    "start:diskann": "node --env-file .env dist/diskann.js"
}

src为 TypeScript 文件创建目录。 添加两个文件:diskann.tsutils.ts 用于 DiskANN 索引的实现。

mkdir src    
touch src/diskann.ts
touch src/utils.ts

将以下代码粘贴到 diskann.ts 文件中。

import path from 'path';
import { readFileReturnJson, getClientsPasswordless, insertData, printSearchResults } from './utils.js';

// ESM specific features - create __dirname equivalent
import { fileURLToPath } from "node:url";
import { dirname } from "node:path";
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);

const config = {
    query: "quintessential lodging near running trails, eateries, retail",
    dbName: "Hotels",
    collectionName: "hotels_diskann",
    indexName: "vectorIndex_diskann",
    dataFile: process.env.DATA_FILE_WITH_VECTORS!,
    batchSize: parseInt(process.env.LOAD_SIZE_BATCH! || '100', 10),
    embeddedField: process.env.EMBEDDED_FIELD!,
    embeddingDimensions: parseInt(process.env.EMBEDDING_DIMENSIONS!, 10),
    deployment: process.env.AZURE_OPENAI_EMBEDDING_MODEL!,
};

async function main() {

    const { aiClient, dbClient } = getClientsPasswordless();

    try {

        if (!aiClient) {
            throw new Error('AI client is not configured. Please check your environment variables.');
        }
        if (!dbClient) {
            throw new Error('Database client is not configured. Please check your environment variables.');
        }

        await dbClient.connect();
        const db = dbClient.db(config.dbName);
        const collection = await db.createCollection(config.collectionName);
        console.log('Created collection:', config.collectionName);
        const data = await readFileReturnJson(path.join(__dirname, "..", config.dataFile));
        const insertSummary = await insertData(config, collection, data);
        console.log('Created vector index:', config.indexName);
        
        // Create the vector index
        const indexOptions = {
            createIndexes: config.collectionName,
            indexes: [
                {
                    name: config.indexName,
                    key: {
                        [config.embeddedField]: 'cosmosSearch'
                    },
                    cosmosSearchOptions: {
                        kind: 'vector-diskann',
                        dimensions: config.embeddingDimensions,
                        similarity: 'COS', // 'COS', 'L2', 'IP'
                        maxDegree: 20, // 20 - 2048,  edges per node
                        lBuild: 10 // 10 - 500, candidate neighbors evaluated
                    }
                }
            ]
        };
        const vectorIndexSummary = await db.command(indexOptions);

        // Create embedding for the query
        const createEmbeddedForQueryResponse = await aiClient.embeddings.create({
            model: config.deployment,
            input: [config.query]
        });

        // Perform the vector similarity search
        const searchResults = await collection.aggregate([
            {
                $search: {
                    cosmosSearch: {
                        vector: createEmbeddedForQueryResponse.data[0].embedding,
                        path: config.embeddedField,
                        k: 5
                    }
                }
            },
            {
                $project: {
                    score: {
                        $meta: "searchScore"
                    },
                    document: "$$ROOT"
                }
            }
        ]).toArray();

        // Print the results
        printSearchResults(insertSummary, vectorIndexSummary, searchResults);

    } catch (error) {
        console.error('App failed:', error);
        process.exitCode = 1;
    } finally {
        console.log('Closing database connection...');
        if (dbClient) await dbClient.close();
        console.log('Database connection closed');
    }
}

// Execute the main function
main().catch(error => {
    console.error('Unhandled error:', error);
    process.exitCode = 1;
});

此主模块提供以下功能:

  • 包括实用工具函数
  • 为环境变量创建配置对象
  • 为 Azure OpenAI 和 Azure Cosmos DB for MongoDB vCore 创建客户端
  • 连接到 MongoDB、创建数据库和集合、插入数据以及创建标准索引
  • 使用 IVF、HNSW 或 DiskANN 创建矢量索引
  • 使用 OpenAI 客户端为示例查询文本创建嵌入。 可以更改文件顶部的查询
  • 使用嵌入运行矢量搜索并输出结果

创建实用工具函数

将以下代码粘贴到 utils.ts

import { MongoClient, OIDCResponse, OIDCCallbackParams } from 'mongodb';
import { AzureOpenAI } from 'openai/index.js';
import { promises as fs } from "fs";
import { AccessToken, DefaultAzureCredential, TokenCredential, getBearerTokenProvider } from '@azure/identity';

// Define a type for JSON data
export type JsonData = Record<string, any>;

export const AzureIdentityTokenCallback = async (params: OIDCCallbackParams, credential: TokenCredential): Promise<OIDCResponse> => {
    const tokenResponse: AccessToken | null = await credential.getToken(['https://ossrdbms-aad.database.windows.net/.default']);
    return {
        accessToken: tokenResponse?.token || '',
        expiresInSeconds: (tokenResponse?.expiresOnTimestamp || 0) - Math.floor(Date.now() / 1000)
    };
};
export function getClients(): { aiClient: AzureOpenAI; dbClient: MongoClient } {
    const apiKey = process.env.AZURE_OPENAI_EMBEDDING_KEY!;
    const apiVersion = process.env.AZURE_OPENAI_EMBEDDING_API_VERSION!;
    const endpoint = process.env.AZURE_OPENAI_EMBEDDING_ENDPOINT!;
    const deployment = process.env.AZURE_OPENAI_EMBEDDING_MODEL!;
    const aiClient = new AzureOpenAI({
        apiKey,
        apiVersion,
        endpoint,
        deployment
    });
    const dbClient = new MongoClient(process.env.MONGO_CONNECTION_STRING!, {
        // Performance optimizations
        maxPoolSize: 10,         // Limit concurrent connections
        minPoolSize: 1,          // Maintain at least one connection
        maxIdleTimeMS: 30000,    // Close idle connections after 30 seconds
        connectTimeoutMS: 30000, // Connection timeout
        socketTimeoutMS: 360000, // Socket timeout (for long-running operations)
        writeConcern: {          // Optimize write concern for bulk operations
            w: 1,                // Acknowledge writes after primary has written
            j: false             // Don't wait for journal commit
        }
    });

    return { aiClient, dbClient };
}

export function getClientsPasswordless(): { aiClient: AzureOpenAI | null; dbClient: MongoClient | null } {
    let aiClient: AzureOpenAI | null = null;
    let dbClient: MongoClient | null = null;

    // For Azure OpenAI with DefaultAzureCredential
    const apiVersion = process.env.AZURE_OPENAI_EMBEDDING_API_VERSION!;
    const endpoint = process.env.AZURE_OPENAI_EMBEDDING_ENDPOINT!;
    const deployment = process.env.AZURE_OPENAI_EMBEDDING_MODEL!;

    if (apiVersion && endpoint && deployment) {
        const credential = new DefaultAzureCredential();
        const scope = "https://cognitiveservices.azure.com/.default";
        const azureADTokenProvider = getBearerTokenProvider(credential, scope);
        aiClient = new AzureOpenAI({
            apiVersion,
            endpoint,
            deployment,
            azureADTokenProvider
        });
    }

    // For Cosmos DB with DefaultAzureCredential
    const clusterName = process.env.MONGO_CLUSTER_NAME!;

    if (clusterName) {
        const credential = new DefaultAzureCredential();

        dbClient = new MongoClient(
            `mongodb+srv://${clusterName}.global.mongocluster.cosmos.azure.com/`, {
            connectTimeoutMS: 30000,
            tls: true,
            retryWrites: true,
            authMechanism: 'MONGODB-OIDC',
            authMechanismProperties: {
                OIDC_CALLBACK: (params: OIDCCallbackParams) => AzureIdentityTokenCallback(params, credential),
                ALLOWED_HOSTS: ['*.azure.com']
            }
        }
        );
    }

    return { aiClient, dbClient };
}

export async function readFileReturnJson(filePath: string): Promise<JsonData[]> {

    console.log(`Reading JSON file from ${filePath}`);

    const fileAsString = await fs.readFile(filePath, "utf-8");
    return JSON.parse(fileAsString);
}
export async function writeFileJson(filePath: string, jsonData: JsonData): Promise<void> {
    const jsonString = JSON.stringify(jsonData, null, 2);
    await fs.writeFile(filePath, jsonString, "utf-8");

    console.log(`Wrote JSON file to ${filePath}`);
}
export async function insertData(config, collection, data) {
    console.log(`Processing in batches of ${config.batchSize}...`);
    const totalBatches = Math.ceil(data.length / config.batchSize);

    let inserted = 0;
    let updated = 0;
    let skipped = 0;
    let failed = 0;

    for (let i = 0; i < totalBatches; i++) {
        const start = i * config.batchSize;
        const end = Math.min(start + config.batchSize, data.length);
        const batch = data.slice(start, end);

        try {
            const result = await collection.insertMany(batch, { ordered: false });
            inserted += result.insertedCount || 0;
            console.log(`Batch ${i + 1} complete: ${result.insertedCount} inserted`);
        } catch (error: any) {
            if (error?.writeErrors) {
                // Some documents may have been inserted despite errors
                console.error(`Error in batch ${i + 1}: ${error?.writeErrors.length} failures`);
                failed += error?.writeErrors.length;
                inserted += batch.length - error?.writeErrors.length;
            } else {
                console.error(`Error in batch ${i + 1}:`, error);
                failed += batch.length;
            }
        }

        // Small pause between batches to reduce resource contention
        if (i < totalBatches - 1) {
            await new Promise(resolve => setTimeout(resolve, 100));
        }
    }
    const indexColumns = [
        "HotelId",
        "Category",
        "Description",
        "Description_fr"
    ];
    for (const col of indexColumns) {
        const indexSpec = {};
        indexSpec[col] = 1; // Ascending index
        await collection.createIndex(indexSpec);
    }

    return { total: data.length, inserted, updated, skipped, failed };
}

export function printSearchResults(insertSummary, indexSummary, searchResults) {


    if (!searchResults || searchResults.length === 0) {
        console.log('No search results found.');
        return;
    }

    searchResults.map((result, index) => {

        const { document, score } = result as any;

        console.log(`${index + 1}. HotelName: ${document.HotelName}, Score: ${score.toFixed(4)}`);
        //console.log(`   Description: ${document.Description}`);
    });

}

此实用工具模块提供以下功能:

  • JsonData:数据结构的接口
  • scoreProperty:基于矢量搜索方法评分在查询结果中的位置
  • getClients:为 Azure OpenAI 和 Azure Cosmos DB for MongoDB vCore 创建并返回客户端
  • getClientsPasswordless:使用无密码身份验证为 Azure OpenAI 和 Azure Cosmos DB for MongoDB vCore 创建并返回客户端。 在资源上启用 RBAC 并登录到 Azure CLI
  • readFileReturnJson:读取 JSON 文件并将其内容作为对象数组 JsonData 返回
  • writeFileJson:将对象数组 JsonData 写入 JSON 文件
  • insertData:将数据批量插入 MongoDB 集合,并在指定字段上创建标准索引
  • printSearchResults:打印矢量搜索的结果,包括分数和酒店名称

使用 Azure CLI 进行身份验证

在运行应用程序之前登录到 Azure CLI,以便它可以安全地访问 Azure 资源。

az login

生成并运行应用程序

生成 TypeScript 文件,然后运行应用程序:

npm run build
npm run start:diskann

应用日志记录和输出显示:

  • 集合创建和数据插入状态
  • 矢量索引创建
  • 具有酒店名称和相似性分数的搜索结果
Created collection: hotels_diskann
Reading JSON file from C:\Users\<username>\repos\samples\cosmos-db-vector-samples\data\HotelsData_toCosmosDB_Vector.json
Processing in batches of 100...
Batch 1 complete: 50 inserted
Created vector index: vectorIndex_diskann
1. HotelName: Roach Motel, Score: 0.8399
2. HotelName: Royal Cottage Resort, Score: 0.8385
3. HotelName: Economy Universe Motel, Score: 0.8360
4. HotelName: Foot Happy Suites, Score: 0.8354
5. HotelName: Country Comfort Inn, Score: 0.8346
Closing database connection...
Database connection closed

在 Visual Studio Code 中查看和管理数据

  1. 在 Visual Studio Code 中选择 DocumentDB 扩展 以连接到 Azure Cosmos DB 帐户。

  2. 查看 Hotels 数据库中的数据和索引。

    DocumentDB 扩展的屏幕截图,其中显示了 Cosmos DB Mongo (vCore) 集合。

清理资源

当不需要资源组、MongoDB vCore 帐户和 Azure OpenAI 资源时,请删除它们以避免额外费用。