用于 Node.js 的 Databricks SQL 驱动程序

Databricks SQL Driver for Node.js 是一个 Node.js 库,它让你可以使用 JavaScript 代码在 Azure Databricks 计算资源上运行 SQL 命令。

要求

  • 运行 Node.js 版本 14 或更高版本的开发计算机。 若要输出已安装的 Node.js 版本,请运行命令 node -v。 若要安装和使用其他版本的 Node.js,可以使用 Node 版本管理器 (nvm) 等工具。

  • Node 包管理器 (npm)。 较新版本的 Node.js 已包含 npm。 若要检查是否已安装 npm,请运行命令 npm -v。 若要根据需要安装 npm,可以按照下载并安装 npm 中的说明进行操作。

  • 来自 npm@databricks/sql 包。 若要将 @databricks/sql 包作为依赖项安装到 Node.js 项目中,请使用 npm 从与项目相同的目录中运行以下命令:

    npm i @databricks/sql
    
  • 如果希望将 TypeScript 安装到 Node.js 项目中并将其用作 devDependencies,请使用 npm 从与项目相同的目录中运行以下命令:

    npm i -D typescript
    npm i -D @types/node
    
  • 现有群集SQL 仓库

  • 现有群集或 SQL 仓库的“服务器主机名”和“HTTP 路径”值。

身份验证

用于 Node.js 的 Databricks SQL 驱动程序支持以下 Azure Databricks 身份验证类型:

Databricks SQL Driver for Node.js 尚不支持以下 Azure Databricks 身份验证类型:

注意

作为安全最佳做法,不应将连接变量值硬编码到代码中。 而应从安全位置检索这些连接变量值。 例如,本文中的代码片段和示例使用环境变量。

Databricks 个人访问令牌身份验证

要将用于 Node.js 的 Databricks SQL 驱动程序身份验证配合使用,你必须先创建一个 Azure Databricks 个人访问令牌。 有关此步骤的详细信息,请参阅适用于工作区用户的 Azure Databricks 个人访问令牌

要对用于 Node.js 的 Databricks SQL 驱动程序进行身份验证,可使用以下代码片段。 此代码片段假定你已设置以下环境变量:

  • DATABRICKS_SERVER_HOSTNAME,设置为你的群集或 SQL 仓库的服务器主机名值。
  • DATABRICKS_HTTP_PATH,设置为你的群集或 SQL 仓库的 HTTP 路径值。
  • DATABRICKS_TOKEN,设置为 Azure Databricks 个人访问令牌。

若要设置环境变量,请参阅操作系统的文档。

JavaScript

const { DBSQLClient } = require('@databricks/sql');

const serverHostname = process.env.DATABRICKS_SERVER_HOSTNAME;
const httpPath       = process.env.DATABRICKS_HTTP_PATH;
const token          = process.env.DATABRICKS_TOKEN;

if (!token || !serverHostname || !httpPath) {
    throw new Error("Cannot find Server Hostname, HTTP Path, or " +
                    "personal access token. " +
                    "Check the environment variables DATABRICKS_SERVER_HOSTNAME, " +
                    "DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN.");
  }

  const client = new DBSQLClient();
  const connectOptions = {
    token: token,
    host:  serverHostname,
    path:  httpPath
  };

  client.connect(connectOptions)
  // ...

TypeScript

import { DBSQLClient } from "@databricks/sql";

const serverHostname: string = process.env.DATABRICKS_SERVER_HOSTNAME || '';
const httpPath: string       = process.env.DATABRICKS_HTTP_PATH || '';
const token: string          = process.env.DATABRICKS_TOKEN || '';

if (token == '' || serverHostname == '' || httpPath == '') {
    throw new Error("Cannot find Server Hostname, HTTP Path, or personal access token. " +
                    "Check the environment variables DATABRICKS_SERVER_HOSTNAME, " +
                    "DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN.");
  }

  const client: DBSQLClient = new DBSQLClient();
  const connectOptions = {
    token: token,
    host:  serverHostname,
    path:  httpPath
  };

  client.connect(connectOptions)
  // ...

OAuth 用户到计算机 (U2M) 身份验证

Databricks SQL Driver for Node.js 版本 1.8.0 及更高版本支持 OAuth 用户到计算机 (U2M) 身份验证

要使用 OAuth U2M 身份验证对 Databricks SQL Driver for Node.js 进行身份验证,请使用以下代码片段。 此代码片段假定你已设置以下环境变量:

  • DATABRICKS_SERVER_HOSTNAME,设置为你的群集或 SQL 仓库的服务器主机名值。
  • DATABRICKS_HTTP_PATH,设置为你的群集或 SQL 仓库的 HTTP 路径值。

若要设置环境变量,请参阅操作系统的文档。

JavaScript

const { DBSQLClient } = require('@databricks/sql');

const serverHostname = process.env.DATABRICKS_SERVER_HOSTNAME;
const httpPath       = process.env.DATABRICKS_HTTP_PATH;

if (!serverHostname || !httpPath) {
    throw new Error("Cannot find Server Hostname or HTTP Path. " +
                    "Check the environment variables DATABRICKS_SERVER_HOSTNAME " +
                    "and DATABRICKS_HTTP_PATH.");
  }

  const client = new DBSQLClient();
  const connectOptions = {
    authType:                  "databricks-oauth",
    useDatabricksOAuthInAzure: true,
    host:                      serverHostname,
    path:                      httpPath
  };

  client.connect(connectOptions)
  // ...

TypeScript

import { DBSQLClient } from "@databricks/sql";

const serverHostname: string = process.env.DATABRICKS_SERVER_HOSTNAME || '';
const httpPath: string       = process.env.DATABRICKS_HTTP_PATH || '';

if (serverHostname == '' || httpPath == '') {
    throw new Error("Cannot find Server Hostname or HTTP Path. " +
                    "Check the environment variables DATABRICKS_SERVER_HOSTNAME " +
                    "and DATABRICKS_HTTP_PATH.");
  }

  const client: DBSQLClient = new DBSQLClient();
  const connectOptions = {
    authType:                  "databricks-oauth",
    useDatabricksOAuthInAzure: true,
    host:                      serverHostname,
    path:                      httpPath
  };

  client.connect(connectOptions)
  // ...

OAuth 计算机到计算机 (M2M) 身份验证

Databricks SQL Driver for Node.js 版本 1.8.0 及更高版本支持 OAuth 计算机到计算机 (U2M) 身份验证

要将 Databricks SQL Driver for Node.js 与 OAuth M2M 身份验证配合使用,必须执行以下操作:

  1. 在 Azure Databricks 工作区中创建 Azure Databricks 服务主体,并为该服务主体创建 OAuth 机密。

    若要创建服务主体及其 OAuth 机密,请参阅使用 OAuth (OAuth M2M) 通过服务主体对 Azure Databricks 的访问进行身份验证。 记下服务主体的 UUID应用程序 ID 值,以及服务主体的 OAuth 机密的机密值。

  2. 授予服务主体对群集或仓库的访问权限。 请参阅计算权限管理 SQL 仓库

要对用于 Node.js 的 Databricks SQL 驱动程序进行身份验证,可使用以下代码片段。 此代码片段假定你已设置以下环境变量:

  • DATABRICKS_SERVER_HOSTNAME,设置为你的群集或 SQL 仓库的服务器主机名值。
  • DATABRICKS_HTTP_PATH,设置为你的群集或 SQL 仓库的 HTTP 路径值。
  • DATABRICKS_CLIENT_ID,设置为服务主体的 UUID应用程序 ID 值。
  • DATABRICKS_CLIENT_SECRET,设置为服务主体的 OAuth 机密的“机密”值。

若要设置环境变量,请参阅操作系统的文档。

JavaScript

const { DBSQLClient } = require('@databricks/sql');

const serverHostname = process.env.DATABRICKS_SERVER_HOSTNAME;
const httpPath       = process.env.DATABRICKS_HTTP_PATH;
const clientId       = process.env.DATABRICKS_CLIENT_ID;
const clientSecret   = process.env.DATABRICKS_CLIENT_SECRET;

if (!serverHostname || !httpPath || !clientId || !clientSecret) {
    throw new Error("Cannot find Server Hostname, HTTP Path, or " +
                    "service principal ID or secret. " +
                    "Check the environment variables DATABRICKS_SERVER_HOSTNAME, " +
                    "DATABRICKS_HTTP_PATH, DATABRICKS_CLIENT_ID, and " +
                    "DATABRICKS_CLIENT_SECRET.");
  }

  const client = new DBSQLClient();
  const connectOptions = {
    authType:                  "databricks-oauth",
    useDatabricksOAuthInAzure: true,
    host:                      serverHostname,
    path:                      httpPath,
    oauthClientId:             clientId,
    oauthClientSecret:         clientSecret
  };

  client.connect(connectOptions)
  // ...

TypeScript

import { DBSQLClient } from "@databricks/sql";

const serverHostname: string = process.env.DATABRICKS_SERVER_HOSTNAME || '';
const httpPath: string       = process.env.DATABRICKS_HTTP_PATH || '';
const clientId: string       = process.env.DATABRICKS_CLIENT_ID || '';
const clientSecret: string   = process.env.DATABRICKS_CLIENT_SECRET || '';

if (serverHostname == '' || httpPath == '' || clientId == '' || clientSecret == '') {
    throw new Error("Cannot find Server Hostname, HTTP Path, or " +
                    "service principal ID or secret. " +
                    "Check the environment variables DATABRICKS_SERVER_HOSTNAME, " +
                    "DATABRICKS_HTTP_PATH, DATABRICKS_CLIENT_ID, and " +
                    "DATABRICKS_CLIENT_SECRET.");
  }

  const client: DBSQLClient = new DBSQLClient();
  const connectOptions = {
    authType:                  "databricks-oauth",
    useDatabricksOAuthInAzure: true,
    host:                      serverHostname,
    path:                      httpPath,
    oauthClientId:             clientId,
    oauthClientSecret:         clientSecret
  };

  client.connect(connectOptions)
  // ...

查询数据

下面的代码示例演示如何调用 Databricks SQL Driver for Node.js 以在 Azure Databricks 计算资源上运行基本 SQL 查询。 此命令返回 samples 目录的 nyctaxi 架构中的trips 表中的前两行。

注意

以下代码示例演示如何使用 Azure Databricks 个人访问令牌进行身份验证。 若要改用其他可用的 Azure Databricks 身份验证类型,请参阅身份验证

此代码示例从一组 Azure Databricks 环境变量中检索 tokenserver_hostnamehttp_path 连接变量值。 这些环境变量具有以下环境变量名称:

  • DATABRICKS_TOKEN,表示要求中的 Azure Databricks 个人访问令牌。
  • DATABRICKS_SERVER_HOSTNAME,表示要求中的“服务器主机名”值。
  • DATABRICKS_HTTP_PATH,表示要求中的“HTTP 路径”值。

可以使用其他方法来检索这些连接变量值。 使用环境变量只是众多方法中的一种。

以下代码示例演示如何调用适用于 Node.js 的 Databricks SQL 连接器在群集或 SQL 仓库上运行基本 SQL 命令。 此命令返回 trips 表中的前两行。

JavaScript

const { DBSQLClient } = require('@databricks/sql');

const token          = process.env.DATABRICKS_TOKEN;
const serverHostname = process.env.DATABRICKS_SERVER_HOSTNAME;
const httpPath       = process.env.DATABRICKS_HTTP_PATH;

if (!token || !serverHostname || !httpPath) {
  throw new Error("Cannot find Server Hostname, HTTP Path, or personal access token. " +
                  "Check the environment variables DATABRICKS_TOKEN, " +
                  "DATABRICKS_SERVER_HOSTNAME, and DATABRICKS_HTTP_PATH.");
}

const client = new DBSQLClient();
const connectOptions = {
  token: token,
  host: serverHostname,
  path: httpPath
};

client.connect(connectOptions)
  .then(async client => {
    const session = await client.openSession();
    const queryOperation = await session.executeStatement(
      'SELECT * FROM samples.nyctaxi.trips LIMIT 2',
      {
        runAsync: true,
        maxRows:  10000 // This option enables the direct results feature.
      }
    );

    const result = await queryOperation.fetchAll();

    await queryOperation.close();

    console.table(result);

    await session.close();
    await client.close();
})
.catch((error) => {
  console.error(error);
});

TypeScript

import { DBSQLClient } from '@databricks/sql';
import IDBSQLSession from '@databricks/sql/dist/contracts/IDBSQLSession';
import IOperation from '@databricks/sql/dist/contracts/IOperation';

const serverHostname: string = process.env.DATABRICKS_SERVER_HOSTNAME || '';
const httpPath: string       = process.env.DATABRICKS_HTTP_PATH || '';
const token: string          = process.env.DATABRICKS_TOKEN || '';

if (serverHostname == '' || httpPath == '' || token == '') {
  throw new Error("Cannot find Server Hostname, HTTP Path, or personal access token. " +
                  "Check the environment variables DATABRICKS_SERVER_HOSTNAME, " +
                  "DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN.");
}

const client: DBSQLClient = new DBSQLClient();
const connectOptions = {
  host: serverHostname,
  path: httpPath,
  token: token
};

client.connect(connectOptions)
  .then(async client => {
    const session: IDBSQLSession = await client.openSession();

    const queryOperation: IOperation = await session.executeStatement(
      'SELECT * FROM samples.nyctaxi.trips LIMIT 2',
      {
        runAsync: true,
        maxRows: 10000 // This option enables the direct results feature.
      }
    );

    const result = await queryOperation.fetchAll();

    await queryOperation.close();

    console.table(result);

    await session.close();
    client.close();
  })
  .catch((error) => {
    console.error(error);
});

输出:

┌─────────┬─────┬────────┬───────────┬───────┬─────────┬────────┬───────┬───────┬────────┬────────┬────────┐
│ (index) │ _c0 │ carat  │    cut    │ color │ clarity │ depth  │ table │ price │   x    │   y    │   z    │
├─────────┼─────┼────────┼───────────┼───────┼─────────┼────────┼───────┼───────┼────────┼────────┼────────┤
│    0    │ '1' │ '0.23' │  'Ideal'  │  'E'  │  'SI2'  │ '61.5' │ '55'  │ '326' │ '3.95' │ '3.98' │ '2.43' │
│    1    │ '2' │ '0.21' │ 'Premium' │  'E'  │  'SI1'  │ '59.8' │ '61'  │ '326' │ '3.89' │ '3.84' │ '2.31' │
└─────────┴─────┴────────┴───────────┴───────┴─────────┴────────┴───────┴───────┴────────┴────────┴────────┘

会话

API 引用中返回 IOperation 对象的所有 IDBSQLSession 方法都具有影响其行为的以下常见参数:

  • runAsync 设置为 true 会启动异步模式。 IDBSQLSession 方法将操作放入队列并尽快返回。 返回 IOperation 对象的当前状态可能会有所不同,客户端负责在使用返回 IOperation 的之前检查其状态。 请参阅操作。 将 runAsync 设置为 false 表示 IDBSQLSession 方法等待操作完成。 Databricks 建议始终将 runAsync 设置为 true
  • maxRows 设置为非 null 值可启用直接结果。 对于直接结果,服务器会尝试等待操作完成,然后提取部分数据。 根据服务器在定义的时间内能够完成的工作量,IOperation 对象返回某种中间状态,而不是处于挂起状态。 通常,所有元数据和查询结果都是在向服务器的单个请求中返回的。 服务器使用 maxRows 来确定可以立即返回的记录数。 但是,实际区块的大小可能不同;请参阅 IDBSQLSession.fetchChunk。 默认会启用直接结果。 Databricks 建议不要禁用直接结果。

操作

会话中所述,API 引用中的 IDBSQLSession 会话方法返回的 IOperation 对象不会完全填充。 相关的服务器操作可能仍在进行中,例如等待 Databricks SQL 仓库启动、运行查询或提取数据。 IOperation 类向用户隐藏这些详细信息。 例如,fetchAllfetchChunkgetSchema 等方法在内部等待操作完成,然后返回结果。 可以使用 IOperation.finished() 方法显式等待操作完成。 这些方法采用在等待操作完成时定期调用的回调。 将 progress 选项设置为 true 会尝试从服务器请求额外的进度数据并将其传递给该回调。

可以随时调用 closecancel 方法。 调用时,它们会立即使 IOperation 对象失效;所有挂起的调用(如 fetchAllfetchChunkgetSchema)将立即取消,并返回错误。 在某些情况下,服务器操作可能已完成,并且 cancel 方法仅影响客户端。

fetchAll 方法在内部调用 fetchChunk 并将所有数据收集到数组中。 虽然这很方便,但当在大型数据集上使用时,它可能会导致内存不足错误。 fetchAll 选项通常传递给 fetchChunk

提取数据区块

提取数据区块会使用以下代码模式:

do {
  const chunk = await operation.fetchChunk();
  // Process the data chunk.
} while (await operation.hasMoreRows());

API 引用中的 fetchChunk 方法处理少量数据以减少内存消耗。 fetchChunk 首先等待操作完成(如果尚未完成),然后在等待周期期间调用回调,然后提取下一个数据区块。

可以使用 maxRows 选项指定所需的区块大小。 但是,返回的区块大小可能不同,更小,有时甚至更大。 fetchChunk 不会尝试在内部预提取数据,以便将其切入请求的部分。 它将选项发送到 maxRows 服务器,然后返回服务器返回的任何结果。 不要将这个 maxRows 选项与 IDBSQLSession 中的选项混淆。 传递给 fetchChunkmaxRows 定义每个区块的大小,不执行任何其他操作。

管理 Unity Catalog 卷中的文件

Databricks SQL Driver 可让你将本地文件写入 Unity Catalog 、从卷下载文件以及从卷中删除文件,如下例所示:

JavaScript

const { DBSQLClient } = require('@databricks/sql');

const serverHostname = process.env.DATABRICKS_SERVER_HOSTNAME;
const httpPath       = process.env.DATABRICKS_HTTP_PATH;
const token          = process.env.DATABRICKS_TOKEN;

if (!token || !serverHostname || !httpPath) {
    throw new Error("Cannot find Server Hostname, HTTP Path, or " +
                    "personal access token. " +
                    "Check the environment variables DATABRICKS_SERVER_HOSTNAME, " +
                    "DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN.");
}

const client = new DBSQLClient();
const connectOptions = {
  token: token,
  host:  serverHostname,
  path:  httpPath
};

client.connect(connectOptions)
  .then(async client => {
    const session = await client.openSession();

    // Write a local file to a volume in the specified path.
    // For writing local files to volumes, you must first specify the path to the
    // local folder that contains the file to be written.
    // Specify OVERWRITE to overwrite any existing file in that path.
    await session.executeStatement(
      "PUT 'my-data.csv' INTO '/Volumes/main/default/my-volume/my-data.csv' OVERWRITE", {
        stagingAllowedLocalPath: ["/tmp/"]
      }
    );

    // Download a file from a volume in the specified path.
    // For downloading files in volumes, you must first specify the path to the
    // local folder that will contain the downloaded file.
    await session.executeStatement(
      "GET '/Volumes/main/default/my-volume/my-data.csv' TO 'my-downloaded-data.csv'", {
        stagingAllowedLocalPath: ["/Users/paul.cornell/samples/nodejs-sql-driver/"]
      }
    )

      // Delete a file in a volume from the specified path.
      // For deleting files from volumes, you must add stagingAllowedLocalPath,
      // but its value will be ignored. As such, in this example, an empty string is
      // specified.
      await session.executeStatement(
        "REMOVE '/Volumes/main/default/my-volume/my-data.csv'", {
          stagingAllowedLocalPath: [""]
        }
      )

      await session.close();
      await client.close();
  })
  .catch((error) => {
    console.error(error);
  });

TypeScript

import { DBSQLClient } from '@databricks/sql';

const serverHostname: string | undefined = process.env.DATABRICKS_SERVER_HOSTNAME;
const httpPath: string | undefined = process.env.DATABRICKS_HTTP_PATH;
const token: string | undefined = process.env.DATABRICKS_TOKEN;

if (!token || !serverHostname || !httpPath) {
  throw new Error("Cannot find Server Hostname, HTTP Path, or " +
                  "personal access token. " +
                  "Check the environment variables DATABRICKS_SERVER_HOSTNAME, " +
                  "DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN.");
}

const client: DBSQLClient = new DBSQLClient();
const connectOptions = {
  token: token,
  host: serverHostname,
  path: httpPath
};

client.connect(connectOptions)
  .then(async client => {
    const session = await client.openSession();

    // Write a local file to a volume in the specified path.
    // For writing local files to volumes, you must first specify the path to the
    // local folder that contains the file to be written.
    // Specify OVERWRITE to overwrite any existing file in that path.
    await session.executeStatement(
      "PUT 'my-data.csv' INTO '/Volumes/main/default/my-volume/my-data.csv' OVERWRITE", {
        stagingAllowedLocalPath: ["/tmp/"]
      }
    );

    // Download a file from a volume in the specified path.
    // For downloading files in volumes, you must first specify the path to the
    // local folder that will contain the downloaded file.
    await session.executeStatement(
      "GET '/Volumes/main/default/my-volume/my-data.csv' TO 'my-downloaded-data.csv'", {
        stagingAllowedLocalPath: ["/Users/paul.cornell/samples/nodejs-sql-driver/"]
      }
    )

    // Delete a file in a volume from the specified path.
    // For deleting files from volumes, you must add stagingAllowedLocalPath,
    // but its value will be ignored. As such, in this example, an empty string is
    // specified.
    await session.executeStatement(
      "REMOVE '/Volumes/main/default/my-volume/my-data.csv'", {
        stagingAllowedLocalPath: [""]
      }
    )

    await session.close();
    await client.close();
  })
  .catch((error: any) => {
    console.error(error);
  });

配置日志记录

记录器提供有关调试连接器问题的信息。 所有 DBSQLClient 对象都使用打印到控制台的记录器进行实例化,但通过传入自定义记录器,可以将此信息发送到文件。 以下示例演示如何配置记录器并更改其级别。

JavaScript

const { DBSQLLogger, LogLevel } = require('@databricks/sql');
const logger = new DBSQLLogger({
  filepath: 'log.txt',
  level: LogLevel.info
});

// Set logger to different level.
logger.setLevel(LogLevel.debug);

TypeScript

import { DBSQLLogger, LogLevel } from '@databricks/sql';
const logger = new DBSQLLogger({
  filepath: 'log.txt',
  level: LogLevel.info,
});

// Set logger to different level.
logger.setLevel(LogLevel.debug);

有关其他示例,请参阅 GitHub 上的 databricks/databricks-sql-nodejs 存储库中的 examples 文件夹。

测试

若要测试代码,可使用 JavaScript 测试框架(如 Jest)。 要在模拟条件下测试代码,而无需调用 Azure Databricks REST API 终结点或更改 Azure Databricks 帐户或工作区的状态,可以使用 Jest 的内置模拟框架。

例如,给定以下名为 helpers.js 的文件,其包含使用 Azure Databricks 个人访问令牌返回到 Azure Databricks 工作区的连接的 getDBSQLClientWithPAT 函数、使用连接从指定表(例如,samples 目录 nyctaxi 架构中的 trips 表)中获取指定数目的数据行的 getAllColumnsFromTable 函数,以及打印数据行内容的 printResults 函数:

// helpers.js

const { DBSQLClient } = require('@databricks/sql');

async function getDBSQLClientWithPAT(token, serverHostname, httpPath) {
  const client = new DBSQLClient();
  const connectOptions = {
    token: token,
    host: serverHostname,
    path: httpPath
  };
  try {
    return await client.connect(connectOptions);
  } catch (error) {
    console.error(error);
    throw error;
  }
}

async function getAllColumnsFromTable(client, tableSpec, rowCount) {
  let session;
  let queryOperation;
  try {
    session = await client.openSession();
    queryOperation = await session.executeStatement(
      `SELECT * FROM ${tableSpec} LIMIT ${rowCount}`,
      {
        runAsync: true,
        maxRows: 10000 // This option enables the direct results feature.
      }
    );
  } catch (error) {
    console.error(error);
    throw error;
  }
  let result;
  try {
    result = await queryOperation.fetchAll();
  } catch (error) {
    console.error(error);
    throw error;
  } finally {
    if (queryOperation) {
      await queryOperation.close();
    }
    if (session) {
      await session.close();
    }
  }
  return result;
}

function printResult(result) {
  console.table(result);
}

module.exports = {
  getDBSQLClientWithPAT,
  getAllColumnsFromTable,
  printResult
};

另外,假设以下名为 main.js 的文件调用 getDBSQLClientWithPATgetAllColumnsFromTableprintResults 函数:

// main.js

const { getDBSQLClientWithPAT, getAllColumnsFromTable, printResult } = require('./helpers');

const token          = process.env.DATABRICKS_TOKEN;
const serverHostname = process.env.DATABRICKS_SERVER_HOSTNAME;
const httpPath       = process.env.DATABRICKS_HTTP_PATH;
const tableSpec      = process.env.DATABRICKS_TABLE_SPEC;

if (!token || !serverHostname || !httpPath) {
  throw new Error("Cannot find Server Hostname, HTTP Path, or personal access token. " +
    "Check the environment variables DATABRICKS_TOKEN, " +
    "DATABRICKS_SERVER_HOSTNAME, and DATABRICKS_HTTP_PATH.");
}

if (!tableSpec) {
  throw new Error("Cannot find table spec in the format catalog.schema.table. " +
    "Check the environment variable DATABRICKS_TABLE_SPEC."
  )
}

getDBSQLClientWithPAT(token, serverHostname, httpPath)
  .then(async client => {
    const result = await getAllColumnsFromTable(client, tableSpec, 2);
    printResult(result);
    await client.close();
  })
  .catch((error) => {
    console.error(error);
  });

以下名为 helpers.test.js 的文件测试 getAllColumnsFromTable 函数是否返回预期的响应。 此测试将模拟 DBSQLClient 对象,而不是创建与目标工作区的真实连接。 该测试还模拟一些符合真实数据中的架构和值的数据。 测试通过模拟连接返回模拟数据,然后检查其中一个模拟数据行的值是否与预期值匹配。

// helpers.test.js

const { getDBSQLClientWithPAT, getAllColumnsFromTable, printResult} = require('./helpers')

jest.mock('@databricks/sql', () => {
  return {
    DBSQLClient: jest.fn().mockImplementation(() => {
      return {
        connect: jest.fn().mockResolvedValue({ mock: 'DBSQLClient'})
      };
    }),
  };
});

test('getDBSQLClientWithPAT returns mocked Promise<DBSQLClient> object', async() => {
  const result = await getDBSQLClientWithPAT(
    token = 'my-token',
    serverHostname = 'mock-server-hostname',
    httpPath = 'mock-http-path'
  );

  expect(result).toEqual({ mock: 'DBSQLClient' });
});

const data = [
  {
    tpep_pickup_datetime: new Date(2016, 1, 13, 15, 51, 12),
    tpep_dropoff_datetime: new Date(2016, 1, 13, 16, 15, 3),
    trip_distance: 4.94,
    fare_amount: 19.0,
    pickup_zip: 10282,
    dropoff_zip: 10171
  },
  {
    tpep_pickup_datetime: new Date(2016, 1, 3, 17, 43, 18),
    tpep_dropoff_datetime: new Date(2016, 1, 3, 17, 45),
    trip_distance: 0.28,
    fare_amount: 3.5,
    pickup_zip: 10110,
    dropoff_zip: 10110
  }
];

const mockDBSQLClientForSession = {
  openSession: jest.fn().mockResolvedValue({
    executeStatement: jest.fn().mockResolvedValue({
      fetchAll: jest.fn().mockResolvedValue(data),
      close: jest.fn().mockResolvedValue(null)
    }),
    close: jest.fn().mockResolvedValue(null)
  })
};

test('getAllColumnsFromTable returns the correct fare_amount for the second mocked data row', async () => {
  const result = await getAllColumnsFromTable(
    client    = mockDBSQLClientForSession,
    tableSpec = 'mock-table-spec',
    rowCount  = 2);
  expect(result[1].fare_amount).toEqual(3.5);
});

global.console.table = jest.fn();

test('printResult mock prints the correct fare_amount for the second mocked data row', () => {
  printResult(data);
  expect(console.table).toHaveBeenCalledWith(data);
  expect(data[1].fare_amount).toBe(3.5);
});

对于 TypeScript,上述代码看起来类似。 对于使用 TypeScript 的 Jest 测试,请使用 ts-jest

其他资源

API 参考

DBSQLClient

用于与数据库交互的主入口点。

方法
connect 方法

打开与数据库的连接。

参数
options

类型:ConnectionOptions

这组选项用于连接到数据库。

必须填充 hostpath 和其他必填字段。 请参阅身份验证

示例:


const client: DBSQLClient = new DBSQLClient();

client.connect(
{
host: serverHostname,
path: httpPath,
// ...
}
)

返回:Promise<IDBSQLClient>

openSession 方法

打开 DBSQLClient 与数据库之间的对话。

参数
请求

类型:OpenSessionRequest

用于指定初始架构和初始目录的一组可选参数

示例:


const session = await client.openSession(
{initialCatalog: 'catalog'}
);

返回:Promise<IDBSQLSession>

getClient 方法

返回内部 thrift TCLIService.Client 对象。 必须在 DBSQLClient 连接后调用。

无参数

返回 TCLIService.Client

close 方法

关闭与数据库的连接,并释放服务器上所有关联的资源。 对此客户端的任何其他调用都将引发错误。

无参数。

没有返回值。

DBSQLSession

DBSQLSession 主要用于执行针对数据库的语句以及各种元数据提取操作。

方法
executeStatement 方法

使用提供的选项执行语句。

参数
语句

类型:str

要执行的语句。
options

类型:ExecuteStatementOptions

一组可选参数,用于确定查询超时、直接结果的最大行数以及是否异步运行查询。 默认情况下,maxRows 设置为 10000。 如果 maxRows 设置为 null,操作将在直接结果功能关闭的情况下运行。

示例:


const session = await client.openSession(
{initialCatalog: 'catalog'}
);

queryOperation = await session.executeStatement(
'SELECT "Hello, World!"', { runAsync: true }
);

返回:Promise<IOperation>

close 方法

关闭会话。 必须在使用对话后完成。

无参数。

没有返回值。

getId 方法

返回对话的 GUID。

无参数。

返回:str

getTypeInfo 方法

返回关于支持数据类型的信息。

参数
请求

类型:TypeInfoRequest

请求参数。

返回:Promise<IOperation>

getCatalogs 方法

获取目录列表。

参数
请求

类型:CatalogsRequest

请求参数。

返回:Promise<IOperation>

getSchemas 方法

获取架构列表。

参数
请求

类型:SchemasRequest

请求参数。 字段 catalogNameschemaName 可以用于筛选目的。

返回:Promise<IOperation>

getTables 方法

获取表列表。

参数
请求

类型:TablesRequest

请求参数。 字段 catalogNameschemaName
tableName 可用于筛选。

返回:Promise<IOperation>

getFunctions 方法

获取表列表。

参数
请求

类型:FunctionsRequest

请求参数。 字段 functionName 为必填。

返回:Promise<IOperation>

getPrimaryKeys 方法

获取主键的列表。

参数
请求

类型:PrimaryKeysRequest

请求参数。 字段 schemaNametableName 都是必填字段。

返回:Promise<IOperation>

getCrossReference 方法

获取有关两个表之间的外键的信息。

参数
请求

类型:CrossReferenceRequest

请求参数。 必须为这两个表指定架构、父级和目录名称。

返回:Promise<IOperation>

DBSQLOperation

DBSQLOperations 由 DBSQLSessions 创建,可用于提取语句的结果并检查其执行情况。 数据通过函数 fetchChunk 和 fetchAll 提取。

方法
getId 方法

返回操作的 GUID。

无参数。

返回:str

fetchAll 方法

等待操作完成,然后从操作中提取所有行。

参数:无

返回:Promise<Array<object>>

fetchChunk 方法

等待操作完成,然后从操作中提取最多指定数量的行。

参数
options

类型:FetchOptions

用于提取的选项。 目前,唯一的选项是 maxRows,它对应于要在任何给定数组中返回的最大数据对象数。

返回:Promise<Array<object>>

close 方法

关闭操作并释放所有关联的资源。 必须在不再使用操作后完成。

无参数。

没有返回值。