Use Python to connect and run SQL commands on Azure Cosmos DB for PostgreSQL
APPLIES TO: Azure Cosmos DB for PostgreSQL (powered by the Citus database extension to PostgreSQL)
This quickstart shows you how to use Python code to connect to a cluster, and use SQL statements to create a table. You'll then insert, query, update, and delete data in the database. The steps in this article assume that you're familiar with Python development, and are new to working with Azure Cosmos DB for PostgreSQL.
Install PostgreSQL library
The code examples in this article require the psycopg2 library. You'll need to install psycopg2 with your language package manager (such as pip).
Connect, create a table, and insert data
The following code example creates a connection pool to your Postgres database. It then uses cursor.execute functions with SQL CREATE TABLE and INSERT INTO statements to create a table and insert data.
Tip
The sample code below uses a connection pool to create and manage connections to PostgreSQL. Application-side connection pooling is strongly recommended because:
- It ensures that the application doesn't generate too many connections to the database, and so avoids exceeding connection limits.
- It can help drastically improve performance--both latency and throughput. The PostgreSQL server process must fork to handle each new connection, and reusing a connection avoids that overhead.
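The borrow-and-return pattern behind this tip can be sketched as a small context manager. This is a minimal sketch, not part of the quickstart's sample: `pooled_connection` is a hypothetical helper name, and it assumes only that the pool object exposes `getconn()` and `putconn()`, as psycopg2's `SimpleConnectionPool` does.

```python
from contextlib import contextmanager


@contextmanager
def pooled_connection(pool):
    """Borrow a connection from `pool` and always hand it back.

    `pool` is assumed to expose getconn()/putconn(), as psycopg2's
    SimpleConnectionPool does. The helper name is hypothetical.
    """
    conn = pool.getconn()
    try:
        yield conn
        conn.commit()      # commit only if the body raised no exception
    except Exception:
        conn.rollback()    # leave the connection clean before it is reused
        raise
    finally:
        pool.putconn(conn)  # return the connection instead of closing it
```

With a pool like the one created in the sample, usage would look like `with pooled_connection(postgreSQL_pool) as conn: conn.cursor().execute(...)`. Returning the connection with `putconn()` rather than calling `conn.close()` is what lets later requests reuse it.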
In the following code, replace <cluster> with your cluster name and <password> with your administrator password.
Note
This example closes the connection at the end, so if you want to run the other samples in the article in the same session, don't include the # Clean up section when you run this sample.
import psycopg2
from psycopg2 import pool
# NOTE: fill in these variables for your own cluster
host = "c.<cluster>.postgres.database.chinacloudapi.cn"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)
if postgreSQL_pool:
    print("Connection pool created successfully")
# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")
# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")
# Create an index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")
# Insert some data into the table
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")
# Clean up
conn.commit()
cursor.close()
conn.close()
When the code runs successfully, it produces the following output:
Connection pool created successfully
Finished dropping table (if existed)
Finished creating table
Finished creating index
Inserted 2 rows of data
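The sample above hard-codes the password in the script. As one possible alternative, the connection string could be assembled from environment variables. The sketch below is an assumption rather than part of the quickstart: `build_conn_string` is a hypothetical helper, and the variable names simply mirror libpq's standard ones (`PGHOST`, `PGPASSWORD`, and so on). Values are single-quoted and escaped so that passwords containing spaces or quotes survive the keyword/value format.

```python
import os


def build_conn_string(env=os.environ):
    """Assemble a keyword/value connection string from environment variables.

    Hypothetical helper: the variable names are an assumption and can be
    changed to whatever your deployment uses.
    """
    settings = {
        "host": env["PGHOST"],                      # required
        "user": env.get("PGUSER", "citus"),
        "dbname": env.get("PGDATABASE", "citus"),
        "password": env["PGPASSWORD"],              # required
        "sslmode": env.get("PGSSLMODE", "require"),
    }

    def quote(value):
        # Inside single quotes, backslash and single quote must be escaped.
        escaped = value.replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    return " ".join(f"{key}={quote(value)}" for key, value in settings.items())
```

The result can be passed to `SimpleConnectionPool` in place of `conn_string`. A missing `PGHOST` or `PGPASSWORD` raises `KeyError` immediately, which is usually easier to diagnose than a failed connection attempt.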
Distribute tables
Azure Cosmos DB for PostgreSQL lets you distribute tables across multiple nodes for scalability. The command below distributes the pharmacy table, using pharmacy_id as the distribution column. To learn more, see the documentation on create_distributed_table and the distribution column.
Note
Distributing tables lets them grow across any worker nodes added to the cluster.
# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")
Read data
The following code example uses the following APIs to read data from the database:
- cursor.execute with the SQL SELECT statement to read data.
- cursor.fetchall() to retrieve all rows of the query result as a list of tuples that you can iterate over.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()
# Print all rows
for row in rows:
    print("Data row = (%s, %s)" % (str(row[0]), str(row[1])))
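The sample above reads columns by position (`row[0]`, `row[1]`). If you'd rather read them by name, one option is to pair each row with the column names from `cursor.description`, which DB-API cursors populate after a query. This is a minimal sketch; `fetch_dicts` is a hypothetical helper, not a psycopg2 function.

```python
def fetch_dicts(cursor):
    """Return the remaining rows of the last query as a list of dicts.

    Hypothetical helper. Relies on cursor.description, where the first
    item of each entry is the column name.
    """
    columns = [column[0] for column in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]
```

After `cursor.execute("SELECT * FROM pharmacy;")`, calling `fetch_dicts(cursor)` would let you write `row["pharmacy_name"]` instead of `row[1]`. psycopg2 also ships `psycopg2.extras.RealDictCursor`, which achieves a similar result at the cursor level.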
Update data
The following code example uses cursor.execute with the SQL UPDATE statement to update data.
# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")
Delete data
The following code example runs cursor.execute with the SQL DELETE statement to delete the data.
# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")
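The update and delete samples above print a fixed message and don't commit. A small wrapper can report how many rows were actually affected and commit the change. This is a sketch under assumptions: `execute_and_count` is a hypothetical helper, and it relies on the cursor's `rowcount` attribute and on psycopg2 cursors being usable as context managers.

```python
def execute_and_count(conn, sql, params=None):
    """Run one data-modifying statement, commit, and return the affected row count.

    Hypothetical helper; `conn` is assumed to be a psycopg2 connection.
    """
    with conn.cursor() as cursor:       # the cursor is closed on exit
        cursor.execute(sql, params)
        affected = cursor.rowcount      # rows touched by UPDATE/DELETE/INSERT
    conn.commit()                       # make the change visible to other sessions
    return affected
```

For example, `execute_and_count(conn, "DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))` would return the number of rows deleted, which may be zero if no row matched.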
COPY command for fast ingestion
The COPY command can yield tremendous throughput while ingesting data into Azure Cosmos DB for PostgreSQL. It can ingest data from files, or from micro-batches of data in memory for real-time ingestion.
COPY command to load data from a file
The following code copies data from a CSV file to a database table. The code requires the file pharmacies.csv.
with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f)  # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")
COPY command to load in-memory data
The following code copies in-memory data to a table.
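import csv
import io

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
# Write the rows to an in-memory buffer as comma-separated text
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)
# Rewind the buffer so COPY reads it from the start
buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")
conn.commit()
conn.close()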
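One caveat with the sample above: `copy_from` loads data in PostgreSQL's text format, so the quoting that `csv.writer` adds around a value containing a comma isn't interpreted. If your data can contain commas or quotes, one option is psycopg2's `copy_expert` with an explicit CSV format. This is a minimal sketch; the helper names `rows_to_csv_buffer` and `copy_rows_csv` are hypothetical, and the table name is the `pharmacy` table from this article.

```python
import csv
import io


def rows_to_csv_buffer(rows):
    """Serialize rows to an in-memory CSV buffer, rewound and ready to read."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerows(rows)
    buf.seek(0)
    return buf


def copy_rows_csv(cursor, rows):
    """Load rows into the pharmacy table with COPY in CSV format.

    Hypothetical helper; `cursor` is assumed to be a psycopg2 cursor.
    """
    buf = rows_to_csv_buffer(rows)
    cursor.copy_expert("COPY pharmacy FROM STDIN WITH (FORMAT csv)", buf)
```

With this variant, a row such as `[5, "Rite Aid, Inc.", "Reno", "Nevada", 89501]` is written with the name in double quotes, and the server parses it back into a single column value.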
App retry for database request failures
Database requests from your application can sometimes fail. Failures have different causes, such as a network failure between the app and the database, or an incorrect password. Some issues are transient and resolve themselves within a few seconds to minutes. You can configure retry logic in your app to overcome transient errors.
Configuring retry logic in your app helps improve the end user experience. Under failure scenarios, users will merely wait a bit longer for the application to serve requests, rather than experience errors.
The example below shows how to implement retry logic in your app. The sample code snippet tries a database request every 60 seconds (up to five times) until it succeeds. The number and frequency of retries can be configured based on your application's needs.
In this code, replace <cluster> with your cluster name and <password> with your administrator password.
import psycopg2
import time
from psycopg2 import pool
host = "c.<cluster>.postgres.database.chinacloudapi.cn"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)
def executeRetry(query, retryCount):
    for x in range(retryCount):
        conn = None
        try:
            # Use getconn() to get a connection from the connection pool
            conn = postgreSQL_pool.getconn()
            cursor = conn.cursor()
            cursor.execute(query)
            return cursor.fetchall()
        except Exception as err:
            print(err)
        finally:
            # Return the connection to the pool whether or not the query succeeded
            if conn is not None:
                postgreSQL_pool.putconn(conn)
        # Reached only after a failed attempt: wait before retrying
        time.sleep(60)
    return None

print(executeRetry("select 1", 5))
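The sample above waits a fixed 60 seconds between attempts. Since the number and frequency of retries are meant to be tuned, a common variation is exponential backoff, where the wait doubles after each failure up to a cap. The sketch below is an illustration under assumptions, not the article's method: `retry_with_backoff` and its parameters are hypothetical, and `retry_on` lets you limit retries to errors you consider transient (for psycopg2, `psycopg2.OperationalError` is a plausible choice).

```python
import time


def retry_with_backoff(operation, attempts=5, base_delay=1.0, max_delay=60.0,
                       retry_on=(Exception,), sleep=time.sleep):
    """Call `operation()` until it succeeds or `attempts` is exhausted.

    Hypothetical helper. The wait after the n-th failure is
    base_delay * 2**n seconds, capped at max_delay. The last failure
    is re-raised so the caller can see the real error.
    """
    for attempt in range(attempts):
        try:
            return operation()
        except retry_on:
            if attempt == attempts - 1:
                raise                      # out of attempts: surface the error
            sleep(min(max_delay, base_delay * 2 ** attempt))
```

A call such as `retry_with_backoff(lambda: executeRetry("select 1", 1))` would combine it with the function above. Errors that aren't transient, such as an incorrect password, won't be fixed by waiting, so narrowing `retry_on` avoids retrying them needlessly.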
Next steps
- See how the Azure Cosmos DB for PostgreSQL API extends PostgreSQL, and try useful diagnostic queries
- Pick the best cluster size for your workload
- Monitor cluster performance