Compartir a través de

使用 Node.js 在 Azure Cosmos DB for PostgreSQL 上连接和运行 SQL 命令

适用对象: Azure Cosmos DB for PostgreSQL(由 PostgreSQL 的 Citus 数据库扩展提供支持)

本快速入门演示如何使用 Node.js 代码连接到群集以及如何使用 SQL 语句创建表。 然后演示如何在数据库中插入、查询、更新和删除数据。 本文中的步骤假定你熟悉 Node.js 开发,但不熟悉如何使用 Azure Cosmos DB for PostgreSQL。

安装 PostgreSQL 库

本文中的代码示例需要 pg 库才能与 PostgreSQL 服务器进行交互。 你需要使用语言包管理器(例如 npm)安装 pg。

进行连接,创建表,然后插入数据

创建常见连接模块

提示

下面的示例代码使用连接池来创建和管理与 PostgreSQL 的连接。 强烈建议使用应用程序端连接池,因为:

  • 它可确保应用程序不会生成太多通向数据库的连接,从而避免超过连接限制。
  • 这有助于大幅提高性能,包括延迟和吞吐量。 PostgreSQL 服务器进程必须创建分支来处理每个新连接,而重用连接可避免这项开销。

创建名为 db 的文件夹,在此文件夹中创建包含以下常见连接代码的 citus.js 文件。 在此代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。

/**
* file: db/citus.js
*/

const { Pool } = require('pg');

const pool = new Pool({
  max: 300,
  connectionTimeoutMillis: 5000,

  host: 'c.<cluster>.postgres.database.chinacloudapi.cn',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
});

module.exports = {
  pool,
};

创建表

通过以下代码进行连接,然后使用 CREATE TABLE 和 INSERT INTO SQL 语句加载数据。 该代码将创建新的 pharmacy 表并插入一些示例数据。

/**
* file: create.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DROP TABLE IF EXISTS pharmacy;
    CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
    CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
  `;

  try {
    /* Real application code would probably request a dedicated client with
       pool.connect() and run multiple queries with the client. In this
       example, you're running only one query, so you use the pool.query()
       helper method to run it on the first available idle client.
    */

    await pool.query(queryString);
    console.log('Created the Pharmacy table and inserted rows.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

分发表

Azure Cosmos DB for PostgreSQL 可为你提供跨多个节点分发表的强大功能,以实现可伸缩性。 可以使用以下命令来分配表。 可以在此处详细了解 create_distributed_table 和分布列。

注意

通过分发表,它们可在添加到群集的任何工作器节点之间增长。

使用以下代码连接到数据库并分发表。

/**
* file: distribute-table.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT create_distributed_table('pharmacy', 'pharmacy_id');
  `;

  try {
    await pool.query(queryString);
    console.log('Distributed pharmacy table.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

读取数据

使用以下代码进行连接,并使用 SELECT SQL 语句读取数据。

/**
* file: read.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT * FROM pharmacy;
  `;

  try {
    const res = await pool.query(queryString);
    console.log(res.rows);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

更新数据

使用以下代码进行连接,并使用 UPDATE SQL 语句更新数据。

/**
* file: update.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    UPDATE pharmacy SET city = 'Long Beach'
    WHERE pharmacy_id = 1;
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Update completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

删除数据

使用以下代码进行连接,并使用 DELETE SQL 语句删除数据。

/**
* file: delete.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DELETE FROM pharmacy
    WHERE pharmacy_name = 'Target';
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Delete completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

用于快速引入的 COPY 命令

在将数据引入 Azure Cosmos DB for PostgreSQL 时,COPY 命令可能会产生巨大的吞吐量。 COPY 命令可以引入文件中的数据,也可以使用内存中的微批数据进行实时引入。

用于从文件加载数据的 COPY 命令

以下代码将数据从 CSV 文件复制到数据库表。 代码需要 pg-copy-streams 包和 pharmacies.csv 文件。

/**
* file: copycsv.js
*/

const inputFile = require('path').join(__dirname, '/pharmacies.csv');
const fileStream = require('fs').createReadStream(inputFile);
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importCsvDatabase() {
  return new Promise((resolve, reject) => {
    const queryString = `
      COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
    `;

    fileStream.on('error', reject);

    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom(queryString))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        fileStream.pipe(stream);
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}

(async () => {
  console.log('Copying from CSV...');
  await importCsvDatabase();
  await pool.end();
  console.log('Inserted csv successfully');
})();

用于加载内存中数据的 COPY 命令

以下代码将内存中数据复制到表。 代码需要 through2 包,通过它可实现管道链接。

/**
 * file: copyinmemory.js
 */

const through2 = require('through2');
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importInMemoryDatabase() {
  return new Promise((resolve, reject) => {
    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom('COPY pharmacy FROM STDIN'))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        const internDataset = [
          ['100', 'Target', 'Sunnyvale', 'California', '94001'],
          ['101', 'CVS', 'San Francisco', 'California', '94002'],
        ];

        let started = false;
        const internStream = through2.obj((arr, _enc, cb) => {
          const rowText = (started ? '\n' : '') + arr.join('\t');
          started = true;
          cb(null, rowText);
        });

        internStream.on('error', reject).pipe(stream);

        internDataset.forEach((record) => {
          internStream.write(record);
        });

        internStream.end();
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}
(async () => {
  await importInMemoryDatabase();
  await pool.end();
  console.log('Inserted inmemory data successfully.');
})();

针对数据库请求失败情况的应用重试

有时,来自应用程序的数据库请求可能会失败。 此类问题可能在不同的场景下发生,例如应用和数据库之间的网络故障、密码错误等。有些问题可能是暂时的,并且在几秒到几分钟内自行解决。 可以在应用中配置重试逻辑以克服暂时性错误。

在应用中配置重试逻辑有助于改善最终用户体验。 在故障情况下,用户只会等待应用程序处理请求的时间稍长,而不会遇到错误。

下面的示例演示如何在应用中实现重试逻辑。 示例代码片段每 60 秒尝试一次数据库请求(最多 5 次),直到成功为止。 可以根据应用程序的需求配置重试次数和频率。

在此代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。

const { Pool } = require('pg');
const { sleep } = require('sleep');

const pool = new Pool({
  host: 'c.<cluster>.postgres.database.chinacloudapi.cn',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
  connectionTimeoutMillis: 0,
  idleTimeoutMillis: 0,
  min: 10,
  max: 20,
});

(async function() {
  res = await executeRetry('select nonexistent_thing;',5);
  console.log(res);
  process.exit(res ? 0 : 1);
})();

async function executeRetry(sql,retryCount)
{
  for (let i = 0; i < retryCount; i++) {
    try {
      result = await pool.query(sql)
      return result;
    } catch (err) {
      console.log(err.message);
      sleep(60);
    }
  }

  // didn't succeed after all the tries
  return null;
}

后续步骤