Compartir a través de

在 Azure Cosmos DB for PostgreSQL 上连接和运行 SQL 命令的 Java 应用

适用对象: Azure Cosmos DB for PostgreSQL(由 PostgreSQL 的 Citus 数据库扩展提供支持)

本快速入门演示如何使用 Java 代码连接到群集以及如何使用 SQL 语句创建表。 然后演示如何在数据库中插入、查询、更新和删除数据。 本文中的步骤假定你熟悉 Java 开发和 JDBC,但不熟悉 Azure Cosmos DB for PostgreSQL 的使用。

设置 Java 项目和连接

创建新的 Java 项目和配置文件以连接到 Azure Cosmos DB for PostgreSQL。

新建一个 Java 项目

通过你喜欢的集成开发环境 (IDE),使用 groupId test 和 artifactId crud 创建新的 Java 项目。 在项目的根目录中,添加包含以下内容的 pom.xml 文件。 此文件可将 Apache Maven 配置为使用 Java 8 和适用于 Java 的最新 PostgreSQL 驱动程序。

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>test</groupId>
  <artifactId>crud</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>crud</name>
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-engine</artifactId>
      <version>5.7.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
      <version>42.2.12</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.zaxxer/HikariCP -->
    <dependency>
      <groupId>com.zaxxer</groupId>
      <artifactId>HikariCP</artifactId>
      <version>5.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-params</artifactId>
      <version>5.7.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>3.0.0-M5</version>
      </plugin>
    </plugins>
  </build>
</project>

配置数据库连接

在 src/main/resources/ 中,创建包含以下内容的 application.properties 文件。 将 <cluster> 替换为群集名称,将 <password> 替换为管理密码或 Microsoft Entra ID 令牌。

driver.class.name=org.postgresql.Driver
db.url=jdbc:postgresql://c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn:5432/citus?ssl=true&sslmode=require
db.username=citus
db.password=<password>

db.url 属性中的 ?ssl=true&sslmode=require 字符串指示 JDBC 驱动程序在连接到数据库时使用传输层安全性 (TLS)。 必须结合使用 TLS 与 Azure Cosmos DB for PostgreSQL,这是一个很好的安全做法。

创建表

配置具有分布式表的数据库架构。 连接到数据库以创建架构和表。

生成数据库架构

在 src/main/resources/ 中,创建包含以下内容的 schema.sql 文件:

DROP TABLE IF EXISTS public.pharmacy;
CREATE TABLE  public.pharmacy(pharmacy_id integer,pharmacy_name text ,city text ,state text ,zip_code integer);
CREATE INDEX idx_pharmacy_id ON public.pharmacy(pharmacy_id);

分发表

Azure Cosmos DB for PostgreSQL 可为你提供跨多个节点分发表的强大功能,以实现可伸缩性。 可以使用以下命令来分配表。 可以在此处详细了解 create_distributed_table 和分布列。

注意

通过分发表,它们可在添加到群集的任何工作器节点之间增长。

若要分发表,请将以下行追加到上一部分中创建的 schema.sql 文件。

select create_distributed_table('public.pharmacy','pharmacy_id');

连接到数据库并创建架构

接下来添加 Java 代码,以便使用 JDBC 在群集中存储并检索数据。 该代码使用 application.properties 和 schema.sql 文件连接到群集并创建架构。

  1. 创建包含以下代码的 DButil.java 文件,其中包含 DButil 类。 DBUtil 类使用 HikariCP 将连接池设置为 PostgreSQL。 使用此类连接到 PostgreSQL 并开始查询。

    提示

    下面的示例代码使用连接池来创建和管理与 PostgreSQL 的连接。 强烈建议使用应用程序端连接池,因为:

    • 它可确保应用程序不会生成太多通向数据库的连接,从而避免超过连接限制。
    • 这有助于大幅提高性能,包括延迟和吞吐量。 PostgreSQL 服务器进程必须创建分支来处理每个新连接,而重用连接可避免这项开销。
    //DButil.java
    package test.crud;
    
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.sql.SQLException;
    import java.util.Properties;
    
    import javax.sql.DataSource;
    
    import com.zaxxer.hikari.HikariDataSource;
    
    public class DButil {
        private static final String DB_USERNAME = "db.username";
        private static final String DB_PASSWORD = "db.password";
        private static final String DB_URL = "db.url";
        private static final String DB_DRIVER_CLASS = "driver.class.name";
        private static Properties properties =  null;
        private static HikariDataSource datasource;
    
        static {
            try {
                properties = new Properties();
                properties.load(new FileInputStream("src/main/java/application.properties"));
    
                datasource = new HikariDataSource();
                datasource.setDriverClassName(properties.getProperty(DB_DRIVER_CLASS ));
                datasource.setJdbcUrl(properties.getProperty(DB_URL));
                datasource.setUsername(properties.getProperty(DB_USERNAME));
                datasource.setPassword(properties.getProperty(DB_PASSWORD));
                datasource.setMinimumIdle(100);
                datasource.setMaximumPoolSize(1000000000);
                datasource.setAutoCommit(true);
                datasource.setLoginTimeout(3);
            } catch (IOException | SQLException  e) {
                e.printStackTrace();
            }
        }
        public static DataSource getDataSource() {
            return datasource;
        }
    }
    
  2. 在 src/main/java/ 中,创建包含以下代码的 DemoApplication.java 文件:

    package test.crud;
    import java.io.IOException;
    import java.sql.*;
    import java.util.*;
    import java.util.logging.Logger;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import org.postgresql.copy.CopyManager;
    import org.postgresql.core.BaseConnection;
    import java.io.IOException;
    import java.io.Reader;
    import java.io.StringReader;
    
    public class DemoApplication {
    
        private static final Logger log;
    
        static {
            System.setProperty("java.util.logging.SimpleFormatter.format", "[%4$-7s] %5$s %n");
            log =Logger.getLogger(DemoApplication.class.getName());
        }
        public static void main(String[] args)throws Exception
        {
            log.info("Connecting to the database");
            Connection connection = DButil.getDataSource().getConnection();
            System.out.println("The Connection Object is of Class: " + connection.getClass());
            log.info("Database connection test: " + connection.getCatalog());
            log.info("Creating table");
            log.info("Creating index");
            log.info("distributing table");
            Scanner scanner = new Scanner(DemoApplication.class.getClassLoader().getResourceAsStream("schema.sql"));
            Statement statement = connection.createStatement();
            while (scanner.hasNextLine()) {
                statement.execute(scanner.nextLine());
            }
            log.info("Closing database connection");
            connection.close();
        }
    
    }
    

    注意

    执行 DriverManager.getConnection(properties.getProperty("url"), properties); 时使用数据库 userpassword 凭据。 凭据存储在 application.properties 文件中,该文件作为参数传递。

  3. 现在可以通过喜欢的工具执行此主类:

    • 使用你的 IDE,你应该能够右键单击 DemoApplication 类并执行它。
    • 使用 Maven,可以通过执行以下操作来运行应用程序:
      mvn exec:java -Dexec.mainClass="com.example.demo.DemoApplication"

应用程序应连接到 Azure Cosmos DB for PostgreSQL,创建数据库架构,然后关闭连接,如控制台日志中所示:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Create database schema
[INFO   ] Closing database connection

创建域类

DemoApplication 类旁创建新的 Pharmacy Java 类并添加以下代码:

public class Pharmacy {
    private Integer pharmacy_id;
    private String pharmacy_name;
    private String city;
    private String state;
    private Integer zip_code;
    public Pharmacy() { }
    public Pharmacy(Integer pharmacy_id, String pharmacy_name, String city,String state,Integer zip_code)
    {
        this.pharmacy_id = pharmacy_id;
        this.pharmacy_name = pharmacy_name;
        this.city = city;
        this.state = state;
        this.zip_code = zip_code;
    }

    public Integer getpharmacy_id() {
        return pharmacy_id;
    }

    public void setpharmacy_id(Integer pharmacy_id) {
        this.pharmacy_id = pharmacy_id;
    }

    public String getpharmacy_name() {
        return pharmacy_name;
    }

    public void setpharmacy_name(String pharmacy_name) {
        this.pharmacy_name = pharmacy_name;
    }

    public String getcity() {
        return city;
    }

    public void setcity(String city) {
        this.city = city;
    }

    public String getstate() {
        return state;
    }

    public void setstate(String state) {
        this.state = state;
    }

    public Integer getzip_code() {
        return zip_code;
    }

    public void setzip_code(Integer zip_code) {
        this.zip_code = zip_code;
    }
    @Override
    public String toString() {
        return "TPharmacy{" +
               "pharmacy_id=" + pharmacy_id +
               ", pharmacy_name='" + pharmacy_name + '\'' +
               ", city='" + city + '\'' +
               ", state='" + state + '\'' +
               ", zip_code='" + zip_code + '\'' +
               '}';
    }
}

此类是在执行 schema.sql 脚本时创建的 Pharmacy 表上映射的域模型。

插入数据

在 DemoApplication.java 文件中,在 main 方法后面添加以下使用 INSERT INTO SQL 语句将数据插入数据库的方法:

private static void insertData(Pharmacy todo, Connection connection) throws SQLException {
    log.info("Insert data");
    PreparedStatement insertStatement = connection
        .prepareStatement("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code)  VALUES (?, ?, ?, ?, ?);");

    insertStatement.setInt(1, todo.getpharmacy_id());
    insertStatement.setString(2, todo.getpharmacy_name());
    insertStatement.setString(3, todo.getcity());
    insertStatement.setString(4, todo.getstate());
    insertStatement.setInt(5, todo.getzip_code());

    insertStatement.executeUpdate();
}

在 main 方法中添加以下两行:

Pharmacy todo = new Pharmacy(0,"Target","Sunnyvale","California",94001);
insertData(todo, connection);

现在,执行主类应会生成以下输出:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Closing database connection

读取数据

阅读前面插入的数据,验证代码是否正常工作。

在 DemoApplication.java 文件中,在 insertData 方法后面添加以下使用 SELECT SQL 语句从数据库读取数据的方法:

private static Pharmacy readData(Connection connection) throws SQLException {
    log.info("Read data");
    PreparedStatement readStatement = connection.prepareStatement("SELECT * FROM Pharmacy;");
    ResultSet resultSet = readStatement.executeQuery();
    if (!resultSet.next()) {
        log.info("There is no data in the database!");
        return null;
    }
    Pharmacy todo = new Pharmacy();
    todo.setpharmacy_id(resultSet.getInt("pharmacy_id"));
    todo.setpharmacy_name(resultSet.getString("pharmacy_name"));
    todo.setcity(resultSet.getString("city"));
    todo.setstate(resultSet.getString("state"));
    todo.setzip_code(resultSet.getInt("zip_code"));
    log.info("Data read from the database: " + todo.toString());
    return todo;
}

在 main 方法中添加以下行:

todo = readData(connection);

现在,执行主类应会生成以下输出:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Sunnyvale', state='California', zip_code='94001'}
[INFO   ] Closing database connection

更新数据

更新之前插入的数据。

仍然是在 DemoApplication.java 文件中,在 readData 方法之后添加以下方法,以使用 UPDATE SQL 语句更新数据库中的数据:

private static void updateData(Pharmacy todo, Connection connection) throws SQLException {
    log.info("Update data");
    PreparedStatement updateStatement = connection
        .prepareStatement("UPDATE pharmacy SET city = ? WHERE pharmacy_id = ?;");

    updateStatement.setString(1, todo.getcity());

    updateStatement.setInt(2, todo.getpharmacy_id());
    updateStatement.executeUpdate();
    readData(connection);
}

在 main 方法中添加以下两行:

todo.setcity("Guntur");
updateData(todo, connection);

现在,执行主类应会生成以下输出:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Sunnyvale', state='California', zip_code='94001'}
[INFO   ] Update data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Guntur', state='California', zip_code='94001'}
[INFO   ] Closing database connection

删除数据

最后,删除之前插入的数据。 仍然是在 DemoApplication.java 文件中,在 updateData 方法之后添加以下方法,以使用 DELETE SQL 语句删除数据库中的数据:

private static void deleteData(Pharmacy todo, Connection connection) throws SQLException {
    log.info("Delete data");
    PreparedStatement deleteStatement = connection.prepareStatement("DELETE FROM pharmacy WHERE pharmacy_id = ?;");
    deleteStatement.setLong(1, todo.getpharmacy_id());
    deleteStatement.executeUpdate();
    readData(connection);
}

现在可以在 main 方法中添加以下行:

deleteData(todo, connection);

现在,执行主类应会生成以下输出:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Sunnyvale', state='California', zip_code='94001'}
[INFO   ] Update data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Guntur', state='California', zip_code='94001'}
[INFO   ] Delete data
[INFO   ] Read data
[INFO   ] There is no data in the database!
[INFO   ] Closing database connection

用于快速引入的 COPY 命令

在将数据引入 Azure Cosmos DB for PostgreSQL 时,COPY 命令可能会产生巨大的吞吐量。 COPY 命令可以引入文件中的数据,也可以使用内存中的微批数据进行实时引入。

用于从文件加载数据的 COPY 命令

以下代码将数据从 CSV 文件复制到数据库表。 该代码示例需要使用 pharmacies.csv 文件。

public static long
copyFromFile(Connection connection, String filePath, String tableName)
throws SQLException, IOException {
    long count = 0;
    FileInputStream fileInputStream = null;

    try {
        Connection unwrap = connection.unwrap(Connection.class);
        BaseConnection  connSec = (BaseConnection) unwrap;

        CopyManager copyManager = new CopyManager((BaseConnection) connSec);
        fileInputStream = new FileInputStream(filePath);
        count = copyManager.copyIn("COPY " + tableName + " FROM STDIN delimiter ',' csv", fileInputStream);
    } finally {
        if (fileInputStream != null) {
            try {
                fileInputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    return count;
}

现在可以在 main 方法中添加以下行:

int c = (int) copyFromFile(connection,"C:\\Users\\pharmacies.csv", "pharmacy");
log.info("Copied "+ c +" rows using COPY command");

现在,执行 main 类应会生成以下输出:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Sunnyvale', state='California', zip_code='94001'}
[INFO   ] Update data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Guntur', state='California', zip_code='94001'}
[INFO   ] Delete data
[INFO   ] Read data
[INFO   ] There is no data in the database!
[INFO ] Copied 5000 rows using COPY command
[INFO   ] Closing database connection

用于加载内存中数据的 COPY 命令

以下代码将内存中数据复制到表。

private static void inMemory(Connection connection) throws SQLException,IOException
    {
    log.info("Copying inmemory data into table");
            
    final List<String> rows = new ArrayList<>();
    rows.add("0,Target,Sunnyvale,California,94001");
    rows.add("1,Apollo,Guntur,Andhra,94003");
        
    final BaseConnection baseConnection = (BaseConnection) connection.unwrap(Connection.class);
    final CopyManager copyManager = new CopyManager(baseConnection);

    // COPY command can change based on the format of rows. This COPY command is for above rows.
    final String copyCommand = "COPY pharmacy FROM STDIN with csv";        
       
    try (final Reader reader = new StringReader(String.join("\n", rows))) {
        copyManager.copyIn(copyCommand, reader);
    }
}

现在可以在 main 方法中添加以下行:

inMemory(connection);

现在,执行主类应会生成以下输出:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Sunnyvale', state='California', zip_code='94001'}
[INFO   ] Update data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Guntur', state='California', zip_code='94001'}
[INFO   ] Delete data
[INFO   ] Read data
[INFO   ] There is no data in the database!
5000
[INFO   ] Copying in-memory data into table
[INFO   ] Closing database connection

针对数据库请求失败情况的应用重试

有时,来自应用程序的数据库请求可能会失败。 此类问题可能在不同的场景下发生,例如应用和数据库之间的网络故障、密码错误等。有些问题可能是暂时的,并且在几秒到几分钟内自行解决。 可以在应用中配置重试逻辑以克服暂时性错误。

在应用中配置重试逻辑有助于改善最终用户体验。 在故障情况下,用户只会等待应用程序处理请求的时间稍长,而不会遇到错误。

下面的示例演示如何在应用中实现重试逻辑。 示例代码片段每 60 秒尝试一次数据库请求(最多 5 次),直到成功为止。 可以根据应用程序的需求配置重试次数和频率。

在此代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。

package test.crud;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.logging.Logger;
import com.zaxxer.hikari.HikariDataSource;

public class DemoApplication
{
    private static final Logger log;

    static
    {
        System.setProperty("java.util.logging.SimpleFormatter.format", "[%4$-7s] %5$s %n");
        log = Logger.getLogger(DemoApplication.class.getName());
    }
    private static final String DB_USERNAME = "citus";
    private static final String DB_PASSWORD = "<password>";
    private static final String DB_URL = "jdbc:postgresql://c-<cluster>.<uniqueID>.postgres.cosmos.chinacloudapi.cn:5432/citus?sslmode=require";
    private static final String DB_DRIVER_CLASS = "org.postgresql.Driver";
    private static HikariDataSource datasource;

    private static String executeRetry(String sql, int retryCount) throws InterruptedException
    {
        Connection con = null;
        PreparedStatement pst = null;
        ResultSet rs = null;
        for (int i = 1; i <= retryCount; i++)
        {
            try
            {
                datasource = new HikariDataSource();
                datasource.setDriverClassName(DB_DRIVER_CLASS);
                datasource.setJdbcUrl(DB_URL);
                datasource.setUsername(DB_USERNAME);
                datasource.setPassword(DB_PASSWORD);
                datasource.setMinimumIdle(10);
                datasource.setMaximumPoolSize(1000);
                datasource.setAutoCommit(true);
                datasource.setLoginTimeout(3);
                log.info("Connecting to the database");
                con = datasource.getConnection();
                log.info("Connection established");
                log.info("Read data");
                pst = con.prepareStatement(sql);
                rs = pst.executeQuery();
                StringBuilder builder = new StringBuilder();
                int columnCount = rs.getMetaData().getColumnCount();
                while (rs.next())
                {
                    for (int j = 0; j < columnCount;)
                    {
                        builder.append(rs.getString(j + 1));
                        if (++j < columnCount)
                            builder.append(",");
                    }
                    builder.append("\r\n");
                }
                return builder.toString();
            }
            catch (Exception e)
            {
                Thread.sleep(60000);
                System.out.println(e.getMessage());
            }
        }
        return null;
    }

    public static void main(String[] args) throws Exception
    {
        String result = executeRetry("select 1", 5);
        System.out.print(result);
    }
}

后续步骤