In today’s world of microservices and distributed systems, building applications that can handle high concurrency efficiently is essential. PostgreSQL and Golang make for a powerful combination when it comes to developing such systems. This post explores strategies, patterns, and best practices for leveraging these technologies to create robust, high-performance applications.
Why PostgreSQL and Go?
Before diving into implementation details, let’s understand why this particular combination works so well:
PostgreSQL Strengths
- ACID compliance: Transaction guarantees for data integrity
- Advanced locking mechanisms: Row-level locks, advisory locks
- Rich feature set: JSON support, advanced indexing options, full-text search
- Excellent concurrency model: MVCC (Multi-Version Concurrency Control)
Go Strengths
- Lightweight goroutines: Efficient concurrent execution
- Channel-based communication: Safe data exchange between concurrent routines
- Simple yet powerful concurrency primitives: Simplifies async programming
- Excellent performance: Fast compilation, efficient execution
- Low memory footprint: Resource-efficient server applications
Together, these technologies provide a solid foundation for building systems that can efficiently handle thousands of concurrent operations.
Connection Pooling: The Foundation
The most critical component of a high-concurrency system is proper connection management. PostgreSQL has a limit on how many concurrent connections it can handle efficiently (typically 100-300, depending on hardware), so the application must reuse connections rather than open a new one per request.
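Before tuning anything, it helps to know what that limit actually is on your server. Here is a minimal sketch, assuming the pgxpool connection pool configured in the next section:

```go
// Sketch: read the server's configured connection limit so pool sizing can be
// sanity-checked against it. Assumes a *pgxpool.Pool like the one created below.
func serverMaxConnections(ctx context.Context, pool *pgxpool.Pool) (int, error) {
	var maxConns int
	err := pool.QueryRow(ctx, "SELECT current_setting('max_connections')::int").Scan(&maxConns)
	return maxConns, err
}
```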
Setting Up pgx with a Connection Pool
The pgx library is the preferred PostgreSQL driver for Go applications, offering better performance than the standard database/sql package.
```go
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
func main() {
// Connection pool configuration
config, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to parse DATABASE_URL: %v\n", err)
os.Exit(1)
}
// Configure pool settings
config.MaxConns = 25 // Maximum connections in the pool
config.MinConns = 5 // Minimum idle connections
config.MaxConnLifetime = time.Hour // Maximum connection lifetime
config.MaxConnIdleTime = 30 * time.Minute // Maximum idle time
config.HealthCheckPeriod = 1 * time.Minute // How often to check connection health
// Create the connection pool
pool, err := pgxpool.NewWithConfig(context.Background(), config)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to create connection pool: %v\n", err)
os.Exit(1)
}
defer pool.Close()
// Check if connection works
if err := pool.Ping(context.Background()); err != nil {
fmt.Fprintf(os.Stderr, "Unable to ping database: %v\n", err)
os.Exit(1)
}
fmt.Println("Successfully connected to PostgreSQL")
}
```
Optimizing Connection Pool Parameters
For high-concurrency applications, tuning your connection pool is crucial:
- MaxConns: Keep this below PostgreSQL’s max_connections setting (ideally 50-70% of it); a sizing sketch follows this list
- MinConns: Maintain enough warm connections to handle sudden traffic spikes
- MaxConnLifetime: Prevent resource leaks by recycling connections
- HealthCheckPeriod: Ensure connections remain valid
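As a rough sizing sketch (the 0.6 headroom factor and the idea of dividing by application instance count are illustrative assumptions, not fixed recommendations):

```go
// Sketch: derive per-instance pool limits from the server's max_connections.
// serverMax and instanceCount are assumed inputs supplied by the operator;
// the 0.6 factor leaves room for other clients and maintenance tools.
func tunePool(config *pgxpool.Config, serverMax, instanceCount int) {
	perInstance := int32(float64(serverMax) * 0.6 / float64(instanceCount))
	if perInstance < 2 {
		perInstance = 2
	}
	config.MaxConns = perInstance
	config.MinConns = perInstance / 4 // keep some warm connections for traffic spikes
	config.MaxConnLifetime = time.Hour
	config.MaxConnIdleTime = 30 * time.Minute
	config.HealthCheckPeriod = time.Minute
}
```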
Efficient Database Operations
Once we have our connection pool set up, let’s focus on patterns for efficient database operations.
Prepared Statements
Prepared statements improve performance by parsing SQL only once and reusing the execution plan. Note that pgx v5 already caches prepared statements automatically for repeated queries; explicit preparation is mainly useful when you want a named statement on a dedicated connection:
```go
// Prepared statements live on a connection, so acquire one from the pool
conn, err := pool.Acquire(context.Background())
if err != nil {
	log.Fatalf("Failed to acquire connection: %v", err)
}
defer conn.Release()

// Create a prepared statement on the underlying *pgx.Conn
_, err = conn.Conn().Prepare(context.Background(), "get_user", `
	SELECT id, name, email
	FROM users
	WHERE id = $1
`)
if err != nil {
	log.Fatalf("Failed to prepare statement: %v", err)
}

// Use the prepared statement by name
var user User
err = conn.QueryRow(
	context.Background(),
	"get_user",
	userId,
).Scan(&user.ID, &user.Name, &user.Email)
```
Batch Operations
For inserting or updating multiple records, use batch operations:
```go
// Start a transaction
tx, err := pool.Begin(context.Background())
if err != nil {
log.Fatalf("Failed to start transaction: %v", err)
}
defer tx.Rollback(context.Background())
// Create a batch
batch := &pgx.Batch{}
// Queue multiple inserts
for _, user := range users {
batch.Queue(
"INSERT INTO users (name, email) VALUES ($1, $2)",
user.Name,
user.Email,
)
}
// Send the batch
results := tx.SendBatch(context.Background(), batch)
// Process results
for i := 0; i < batch.Len(); i++ {
	if _, err := results.Exec(); err != nil {
		log.Printf("Error executing batch command %d: %v", i, err)
	}
}
// Close the batch results before committing; the connection is busy until then
if err := results.Close(); err != nil {
	log.Fatalf("Failed to close batch results: %v", err)
}
// Commit transaction
err = tx.Commit(context.Background())
if err != nil {
	log.Fatalf("Failed to commit transaction: %v", err)
}
```
Efficient Query Design
Optimize your queries for high concurrency:
```go
// Instead of separate queries
// BAD: Multiple round-trips
userID := getUser(email)
orders := getUserOrders(userID)
details := getOrderDetails(orders)
// GOOD: Single round-trip with JOINs
rows, err := pool.Query(context.Background(), `
SELECT u.id, u.name, o.id, o.total, d.product_id, d.quantity
FROM users u
JOIN orders o ON u.id = o.user_id
JOIN order_details d ON o.id = d.order_id
WHERE u.email = $1
`, email)
```
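To complete the example, the joined rows still have to be consumed. A minimal sketch of reading the result set (the grouping step is left as a comment, since the target structures are not shown here):

```go
// Sketch: scan each joined row; in a real handler you would accumulate rows
// into per-user / per-order structures rather than discarding them.
defer rows.Close()
for rows.Next() {
	var userID, orderID, productID, quantity int
	var userName string
	var total float64
	if err := rows.Scan(&userID, &userName, &orderID, &total, &productID, &quantity); err != nil {
		return err
	}
	// ... group by orderID into the response structure
}
if err := rows.Err(); err != nil {
	return err
}
```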
Handling Concurrency with Go Routines
Go’s goroutines make it easy to implement patterns for concurrent database operations.
Worker Pool Pattern
For processing large numbers of database operations:
```go
func ProcessItems(items []Item) error {
// Create a job channel
jobs := make(chan Item, len(items))
results := make(chan error, len(items))
// Determine worker count - balance between concurrency and connection pool size
workerCount := 10
if len(items) < workerCount {
workerCount = len(items)
}
// Start workers
for w := 1; w <= workerCount; w++ {
go worker(w, pool, jobs, results)
}
// Send jobs
for _, item := range items {
jobs <- item
}
close(jobs)
// Collect results
var errors []error
for i := 0; i < len(items); i++ {
if err := <-results; err != nil {
errors = append(errors, err)
}
}
if len(errors) > 0 {
return fmt.Errorf("encountered %d errors during processing", len(errors))
}
return nil
}
func worker(id int, pool *pgxpool.Pool, jobs <-chan Item, results chan<- error) {
for item := range jobs {
// Process item with database
err := processItem(pool, item)
results <- err
}
}
func processItem(pool *pgxpool.Pool, item Item) error {
_, err := pool.Exec(context.Background(),
"UPDATE items SET processed = true WHERE id = $1",
item.ID)
return err
}
```
Fan-Out, Fan-In Pattern
When you need to execute multiple queries concurrently and combine their results:
```go
func GetDashboardData(ctx context.Context, pool *pgxpool.Pool, userID int) (DashboardData, error) {
var dashboard DashboardData
// Create channels for results and errors
userCh := make(chan User, 1)
ordersCh := make(chan []Order, 1)
statsCh := make(chan UserStats, 1)
errorsCh := make(chan error, 3)
// Fan out - execute queries concurrently
go func() {
user, err := getUser(ctx, pool, userID)
if err != nil {
errorsCh <- err
return
}
userCh <- user
}()
go func() {
orders, err := getRecentOrders(ctx, pool, userID)
if err != nil {
errorsCh <- err
return
}
ordersCh <- orders
}()
go func() {
stats, err := getUserStats(ctx, pool, userID)
if err != nil {
errorsCh <- err
return
}
statsCh <- stats
}()
// Fan in - collect results with timeout
timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
// Wait for all results or first error
remaining := 3
for remaining > 0 {
select {
case user := <-userCh:
dashboard.User = user
remaining--
case orders := <-ordersCh:
dashboard.RecentOrders = orders
remaining--
case stats := <-statsCh:
dashboard.Stats = stats
remaining--
case err := <-errorsCh:
return dashboard, err
case <-timeoutCtx.Done():
return dashboard, fmt.Errorf("dashboard data retrieval timed out")
}
}
return dashboard, nil
}
```
PostgreSQL Concurrency Features
Let’s leverage PostgreSQL’s concurrency features to handle high-load scenarios.
Row-Level Locking
When multiple operations need to update the same rows:
```go
// Pessimistic locking with FOR UPDATE
tx, err := pool.Begin(context.Background())
if err != nil {
return err
}
defer tx.Rollback(context.Background())
var quantity int
err = tx.QueryRow(context.Background(), `
SELECT quantity FROM inventory
WHERE product_id = $1
FOR UPDATE`,
productID,
).Scan(&quantity)
if err != nil {
return err
}
if quantity < requestedQuantity {
return errors.New("insufficient inventory")
}
_, err = tx.Exec(context.Background(), `
UPDATE inventory
SET quantity = quantity - $1
WHERE product_id = $2`,
requestedQuantity, productID,
)
if err != nil {
return err
}
return tx.Commit(context.Background())
```
Advisory Locks
For application-level locking patterns:
```go
func ProcessWithMutex(pool *pgxpool.Pool, resourceID int) error {
	// Advisory locks belong to a session, so pin one connection from the pool
	lockKey := int64(resourceID)
	conn, err := pool.Acquire(context.Background())
	if err != nil {
		return fmt.Errorf("error acquiring connection: %v", err)
	}
	defer conn.Release()

	// Attempt to get the lock (non-blocking)
	var locked bool
	err = conn.QueryRow(context.Background(),
		"SELECT pg_try_advisory_lock($1)", lockKey).Scan(&locked)
	if err != nil {
		return fmt.Errorf("error acquiring lock: %v", err)
	}
	if !locked {
		return fmt.Errorf("resource %d is locked by another process", resourceID)
	}

	// Release lock when done
	defer func() {
		_, _ = conn.Exec(context.Background(), "SELECT pg_advisory_unlock($1)", lockKey)
	}()

	// Perform work...
	return nil
}
// Extended version with pgx
func ProcessWithAdvisoryLock(ctx context.Context, pool *pgxpool.Pool, resourceID int) error {
// Create a lock ID (combine multiple values if needed)
lockID := int64(resourceID)
conn, err := pool.Acquire(ctx)
if err != nil {
return fmt.Errorf("failed to acquire connection: %v", err)
}
defer conn.Release()
// Try to get the advisory lock (will wait if already locked)
_, err = conn.Exec(ctx, "SELECT pg_advisory_lock($1)", lockID)
if err != nil {
return fmt.Errorf("failed to acquire advisory lock: %v", err)
}
// Make sure we release the lock even on panic
defer func() {
_, unlockErr := conn.Exec(ctx, "SELECT pg_advisory_unlock($1)", lockID)
if unlockErr != nil {
fmt.Printf("Error releasing advisory lock: %v\n", unlockErr)
}
}()
// Perform work with exclusive access...
return nil
}
```
Optimistic Concurrency Control
For scenarios with potential but infrequent conflicts:
```go
func UpdateWithOptimisticLock(ctx context.Context, pool *pgxpool.Pool, item Item) error {
// Retry logic for conflicts
maxRetries := 3
for attempt := 0; attempt < maxRetries; attempt++ {
// Start transaction
tx, err := pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
// Get current version
var currentVersion int
err = tx.QueryRow(ctx, `
SELECT version FROM items WHERE id = $1
`, item.ID).Scan(&currentVersion)
if err != nil {
return err
}
if currentVersion != item.Version {
return ErrVersionConflict
}
// Update with version check
result, err := tx.Exec(ctx, `
UPDATE items
SET name = $1, description = $2, version = version + 1
WHERE id = $3 AND version = $4
`, item.Name, item.Description, item.ID, item.Version)
if err != nil {
return err
}
// Check if update was successful
rowsAffected := result.RowsAffected()
if rowsAffected == 0 {
// Rollback and retry
tx.Rollback(ctx)
continue
}
// Commit transaction
if err := tx.Commit(ctx); err != nil {
return err
}
return nil
}
return ErrMaxRetriesExceeded
}
```
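ErrVersionConflict and ErrMaxRetriesExceeded above are sentinel errors the function is assumed to return; a minimal definition could be:

```go
// Sentinel errors assumed by UpdateWithOptimisticLock; names and messages are illustrative.
var (
	ErrVersionConflict    = errors.New("version conflict: item was modified concurrently")
	ErrMaxRetriesExceeded = errors.New("maximum retries exceeded while updating item")
)
```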
Handling Database Migrations
As your application evolves, you’ll need to update your database schema without downtime.
Using golang-migrate
```go
func RunMigrations(dataSourceName string) error {
m, err := migrate.New(
"file://db/migrations",
dataSourceName,
)
if err != nil {
return err
}
if err := m.Up(); err != nil && err != migrate.ErrNoChange {
return err
}
return nil
}
// Create migration files
// migrations/000001_create_users_table.up.sql
// migrations/000001_create_users_table.down.sql
```
Zero-Downtime Schema Changes
For high-concurrency applications, schema changes can be tricky. Follow these patterns:
- Add first, remove later: Add new columns/tables before modifying application code
- Use temporary duplicates: Maintain both old and new structures during transitions
- Background data migrations: Copy data between structures over time
- Feature flags: Control when code uses new schema elements (see the dual-write sketch after this list)
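To illustrate the "temporary duplicates" and "feature flags" points together, here is a sketch of dual-writing during a hypothetical rename of users.email to users.contact_email; the column names and the useNewColumn flag are assumptions for the example:

```go
// Sketch: write both the old and new column during the transition, and let a
// feature flag decide which column reads come from.
func SaveEmail(ctx context.Context, pool *pgxpool.Pool, userID int, email string) error {
	_, err := pool.Exec(ctx,
		"UPDATE users SET email = $1, contact_email = $1 WHERE id = $2",
		email, userID)
	return err
}

func LoadEmail(ctx context.Context, pool *pgxpool.Pool, userID int, useNewColumn bool) (string, error) {
	column := "email"
	if useNewColumn {
		column = "contact_email" // flip the flag only after the backfill has completed
	}
	var email string
	// column comes from the fixed set above, so concatenation is safe here
	err := pool.QueryRow(ctx, "SELECT "+column+" FROM users WHERE id = $1", userID).Scan(&email)
	return email, err
}
```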
Performance Monitoring and Optimization
Query Performance
Use Go’s context package with appropriate timeouts:
```go
func GetDataWithTimeout(userID int) (UserData, error) {
// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
var data UserData
err := pool.QueryRow(ctx, `
SELECT u.id, u.name, u.email, COUNT(o.id) as order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.id = $1
GROUP BY u.id
`, userID).Scan(&data.ID, &data.Name, &data.Email, &data.OrderCount)
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
// Record metrics for slow queries
metrics.RecordSlowQuery("get_user_data")
return data, ErrQueryTimeout
}
return data, err
}
return data, nil
}
```
Query Logging and Tracing
Monitor query performance with middleware:
```go
// Define a custom logger that implements the tracelog.Logger interface (pgx v5)
type QueryLogger struct {
	logger *log.Logger
}

func (l *QueryLogger) Log(ctx context.Context, level tracelog.LogLevel, msg string, data map[string]interface{}) {
	// Extract query information
	query, queryExists := data["sql"]
	args, argsExist := data["args"]
	// Record detailed metrics for slow queries
	if queryExists && argsExist {
		if startTime, ok := ctx.Value("query_start_time").(time.Time); ok {
			duration := time.Since(startTime)
			if duration > 100*time.Millisecond {
				l.logger.Printf("SLOW QUERY [%s]: %s %v", duration, query, args)
				// Record to metrics system
				metrics.RecordQueryDuration(query.(string), duration)
			}
		}
	}
	// Log normally
	l.logger.Printf("%s: %s %v", level.String(), msg, data)
}

// Usage: wire the logger in as a tracer on the connection config
config.ConnConfig.Tracer = &tracelog.TraceLog{
	Logger:   &QueryLogger{logger: log.New(os.Stdout, "", log.LstdFlags)},
	LogLevel: tracelog.LogLevelInfo,
}

// Middleware to add timing to the context before running a query
func withQueryTiming(ctx context.Context) context.Context {
	return context.WithValue(ctx, "query_start_time", time.Now())
}

// Usage
result, err := pool.Query(withQueryTiming(ctx), "SELECT * FROM users")
```
Scaling Strategies
Read Replicas
For read-heavy applications, distribute load across replicas:
```go
type DBCluster struct {
	Primary     *pgxpool.Pool
	Replicas    []*pgxpool.Pool
	nextReplica uint64 // advanced atomically so concurrent callers don't race
}
func NewDBCluster(primaryDSN string, replicaDSNs []string) (*DBCluster, error) {
// Connect to primary
primary, err := pgxpool.New(context.Background(), primaryDSN)
if err != nil {
return nil, err
}
// Connect to replicas
replicas := make([]*pgxpool.Pool, 0, len(replicaDSNs))
for _, dsn := range replicaDSNs {
replica, err := pgxpool.New(context.Background(), dsn)
if err != nil {
// Clean up already created pools
primary.Close()
for _, r := range replicas {
r.Close()
}
return nil, err
}
replicas = append(replicas, replica)
}
return &DBCluster{
Primary: primary,
Replicas: replicas,
}, nil
}
// Method to get a replica using round-robin (requires sync/atomic)
func (c *DBCluster) GetReplica() *pgxpool.Pool {
	if len(c.Replicas) == 0 {
		return c.Primary // Fallback to primary
	}
	// Simple round-robin, done atomically so it is safe for concurrent callers
	idx := atomic.AddUint64(&c.nextReplica, 1) % uint64(len(c.Replicas))
	return c.Replicas[idx]
}
// Example usage
func (c *DBCluster) ReadUser(ctx context.Context, id int) (User, error) {
var user User
// Use a replica for reading
err := c.GetReplica().QueryRow(ctx, `
SELECT id, name, email FROM users WHERE id = $1
`, id).Scan(&user.ID, &user.Name, &user.Email)
return user, err
}
func (c *DBCluster) CreateUser(ctx context.Context, user User) (int, error) {
// Always use primary for writes
var id int
err := c.Primary.QueryRow(ctx, `
INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id
`, user.Name, user.Email).Scan(&id)
return id, err
}
```
Sharding
For very large datasets, consider database sharding:
```go
type ShardedDB struct {
Shards []*pgxpool.Pool
ShardCount int
}
func NewShardedDB(shardDSNs []string) (*ShardedDB, error) {
shards := make([]*pgxpool.Pool, 0, len(shardDSNs))
for _, dsn := range shardDSNs {
shard, err := pgxpool.New(context.Background(), dsn)
if err != nil {
// Clean up already created pools
for _, s := range shards {
s.Close()
}
return nil, err
}
shards = append(shards, shard)
}
return &ShardedDB{
Shards: shards,
ShardCount: len(shardDSNs),
}, nil
}
// Get shard for a specific user
func (s *ShardedDB) GetUserShard(userID int) *pgxpool.Pool {
shardIndex := userID % s.ShardCount
return s.Shards[shardIndex]
}
// Example usage
func (s *ShardedDB) GetUserData(ctx context.Context, userID int) (UserData, error) {
shard := s.GetUserShard(userID)
var data UserData
err := shard.QueryRow(ctx, `
SELECT id, name, email FROM users WHERE id = $1
`, userID).Scan(&data.ID, &data.Name, &data.Email)
return data, err
}
```
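Integer IDs shard cleanly with a modulo as above; string keys are usually hashed first. A small sketch using hash/fnv (the method name is illustrative):

```go
// Sketch: route non-numeric keys (e.g. an email address) to a shard by hashing.
func (s *ShardedDB) GetShardForKey(key string) *pgxpool.Pool {
	h := fnv.New32a()
	h.Write([]byte(key)) // fnv's Write never returns an error
	return s.Shards[int(h.Sum32()%uint32(s.ShardCount))]
}
```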
Error Handling and Resilience
Circuit Breaker Pattern
Protect your database when errors start occurring:
```go
type CircuitBreaker struct {
pool *pgxpool.Pool
state string
failures int
threshold int
timeout time.Time
resetTimeout time.Duration
mutex sync.RWMutex
}
func NewCircuitBreaker(pool *pgxpool.Pool, threshold int, resetTimeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
pool: pool,
state: "closed",
failures: 0,
threshold: threshold,
resetTimeout: resetTimeout,
}
}
func (cb *CircuitBreaker) Execute(ctx context.Context, query string, args ...interface{}) (pgx.Rows, error) {
cb.mutex.RLock()
state := cb.state
cb.mutex.RUnlock()
if state == "open" {
if time.Now().After(cb.timeout) {
// Try to reset to half-open
cb.mutex.Lock()
cb.state = "half-open"
cb.mutex.Unlock()
} else {
return nil, ErrCircuitOpen
}
}
// Execute query
rows, err := cb.pool.Query(ctx, query, args...)
// Handle result
if err != nil {
cb.mutex.Lock()
defer cb.mutex.Unlock()
cb.failures++
if cb.failures >= cb.threshold {
cb.state = "open"
cb.timeout = time.Now().Add(cb.resetTimeout)
// Log circuit open event
log.Printf("Circuit breaker opened for database: %v failures", cb.failures)
}
return nil, err
}
// If query succeeded in half-open state, reset the circuit
if state == "half-open" {
cb.mutex.Lock()
cb.state = "closed"
cb.failures = 0
cb.mutex.Unlock()
// Log circuit closed event
log.Println("Circuit breaker reset to closed")
}
return rows, nil
}
```
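A hypothetical usage of the breaker; ErrCircuitOpen is the sentinel error callers check so they can fall back while the database recovers (serveFromCache is an assumed helper):

```go
// Sketch: wrap reads in the circuit breaker and degrade gracefully when it is open.
cb := NewCircuitBreaker(pool, 5, 30*time.Second)
rows, err := cb.Execute(ctx, "SELECT id, name FROM users WHERE active = $1", true)
if errors.Is(err, ErrCircuitOpen) {
	return serveFromCache() // serve stale or partial data instead of failing hard
}
if err != nil {
	return err
}
defer rows.Close()
```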
Graceful Degradation
Handle temporary database unavailability:
```go
func GetProductWithFallback(ctx context.Context, pool *pgxpool.Pool, productID int) (Product, error) {
var product Product
// Try from database first
err := pool.QueryRow(ctx, `
SELECT id, name, price, stock FROM products WHERE id = $1
`, productID).Scan(&product.ID, &product.Name, &product.Price, &product.Stock)
if err == nil {
// Success! Save to cache for future fallback
cacheProduct(product)
return product, nil
}
// Check if this is a connection issue
if isConnectionError(err) {
// Try to get from cache
cachedProduct, found := getProductFromCache(productID)
if found {
// Return cached data with flag indicating it might be stale
cachedProduct.FromCache = true
return cachedProduct, nil
}
}
// No cache available or different error
return product, err
}
```
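The snippet above assumes a few helpers (cacheProduct, getProductFromCache, isConnectionError). As one example, here is a sketch of isConnectionError that treats timeouts, network failures, and PostgreSQL class-08 errors as connection problems (uses the net, strings, and pgconn packages):

```go
// Sketch of the isConnectionError helper assumed above.
func isConnectionError(err error) bool {
	if errors.Is(err, context.DeadlineExceeded) {
		return true
	}
	var netErr net.Error
	if errors.As(err, &netErr) {
		return true
	}
	var pgErr *pgconn.PgError
	if errors.As(err, &pgErr) {
		// SQLSTATE class 08 covers connection exceptions
		return strings.HasPrefix(pgErr.Code, "08")
	}
	return false
}
```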
Conclusion
Building high-concurrency applications with PostgreSQL and Go requires careful attention to:
- Connection management: Use properly configured connection pools
- Query efficiency: Write optimized queries and use prepared statements
- Concurrency patterns: Leverage Go’s goroutines and channels
- Database concurrency features: Utilize PostgreSQL’s built-in concurrency support
- Resilience patterns: Implement circuit breakers and graceful degradation
By applying these techniques, you can create applications that deliver exceptional performance even under heavy load. The combination of PostgreSQL’s robust data management capabilities and Go’s efficient concurrency model provides a solid foundation for building scalable, high-performance systems.
Remember that performance tuning is an ongoing process – continuously monitor your application, identify bottlenecks, and refine your implementation to ensure optimal performance as your user base grows.
```go
// Final thought: A well-designed high-concurrency system balances between
// maximum throughput and system stability. Sometimes accepting slightly
// lower throughput (by limiting concurrency) leads to more predictable
// and reliable overall performance.
```