PostgreSQL (often called “Postgres”) is one of the most powerful open-source relational database systems available, and Python stands as a versatile programming language for backend development. When combined with Psycopg2, the most popular PostgreSQL adapter for Python, you get a robust foundation for building reliable and scalable backend applications.
This guide will walk you through setting up, connecting, and leveraging PostgreSQL with Python using Psycopg2, complete with best practices and real-world patterns.
Setting Up the Environment
Installing Prerequisites
Before diving into code, make sure you have the necessary components installed:
# Install PostgreSQL client and server (Ubuntu/Debian)
sudo apt update
sudo apt install postgresql postgresql-contrib
# Install Psycopg2
pip install psycopg2
# Alternatively, for environments without PostgreSQL dev packages
pip install psycopg2-binary
Creating a Database
Let’s create a database for our application:
# Access PostgreSQL command line
sudo -u postgres psql
# Create a new database and user
CREATE DATABASE myappdb;
CREATE USER myappuser WITH PASSWORD 'mypassword';
GRANT ALL PRIVILEGES ON DATABASE myappdb TO myappuser;
Establishing Database Connections
Basic Connection
Let’s start with a simple connection pattern:
import psycopg2
def get_db_connection():
conn = psycopg2.connect(
host="localhost",
database="myappdb",
user="myappuser",
password="mypassword"
)
return conn
Connection Pooling
For production applications, connection pooling is essential for performance:
from psycopg2 import pool
# Create a connection pool
connection_pool = pool.SimpleConnectionPool(
minconn=1,
maxconn=10,
host="localhost",
database="myappdb",
user="myappuser",
password="mypassword"
)
def get_connection():
return connection_pool.getconn()
def release_connection(conn):
connection_pool.putconn(conn)
Connection Context Manager
A more Pythonic approach using context managers:
import psycopg2
from contextlib import contextmanager
@contextmanager
def get_db_connection():
conn = None
try:
conn = psycopg2.connect(
host="localhost",
database="myappdb",
user="myappuser",
password="mypassword"
)
yield conn
finally:
if conn is not None:
conn.close()
@contextmanager
def get_db_cursor(commit=False):
with get_db_connection() as conn:
cursor = conn.cursor()
try:
yield cursor
if commit:
conn.commit()
finally:
cursor.close()
Basic Database Operations
Creating Tables
Let’s create a simple table structure for our application:
def create_tables():
create_users_table = """
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
password_hash VARCHAR(100) NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
)
"""
create_posts_table = """
CREATE TABLE IF NOT EXISTS posts (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
title VARCHAR(100) NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
)
"""
with get_db_cursor(commit=True) as cursor:
cursor.execute(create_users_table)
cursor.execute(create_posts_table)
CRUD Operations
Create (Insert)
def create_user(username, email, password_hash):
with get_db_cursor(commit=True) as cursor:
cursor.execute(
"INSERT INTO users (username, email, password_hash) VALUES (%s, %s, %s) RETURNING id",
(username, email, password_hash)
)
user_id = cursor.fetchone()[0]
return user_id
def create_post(user_id, title, content):
with get_db_cursor(commit=True) as cursor:
cursor.execute(
"INSERT INTO posts (user_id, title, content) VALUES (%s, %s, %s) RETURNING id",
(user_id, title, content)
)
post_id = cursor.fetchone()[0]
return post_id
Read (Select)
def get_user_by_id(user_id):
with get_db_cursor() as cursor:
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
return cursor.fetchone()
def get_user_posts(user_id):
with get_db_cursor() as cursor:
cursor.execute("SELECT * FROM posts WHERE user_id = %s ORDER BY created_at DESC", (user_id,))
return cursor.fetchall()
Update
def update_user_email(user_id, new_email):
with get_db_cursor(commit=True) as cursor:
cursor.execute(
"UPDATE users SET email = %s WHERE id = %s",
(new_email, user_id)
)
return cursor.rowcount > 0
def update_post(post_id, title, content):
with get_db_cursor(commit=True) as cursor:
cursor.execute(
"UPDATE posts SET title = %s, content = %s WHERE id = %s",
(title, content, post_id)
)
return cursor.rowcount > 0
Delete
def delete_user(user_id):
with get_db_cursor(commit=True) as cursor:
cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
return cursor.rowcount > 0
def delete_post(post_id):
with get_db_cursor(commit=True) as cursor:
cursor.execute("DELETE FROM posts WHERE id = %s", (post_id,))
return cursor.rowcount > 0
Advanced PostgreSQL Features with Psycopg2
Prepared Statements
For queries that are executed frequently, prepared statements boost performance:
def get_user_with_prepared_statement(user_id):
with get_db_connection() as conn:
# Create a cursor with a name
with conn.cursor(name="named_cursor") as cursor:
# Prepare the statement
cursor.execute("PREPARE get_user AS SELECT * FROM users WHERE id = $1")
# Execute the prepared statement
cursor.execute("EXECUTE get_user(%s)", (user_id,))
return cursor.fetchone()
Transactions
Handling transactions explicitly:
def transfer_credits(from_user_id, to_user_id, amount):
with get_db_connection() as conn:
try:
# Start a transaction
conn.autocommit = False
with conn.cursor() as cursor:
# Deduct from one user
cursor.execute(
"UPDATE users SET credits = credits - %s WHERE id = %s AND credits >= %s",
(amount, from_user_id, amount)
)
if cursor.rowcount == 0:
# Insufficient credits or user not found
raise ValueError("Insufficient credits or user not found")
# Add to another user
cursor.execute(
"UPDATE users SET credits = credits + %s WHERE id = %s",
(amount, to_user_id)
)
if cursor.rowcount == 0:
# User not found
raise ValueError("Target user not found")
# Commit the transaction
conn.commit()
return True
except (Exception, psycopg2.DatabaseError) as error:
conn.rollback()
print(f"Error in transaction: {error}")
raise
Advanced Queries
Working with complex queries:
def get_user_stats():
query = """
SELECT
u.id,
u.username,
COUNT(p.id) AS post_count,
MAX(p.created_at) AS last_post_date
FROM
users u
LEFT JOIN
posts p ON u.id = p.user_id
GROUP BY
u.id, u.username
ORDER BY
post_count DESC
"""
with get_db_cursor() as cursor:
cursor.execute(query)
return cursor.fetchall()
JSON Operations
PostgreSQL has excellent JSON support:
def create_user_with_metadata(username, email, password_hash, metadata):
"""
Create a user with JSON metadata.
metadata should be a dictionary of additional user information.
"""
with get_db_cursor(commit=True) as cursor:
cursor.execute(
"INSERT INTO users (username, email, password_hash, metadata) VALUES (%s, %s, %s, %s) RETURNING id",
(username, email, password_hash, psycopg2.extras.Json(metadata))
)
return cursor.fetchone()[0]
def get_users_by_metadata_field(field, value):
"""
Find users where metadata has a specific field with a specific value.
"""
with get_db_cursor() as cursor:
cursor.execute(
"SELECT * FROM users WHERE metadata ->> %s = %s",
(field, value)
)
return cursor.fetchall()
Best Practices
Using Environment Variables
Store sensitive connection parameters in environment variables:
import os
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
def get_db_connection():
conn = psycopg2.connect(
host=os.getenv("DB_HOST", "localhost"),
database=os.getenv("DB_NAME"),
user=os.getenv("DB_USER"),
password=os.getenv("DB_PASSWORD"),
port=os.getenv("DB_PORT", 5432)
)
return conn
Error Handling
Properly handle database errors:
def safe_db_operation(operation_func, *args, **kwargs):
try:
return operation_func(*args, **kwargs)
except psycopg2.errors.UniqueViolation:
# Handle unique constraint violations
return {"error": "Resource already exists"}
except psycopg2.errors.ForeignKeyViolation:
# Handle foreign key violations
return {"error": "Related resource does not exist"}
except psycopg2.DatabaseError as e:
# Log the error
print(f"Database error: {e}")
return {"error": "Database operation failed"}
except Exception as e:
# Log unexpected errors
print(f"Unexpected error: {e}")
return {"error": "An unexpected error occurred"}
Parameterized Queries
Always use parameterized queries to prevent SQL injection:
# BAD - vulnerable to SQL injection
def unsafe_get_user(username):
with get_db_cursor() as cursor:
cursor.execute(f"SELECT * FROM users WHERE username = '{username}'")
return cursor.fetchone()
# GOOD - safe from SQL injection
def safe_get_user(username):
with get_db_cursor() as cursor:
cursor.execute("SELECT * FROM users WHERE username = %s", (username,))
return cursor.fetchone()
Connection Cleanup
Always ensure connections are properly closed:
def operation_with_manual_cleanup():
conn = None
cursor = None
try:
conn = get_db_connection()
cursor = conn.cursor()
# Perform operations
finally:
if cursor:
cursor.close()
if conn:
conn.close()
Integrating with Web Frameworks
Flask Example
from flask import Flask, request, jsonify
import psycopg2
from psycopg2.extras import RealDictCursor
app = Flask(__name__)
def get_db_connection():
conn = psycopg2.connect(
host="localhost",
database="myappdb",
user="myappuser",
password="mypassword"
)
conn.autocommit = True
return conn
@app.route('/users', methods=['GET'])
def get_users():
conn = get_db_connection()
cursor = conn.cursor(cursor_factory=RealDictCursor)
cursor.execute("SELECT id, username, email, created_at FROM users")
users = cursor.fetchall()
cursor.close()
conn.close()
return jsonify(users)
@app.route('/users', methods=['POST'])
def add_user():
data = request.get_json()
conn = get_db_connection()
cursor = conn.cursor(cursor_factory=RealDictCursor)
try:
cursor.execute(
"INSERT INTO users (username, email, password_hash) VALUES (%s, %s, %s) RETURNING id, username, email, created_at",
(data['username'], data['email'], data['password']) # In production, hash the password
)
new_user = cursor.fetchone()
return jsonify(new_user), 201
except psycopg2.Error as e:
return jsonify({'error': str(e)}), 400
finally:
cursor.close()
conn.close()
if __name__ == '__main__':
app.run(debug=True)
Django Example
Django already has built-in PostgreSQL support, but you can use a raw cursor for complex operations:
from django.db import connection
from django.http import JsonResponse
def complex_query_view(request):
with connection.cursor() as cursor:
cursor.execute("""
SELECT
u.id,
u.username,
COUNT(p.id) AS post_count
FROM
users u
LEFT JOIN
posts p ON u.id = p.user_id
GROUP BY
u.id, u.username
ORDER BY
post_count DESC
LIMIT 10
""")
columns = [col[0] for col in cursor.description]
results = [dict(zip(columns, row)) for row in cursor.fetchall()]
return JsonResponse({'top_users': results})
Conclusion
Connecting PostgreSQL with Python through Psycopg2 provides a powerful foundation for building robust backend applications. The combination gives you:
- Performance: From connection pooling to prepared statements, you have tools to optimize your database interactions.
- Security: Parameterized queries protect against SQL injection.
- Flexibility: PostgreSQL’s advanced features like JSON storage and complex queries are fully accessible.
- Reliability: Proper transaction handling ensures data consistency.
With these patterns in place, your Python backend applications can leverage the full power of PostgreSQL while maintaining clean, maintainable code.
For large-scale applications, consider exploring SQLAlchemy ORM, which can work with Psycopg2 under the hood while providing additional abstraction and database-agnostic code.
Remember that database operations are often the bottleneck in web applications, so investing time in proper database access patterns with Psycopg2 will pay dividends as your application scales.
Leave a Reply