Compartir a través de

使用 Ruby 在 Azure Cosmos DB for PostgreSQL 上连接和运行 SQL 命令

适用对象: Azure Cosmos DB for PostgreSQL(由 PostgreSQL 的 Citus 数据库扩展提供支持)

本快速入门演示如何使用 Ruby 代码连接到群集以及如何使用 SQL 语句创建表。 然后演示如何在数据库中插入、查询、更新和删除数据。 本文中的步骤假定你熟悉 Ruby 开发,但不熟悉 Azure Cosmos DB for PostgreSQL 的使用。

安装 PostgreSQL 库

本文中的代码示例需要 pg gem。 你需要使用语言包管理器(例如 bundler)安装 pg。

进行连接,创建表,然后插入数据

使用以下代码进行连接,使用 CREATE TABLE SQL 语句创建表,然后使用 INSERT INTO SQL 语句将行添加到表中。 该代码使用 PG::Connection 对象和构造函数来连接到 Azure Cosmos DB for PostgreSQL。 然后调用 exec() 方法,以便运行 DROP、CREATE TABLE 和 INSERT INTO 命令。 代码使用 PG::Error 类来检查是否存在错误。 然后,它会调用方法 close(),在终止之前关闭连接。

在代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码或 Microsoft Entra ID 令牌。

require 'pg'
begin
    # NOTE: Replace <cluster> and <password> in the connection string.
    connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn port=5432 dbname=citus user=citus password=<password> sslmode=require")
    puts 'Successfully created connection to database'

    # Drop previous table of same name if one exists
    connection.exec('DROP TABLE IF EXISTS pharmacy;')
    puts 'Finished dropping table (if existed).'

    # Drop previous table of same name if one exists.
    connection.exec('CREATE TABLE pharmacy (pharmacy_id integer ,pharmacy_name text,city text,state text,zip_code integer);')
    puts 'Finished creating table.'

    # Insert some data into table.
    connection.exec("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);")
    connection.exec("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);")
    puts 'Inserted 2 rows of data.'

    # Create index
    connection.exec("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);") 
rescue PG::Error => e
    puts e.message
ensure
    connection.close if connection
end

分发表

Azure Cosmos DB for PostgreSQL 可为你提供跨多个节点分发表的强大功能,以实现可伸缩性。 可以使用以下命令来分配表。 可以在此处详细了解 create_distributed_table 和分布列。

注意

通过分发表,它们可在添加到群集的任何工作器节点之间增长。

使用以下代码连接到数据库并分发表。 在代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。

require 'pg'
begin
    # NOTE: Replace <cluster> and <password> in the connection string.
    connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn port=5432 dbname=citus user=citus password=<password> sslmode=require")
    puts 'Successfully created connection to database.'

    # Super power of distributed tables.
    connection.exec("select create_distributed_table('pharmacy','pharmacy_id');") 
rescue PG::Error => e
    puts e.message
ensure
    connection.close if connection
end

读取数据

使用以下代码进行连接,并使用 SELECT SQL 语句来读取数据。

该代码调用 exec() 方法来运行 SELECT 命令,并将结果保存在结果集中。 结果集集合使用 resultSet.each do 循环进行迭代,将当前行值保存在行变量中。 在代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。

require 'pg'
begin
    # NOTE: Replace <cluster> and <password> in the connection string.
    connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn port=5432 dbname=citus user=citus password=<password> sslmode=require")
    puts 'Successfully created connection to database.'

    resultSet = connection.exec('SELECT * from pharmacy')
    resultSet.each do |row|
        puts 'Data row = (%s, %s, %s, %s, %s)' % [row['pharmacy_id'], row['pharmacy_name'], row['city'], row['state'], row['zip_code ']]
    end
rescue PG::Error => e
    puts e.message
ensure
    connection.close if connection
end

更新数据

使用以下代码进行连接,并使用 UPDATE SQL 语句更新数据。 在代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。

require 'pg'
begin
    # NOTE: Replace <cluster> and <password> in the connection string.
    connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn port=5432 dbname=citus user=citus password=<password> sslmode=require")
    puts 'Successfully created connection to database.'

    # Modify some data in table.
    connection.exec('UPDATE pharmacy SET city = %s WHERE pharmacy_id = %d;' % ['\'guntur\'',100])
    puts 'Updated 1 row of data.'
rescue PG::Error => e
    puts e.message
ensure
    connection.close if connection
end

删除数据

使用以下代码进行连接,并使用 DELETE SQL 语句删除数据。 在代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。

require 'pg'
begin
    # NOTE: Replace <cluster> and <password> in the connection string.
    connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn port=5432 dbname=citus user=citus password=<password> sslmode=require")
    puts 'Successfully created connection to database.'

    # Delete some data in table.
    connection.exec('DELETE FROM pharmacy WHERE city = %s;' % ['\'guntur\''])
    puts 'Deleted 1 row of data.'
rescue PG::Error => e
    puts e.message
ensure
    connection.close if connection
end

用于超快速引入的 COPY 命令

在将数据引入 Azure Cosmos DB for PostgreSQL 时,COPY 命令可能会产生巨大的吞吐量。 COPY 命令可以引入文件中的数据,也可以使用内存中的微批数据进行实时引入。

用于从文件加载数据的 COPY 命令

以下代码将数据从 CSV 文件复制到数据库表。 它要求使用 pharmacies.csv 文件。 在代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。

require 'pg'
begin
    filename = String('pharmacies.csv')

    # NOTE: Replace <cluster> and <password> in the connection string.
    connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn port=5432 dbname=citus user=citus password=<password> sslmode=require")
    puts 'Successfully created connection to database.'

    # Copy the data from Csv to table.
    result = connection.copy_data "COPY pharmacy FROM STDIN with csv" do
        File.open(filename , 'r').each do |line|
            connection.put_copy_data line
        end
    puts 'Copied csv data successfully.'
    end      
rescue PG::Error => e
    puts e.message
ensure
    connection.close if connection
end

用于加载内存中数据的 COPY 命令

以下代码将内存中数据复制到表。 在代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。

require 'pg'
begin
    # NOTE: Replace <cluster> and <password> in the connection string.
    connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn port=5432 dbname=citus user=citus password=<password> sslmode=require")
    puts 'Successfully created connection to database.'

    enco = PG::TextEncoder::CopyRow.new
    connection.copy_data "COPY pharmacy FROM STDIN", enco do
        connection.put_copy_data [5000,'Target','Sunnyvale','California','94001']
        connection.put_copy_data [5001, 'CVS','San Francisco','California','94002']
        puts 'Copied in-memory data successfully.'
    end
rescue PG::Error => e
    puts e.message
ensure
    connection.close if connection
end

针对数据库请求失败情况的应用重试

有时,来自应用程序的数据库请求可能会失败。 此类问题可能在不同的场景下发生,例如应用和数据库之间的网络故障、密码错误等。有些问题可能是暂时的,并且在几秒到几分钟内自行解决。 可以在应用中配置重试逻辑以克服暂时性错误。

在应用中配置重试逻辑有助于改善最终用户体验。 在故障情况下,用户只会等待应用程序处理请求的时间稍长,而不会遇到错误。

下面的示例演示如何在应用中实现重试逻辑。 示例代码片段每 60 秒尝试一次数据库请求(最多 5 次),直到成功为止。 可以根据应用程序的需求配置重试次数和频率。

在代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。

require 'pg'

def executeretry(sql,retryCount)
  begin
    for a in 1..retryCount do
      begin
        # NOTE: Replace <cluster> and <password> in the connection string.
        connection = PG::Connection.new("host=c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn port=5432 dbname=citus user=citus password=<password> sslmode=require")
        resultSet = connection.exec(sql)
        return resultSet.each
      rescue PG::Error => e
        puts e.message
        sleep 60
      ensure
        connection.close if connection
      end
    end
  end
  return nil
end

var = executeretry('select 1',5)

if var !=nil then
  var.each do |row|
    puts 'Data row = (%s)' % [row]
  end
end

后续步骤