Building High-Concurrency Apps with PostgreSQL and Golang

In today’s world of microservices and distributed systems, building applications that can handle high concurrency efficiently is essential. PostgreSQL and Golang make for a powerful combination when it comes to developing such systems. This post explores strategies, patterns, and best practices for leveraging these technologies to create robust, high-performance applications.

Why PostgreSQL and Go?

Before diving into implementation details, let’s understand why this particular combination works so well:

PostgreSQL Strengths

  • ACID compliance: Transaction guarantees for data integrity
  • Advanced locking mechanisms: Row-level locks, advisory locks
  • Rich feature set: JSON support, advanced indexing options, full-text search
  • Excellent concurrency model: MVCC (Multi-Version Concurrency Control)

Go Strengths

  • Lightweight goroutines: Efficient concurrent execution
  • Channel-based communication: Safe data exchange between concurrent routines
  • Simple yet powerful concurrency primitives: Simplify asynchronous programming
  • Excellent performance: Fast compilation, efficient execution
  • Low memory footprint: Resource-efficient server applications

Together, these technologies provide a solid foundation for building systems that can efficiently handle thousands of concurrent operations.

Connection Pooling: The Foundation

The most critical component of a high-concurrency system is proper connection management. Each PostgreSQL connection is served by a dedicated server process, so there is a practical ceiling on how many concurrent connections the server can handle efficiently (typically 100-300, depending on hardware and workload).

Setting Up pgx with a Connection Pool

The pgx library is the de facto standard PostgreSQL driver for Go applications, offering better performance and more PostgreSQL-specific features than drivers used through the standard database/sql package.

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

func main() {
	// Connection pool configuration
	config, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
	if err != nil {
		fmt.Fprintf(os.Stderr, "Unable to parse DATABASE_URL: %v\n", err)
		os.Exit(1)
	}

	// Configure pool settings
	config.MaxConns = 25                           // Maximum connections in the pool
	config.MinConns = 5                            // Minimum idle connections
	config.MaxConnLifetime = time.Hour             // Maximum connection lifetime
	config.MaxConnIdleTime = 30 * time.Minute      // Maximum idle time
	config.HealthCheckPeriod = 1 * time.Minute     // How often to check connection health
	
	// Create the connection pool
	pool, err := pgxpool.NewWithConfig(context.Background(), config)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Unable to create connection pool: %v\n", err)
		os.Exit(1)
	}
	defer pool.Close()

	// Check if connection works
	if err := pool.Ping(context.Background()); err != nil {
		fmt.Fprintf(os.Stderr, "Unable to ping database: %v\n", err)
		os.Exit(1)
	}

	fmt.Println("Successfully connected to PostgreSQL")
}

Optimizing Connection Pool Parameters

For high-concurrency applications, tuning your connection pool is crucial:

  1. MaxConns: Keep this below PostgreSQL’s max_connections setting (ideally 50-70%), remembering that the limit is shared by every instance of your application (see the sizing sketch after this list)
  2. MinConns: Maintain enough warm connections to handle sudden traffic spikes
  3. MaxConnLifetime: Prevent resource leaks by recycling connections
  4. HealthCheckPeriod: Ensure connections remain valid
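
As a starting point, you can derive MaxConns from the server’s own limit at startup. Below is a minimal sketch; the sizePool helper and the 60% ratio are illustrative assumptions, so tune the ratio for your deployment:

// sizePool reads the server's max_connections setting and caps the pool
// at roughly 60% of it (assumes imports: context, strconv, pgx/v5, pgx/v5/pgxpool)
func sizePool(ctx context.Context, config *pgxpool.Config) error {
	// Open a single short-lived connection just to inspect the server setting
	conn, err := pgx.ConnectConfig(ctx, config.ConnConfig)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// SHOW returns the setting as text, so parse it ourselves
	var setting string
	if err := conn.QueryRow(ctx, "SHOW max_connections").Scan(&setting); err != nil {
		return err
	}
	maxConns, err := strconv.Atoi(setting)
	if err != nil {
		return err
	}
	config.MaxConns = int32(maxConns * 60 / 100)
	return nil
}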

Efficient Database Operations

Once we have our connection pool set up, let’s focus on patterns for efficient database operations.

Prepared Statements

Prepared statements improve performance by parsing SQL only once and reusing the execution plan:

// pgx v5 has no Prepare method on the pool itself: statements are prepared
// per connection. Acquire a connection, prepare on it, then query through it.
conn, err := pool.Acquire(context.Background())
if err != nil {
    log.Fatalf("Failed to acquire connection: %v", err)
}
defer conn.Release()

_, err = conn.Conn().Prepare(context.Background(), "get_user", `
    SELECT id, name, email 
    FROM users 
    WHERE id = $1
`)
if err != nil {
    log.Fatalf("Failed to prepare statement: %v", err)
}

// Execute the prepared statement by name on the same connection
var user User
err = conn.QueryRow(
    context.Background(), 
    "get_user", 
    userID,
).Scan(&user.ID, &user.Name, &user.Email)

Note that pgx v5’s default query execution mode already prepares and caches statements per connection behind the scenes, so explicit Prepare calls are only needed when you want named statements or precise control over preparation.

Batch Operations

For inserting or updating multiple records, use batch operations:

// Start a transaction
tx, err := pool.Begin(context.Background())
if err != nil {
    log.Fatalf("Failed to start transaction: %v", err)
}
defer tx.Rollback(context.Background())

// Create a batch
batch := &pgx.Batch{}

// Queue multiple inserts
for _, user := range users {
    batch.Queue(
        "INSERT INTO users (name, email) VALUES ($1, $2)",
        user.Name,
        user.Email,
    )
}

// Send the batch
results := tx.SendBatch(context.Background(), batch)

// Process results
for i := 0; i < batch.Len(); i++ {
    if _, err := results.Exec(); err != nil {
        log.Printf("Error executing batch command %d: %v", i, err)
    }
}

// The batch results must be closed before the transaction can be used
// again, so close them explicitly instead of deferring past the Commit
if err := results.Close(); err != nil {
    log.Fatalf("Failed to close batch results: %v", err)
}

// Commit transaction
if err := tx.Commit(context.Background()); err != nil {
    log.Fatalf("Failed to commit transaction: %v", err)
}
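
For very large bulk loads, pgx also exposes PostgreSQL’s COPY protocol, which is considerably faster than batched INSERTs. A minimal sketch:

// CopyFrom streams all rows over the COPY protocol in a single operation
rows := make([][]any, 0, len(users))
for _, u := range users {
    rows = append(rows, []any{u.Name, u.Email})
}

copied, err := pool.CopyFrom(
    context.Background(),
    pgx.Identifier{"users"},
    []string{"name", "email"},
    pgx.CopyFromRows(rows),
)
if err != nil {
    log.Fatalf("COPY failed: %v", err)
}
log.Printf("Copied %d rows", copied)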

Efficient Query Design

Optimize your queries for high concurrency:

// Instead of separate queries
// BAD: Multiple round-trips
userID := getUser(email)
orders := getUserOrders(userID)
details := getOrderDetails(orders)

// GOOD: Single round-trip with JOINs
rows, err := pool.Query(context.Background(), `
    SELECT u.id, u.name, o.id, o.total, d.product_id, d.quantity
    FROM users u
    JOIN orders o ON u.id = o.user_id
    JOIN order_details d ON o.id = d.order_id
    WHERE u.email = $1
`, email)
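
Consuming the joined result is then a single scan loop. A sketch; grouping rows into domain structs is left to the caller:

// Scan each joined row into flat variables
defer rows.Close()
for rows.Next() {
    var (
        userID, orderID, productID, quantity int
        name                                 string
        total                                float64
    )
    if err := rows.Scan(&userID, &name, &orderID, &total, &productID, &quantity); err != nil {
        return err
    }
    // Group rows into your domain structures here...
}
// Always check rows.Err() after the loop; it surfaces errors that
// occurred during iteration
if err := rows.Err(); err != nil {
    return err
}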

Handling Concurrency with Go Routines

Go’s goroutines make it easy to implement patterns for concurrent database operations.

Worker Pool Pattern

For processing large numbers of database operations:

func ProcessItems(pool *pgxpool.Pool, items []Item) error {
    // Create a job channel
    jobs := make(chan Item, len(items))
    results := make(chan error, len(items))
    
    // Determine worker count - balance between concurrency and connection pool size
    workerCount := 10
    if len(items) < workerCount {
        workerCount = len(items)
    }
    
    // Start workers
    for w := 1; w <= workerCount; w++ {
        go worker(w, pool, jobs, results)
    }
    
    // Send jobs
    for _, item := range items {
        jobs <- item
    }
    close(jobs)
    
    // Collect results
    var errors []error
    for i := 0; i < len(items); i++ {
        if err := <-results; err != nil {
            errors = append(errors, err)
        }
    }
    
    if len(errors) > 0 {
        return fmt.Errorf("encountered %d errors during processing", len(errors))
    }
    return nil
}

func worker(id int, pool *pgxpool.Pool, jobs <-chan Item, results chan<- error) {
    for item := range jobs {
        // Process item with database
        err := processItem(pool, item)
        results <- err
    }
}

func processItem(pool *pgxpool.Pool, item Item) error {
    _, err := pool.Exec(context.Background(),
        "UPDATE items SET processed = true WHERE id = $1",
        item.ID)
    return err
}
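
If you prefer not to wire channels by hand, golang.org/x/sync/errgroup gives the same bounded-concurrency behavior in a few lines. A sketch of the equivalent, assuming the same Item type:

func ProcessItemsGroup(ctx context.Context, pool *pgxpool.Pool, items []Item) error {
    g, ctx := errgroup.WithContext(ctx)
    // Cap in-flight goroutines so we never outrun the connection pool
    g.SetLimit(10)

    for _, item := range items {
        item := item // capture the loop variable (needed before Go 1.22)
        g.Go(func() error {
            _, err := pool.Exec(ctx,
                "UPDATE items SET processed = true WHERE id = $1",
                item.ID)
            return err
        })
    }
    // Wait returns the first error, after all goroutines finish
    return g.Wait()
}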

Fan-Out, Fan-In Pattern

When you need to execute multiple queries concurrently and combine their results:

func GetDashboardData(ctx context.Context, pool *pgxpool.Pool, userID int) (DashboardData, error) {
    var dashboard DashboardData
    
    // Create channels for results and errors
    userCh := make(chan User, 1)
    ordersCh := make(chan []Order, 1)
    statsCh := make(chan UserStats, 1)
    errorsCh := make(chan error, 3)
    
    // Fan out - execute queries concurrently
    go func() {
        user, err := getUser(ctx, pool, userID)
        if err != nil {
            errorsCh <- err
            return
        }
        userCh <- user
    }()
    
    go func() {
        orders, err := getRecentOrders(ctx, pool, userID)
        if err != nil {
            errorsCh <- err
            return
        }
        ordersCh <- orders
    }()
    
    go func() {
        stats, err := getUserStats(ctx, pool, userID)
        if err != nil {
            errorsCh <- err
            return
        }
        statsCh <- stats
    }()
    
    // Fan in - collect results with timeout
    timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
    defer cancel()
    
    // Wait for all results or first error
    remaining := 3
    for remaining > 0 {
        select {
        case user := <-userCh:
            dashboard.User = user
            remaining--
        case orders := <-ordersCh:
            dashboard.RecentOrders = orders
            remaining--
        case stats := <-statsCh:
            dashboard.Stats = stats
            remaining--
        case err := <-errorsCh:
            return dashboard, err
        case <-timeoutCtx.Done():
            return dashboard, fmt.Errorf("dashboard data retrieval timed out")
        }
    }
    
    return dashboard, nil
}

PostgreSQL Concurrency Features

Let’s leverage PostgreSQL’s concurrency features to handle high-load scenarios.

Row-Level Locking

When multiple operations need to update the same rows:

// Pessimistic locking with FOR UPDATE
tx, err := pool.Begin(context.Background())
if err != nil {
    return err
}
defer tx.Rollback(context.Background())

var quantity int
err = tx.QueryRow(context.Background(), `
    SELECT quantity FROM inventory 
    WHERE product_id = $1 
    FOR UPDATE`,
    productID,
).Scan(&quantity)

if err != nil {
    return err
}

if quantity < requestedQuantity {
    return errors.New("insufficient inventory")
}

_, err = tx.Exec(context.Background(), `
    UPDATE inventory 
    SET quantity = quantity - $1 
    WHERE product_id = $2`,
    requestedQuantity, productID,
)
if err != nil {
    return err
}

return tx.Commit(context.Background())
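
A related feature for high-concurrency work distribution is FOR UPDATE SKIP LOCKED, which lets many workers pull from the same table as a queue without blocking each other. A minimal sketch, assuming a jobs table with id and status columns:

// Claim one pending job inside a transaction; rows already locked by
// other workers are skipped instead of waited on
var jobID int
err = tx.QueryRow(context.Background(), `
    SELECT id FROM jobs
    WHERE status = 'pending'
    ORDER BY id
    FOR UPDATE SKIP LOCKED
    LIMIT 1`,
).Scan(&jobID)
if errors.Is(err, pgx.ErrNoRows) {
    return nil // nothing to claim right now
}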

Advisory Locks

For application-level locking patterns:

func ProcessWithMutex(ctx context.Context, pool *pgxpool.Pool, resourceID int) error {
    // Advisory locks are session-scoped, so hold one dedicated connection
    // for as long as we hold the lock
    lockKey := int64(resourceID)
    
    conn, err := pool.Acquire(ctx)
    if err != nil {
        return fmt.Errorf("error acquiring connection: %w", err)
    }
    defer conn.Release()
    
    // Attempt to get the lock (non-blocking)
    var locked bool
    err = conn.QueryRow(ctx, "SELECT pg_try_advisory_lock($1)", lockKey).Scan(&locked)
    if err != nil {
        return fmt.Errorf("error acquiring lock: %w", err)
    }
    
    if !locked {
        return fmt.Errorf("resource %d is locked by another process", resourceID)
    }
    
    // Release lock when done
    defer conn.Exec(ctx, "SELECT pg_advisory_unlock($1)", lockKey)
    
    // Perform work...
    return nil
}

// Extended version with pgx
func ProcessWithAdvisoryLock(ctx context.Context, pool *pgxpool.Pool, resourceID int) error {
    // Create a lock ID (combine multiple values if needed)
    lockID := int64(resourceID)
    
    conn, err := pool.Acquire(ctx)
    if err != nil {
        return fmt.Errorf("failed to acquire connection: %v", err)
    }
    defer conn.Release()
    
    // Try to get the advisory lock (will wait if already locked)
    _, err = conn.Exec(ctx, "SELECT pg_advisory_lock($1)", lockID)
    if err != nil {
        return fmt.Errorf("failed to acquire advisory lock: %v", err)
    }
    
    // Make sure we release the lock even on panic
    defer func() {
        _, unlockErr := conn.Exec(ctx, "SELECT pg_advisory_unlock($1)", lockID)
        if unlockErr != nil {
            fmt.Printf("Error releasing advisory lock: %v\n", unlockErr)
        }
    }()
    
    // Perform work with exclusive access...
    
    return nil
}
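
Advisory lock keys are 64-bit integers, so string-identified resources need to be hashed first. One common approach (an illustrative sketch, not part of pgx) is an FNV hash:

// lockKeyFor maps an arbitrary resource name onto an int64 advisory-lock key.
// Different strings can collide, which is usually acceptable for coarse locks.
// Assumes import "hash/fnv".
func lockKeyFor(resource string) int64 {
    h := fnv.New64a()
    h.Write([]byte(resource))
    return int64(h.Sum64())
}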

Optimistic Concurrency Control

For scenarios with potential but infrequent conflicts:

func UpdateWithOptimisticLock(ctx context.Context, pool *pgxpool.Pool, item Item) error {
    // Retry logic for conflicts
    maxRetries := 3
    for attempt := 0; attempt < maxRetries; attempt++ {
        // Start transaction. Note: no deferred Rollback here; defers inside
        // a loop pile up until the function returns, so we roll back
        // explicitly on every exit path instead.
        tx, err := pool.Begin(ctx)
        if err != nil {
            return err
        }
        
        // Get current version
        var currentVersion int
        err = tx.QueryRow(ctx, `
            SELECT version FROM items WHERE id = $1
        `, item.ID).Scan(&currentVersion)
        if err != nil {
            tx.Rollback(ctx)
            return err
        }
        
        if currentVersion != item.Version {
            tx.Rollback(ctx)
            return ErrVersionConflict
        }
        
        // Update with version check; the WHERE clause is the real optimistic
        // guard against writes that land between our SELECT and UPDATE
        result, err := tx.Exec(ctx, `
            UPDATE items
            SET name = $1, description = $2, version = version + 1
            WHERE id = $3 AND version = $4
        `, item.Name, item.Description, item.ID, item.Version)
        if err != nil {
            tx.Rollback(ctx)
            return err
        }
        
        // Check if the update won the race
        if result.RowsAffected() == 0 {
            // A concurrent writer got in first; roll back and retry
            tx.Rollback(ctx)
            continue
        }
        
        // Commit transaction
        if err := tx.Commit(ctx); err != nil {
            return err
        }
        
        return nil
    }
    
    return ErrMaxRetriesExceeded
}

Handling Database Migrations

As your application evolves, you’ll need to update your database schema without downtime.

Using golang-migrate

func RunMigrations(dataSourceName string) error {
    m, err := migrate.New(
        "file://db/migrations",
        dataSourceName,
    )
    if err != nil {
        return err
    }
    
    if err := m.Up(); err != nil && err != migrate.ErrNoChange {
        return err
    }
    
    return nil
}

// Create migration files
// migrations/000001_create_users_table.up.sql
// migrations/000001_create_users_table.down.sql

Zero-Downtime Schema Changes

For high-concurrency applications, schema changes can be tricky. Follow these patterns, illustrated in the sketch after the list:

  1. Add first, remove later: Add new columns/tables before modifying application code
  2. Use temporary duplicates: Maintain both old and new structures during transitions
  3. Background data migrations: Copy data between structures over time
  4. Feature flags: Control when code uses new schema elements
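
As an illustration of the expand/contract approach, here is a hedged sketch of replacing a column without downtime; the table and column names are hypothetical, and each step would normally live in its own migration:

// Phase 1 (expand): add the new column alongside the old one.
// ADD COLUMN is a fast, metadata-only change.
_, err = pool.Exec(ctx, `ALTER TABLE users ADD COLUMN IF NOT EXISTS full_name text`)

// Phase 2 (backfill): copy data over in small batches to avoid long-held locks
_, err = pool.Exec(ctx, `
    UPDATE users SET full_name = name
    WHERE id IN (
        SELECT id FROM users WHERE full_name IS NULL LIMIT 1000
    )`)

// Phase 3 (contract): once all application code reads full_name,
// a later migration drops the old column
_, err = pool.Exec(ctx, `ALTER TABLE users DROP COLUMN name`)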

Performance Monitoring and Optimization

Query Performance

Use Go’s context package with appropriate timeouts:

func GetDataWithTimeout(pool *pgxpool.Pool, userID int) (UserData, error) {
    // Create context with timeout
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()
    
    var data UserData
    err := pool.QueryRow(ctx, `
        SELECT u.id, u.name, u.email, COUNT(o.id) as order_count
        FROM users u
        LEFT JOIN orders o ON u.id = o.user_id
        WHERE u.id = $1
        GROUP BY u.id
    `, userID).Scan(&data.ID, &data.Name, &data.Email, &data.OrderCount)
    
    if err != nil {
        if ctx.Err() == context.DeadlineExceeded {
            // Record metrics for slow queries
            metrics.RecordSlowQuery("get_user_data")
            return data, ErrQueryTimeout
        }
        return data, err
    }
    
    return data, nil
}
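
Client-side timeouts protect the caller, but it is also worth capping queries on the server itself. A sketch using pgx’s runtime parameters, applied to every pooled connection (the 2-second value is illustrative):

// Server-side ceiling: PostgreSQL aborts any statement on these
// connections that runs longer than this
config.ConnConfig.RuntimeParams["statement_timeout"] = "2000" // milliseconds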

Query Logging and Tracing

Monitor query performance with pgx’s tracer hooks. pgx v5 replaced the v4 Logger interface with the QueryTracer interface, whose Start hook returns a context that is handed to the End hook:

// A tracer that times every query and flags slow ones
type QueryTracer struct {
    logger *log.Logger
}

// Unexported key type avoids context-key collisions with other packages
type queryTraceKey struct{}

type queryTraceData struct {
    sql   string
    start time.Time
}

func (t *QueryTracer) TraceQueryStart(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryStartData) context.Context {
    // Stash the SQL and start time; pgx passes this context to TraceQueryEnd
    return context.WithValue(ctx, queryTraceKey{}, queryTraceData{sql: data.SQL, start: time.Now()})
}

func (t *QueryTracer) TraceQueryEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryEndData) {
    td, ok := ctx.Value(queryTraceKey{}).(queryTraceData)
    if !ok {
        return
    }
    
    // Record detailed metrics for slow queries
    duration := time.Since(td.start)
    if duration > 100*time.Millisecond {
        t.logger.Printf("SLOW QUERY [%s]: %s", duration, td.sql)
        // Record to your metrics system here
    }
    
    if data.Err != nil {
        t.logger.Printf("QUERY ERROR: %v (%s)", data.Err, td.sql)
    }
}

// Usage
config.ConnConfig.Tracer = &QueryTracer{logger: log.New(os.Stdout, "", log.LstdFlags)}

Scaling Strategies

Read Replicas

For read-heavy applications, distribute load across replicas:

type DBCluster struct {
    Primary     *pgxpool.Pool
    Replicas    []*pgxpool.Pool
    nextReplica uint64 // advanced atomically; a plain int would race
}

func NewDBCluster(primaryDSN string, replicaDSNs []string) (*DBCluster, error) {
    // Connect to primary
    primary, err := pgxpool.New(context.Background(), primaryDSN)
    if err != nil {
        return nil, err
    }
    
    // Connect to replicas
    replicas := make([]*pgxpool.Pool, 0, len(replicaDSNs))
    for _, dsn := range replicaDSNs {
        replica, err := pgxpool.New(context.Background(), dsn)
        if err != nil {
            // Clean up already created pools
            primary.Close()
            for _, r := range replicas {
                r.Close()
            }
            return nil, err
        }
        replicas = append(replicas, replica)
    }
    
    return &DBCluster{
        Primary:  primary,
        Replicas: replicas,
    }, nil
}

// Method to get a replica using round-robin
func (c *DBCluster) GetReplica() *pgxpool.Pool {
    if len(c.Replicas) == 0 {
        return c.Primary // Fallback to primary
    }
    
    // Advance the counter with sync/atomic so concurrent callers don't race
    n := atomic.AddUint64(&c.nextReplica, 1)
    return c.Replicas[n%uint64(len(c.Replicas))]
}

// Example usage
func (c *DBCluster) ReadUser(ctx context.Context, id int) (User, error) {
    var user User
    // Use a replica for reading
    err := c.GetReplica().QueryRow(ctx, `
        SELECT id, name, email FROM users WHERE id = $1
    `, id).Scan(&user.ID, &user.Name, &user.Email)
    return user, err
}

func (c *DBCluster) CreateUser(ctx context.Context, user User) (int, error) {
    // Always use primary for writes
    var id int
    err := c.Primary.QueryRow(ctx, `
        INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id
    `, user.Name, user.Email).Scan(&id)
    return id, err
}
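
One caveat: replicas apply changes asynchronously by default, so a read issued immediately after a write may not see it. Route read-your-own-writes queries to the primary, or use synchronous replication where staleness is unacceptable.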

Sharding

For very large datasets, consider database sharding:

type ShardedDB struct {
    Shards     []*pgxpool.Pool
    ShardCount int
}

func NewShardedDB(shardDSNs []string) (*ShardedDB, error) {
    shards := make([]*pgxpool.Pool, 0, len(shardDSNs))
    
    for _, dsn := range shardDSNs {
        shard, err := pgxpool.New(context.Background(), dsn)
        if err != nil {
            // Clean up already created pools
            for _, s := range shards {
                s.Close()
            }
            return nil, err
        }
        shards = append(shards, shard)
    }
    
    return &ShardedDB{
        Shards: shards,
        ShardCount: len(shardDSNs),
    }, nil
}

// Get shard for a specific user
func (s *ShardedDB) GetUserShard(userID int) *pgxpool.Pool {
    shardIndex := userID % s.ShardCount
    return s.Shards[shardIndex]
}

// Example usage
func (s *ShardedDB) GetUserData(ctx context.Context, userID int) (UserData, error) {
    shard := s.GetUserShard(userID)
    
    var data UserData
    err := shard.QueryRow(ctx, `
        SELECT id, name, email FROM users WHERE id = $1
    `, userID).Scan(&data.ID, &data.Name, &data.Email)
    
    return data, err
}
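
Note that simple modulo sharding makes it painful to change the shard count later, since almost every key remaps to a different shard; if you expect to reshard, plan for consistent hashing or a lookup-table scheme from the start.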

Error Handling and Resilience

Circuit Breaker Pattern

Protect your database when errors start occurring:

type CircuitBreaker struct {
    pool          *pgxpool.Pool
    state         string
    failures      int
    threshold     int
    timeout       time.Time
    resetTimeout  time.Duration
    mutex         sync.RWMutex
}

func NewCircuitBreaker(pool *pgxpool.Pool, threshold int, resetTimeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        pool:         pool,
        state:        "closed",
        failures:     0,
        threshold:    threshold,
        resetTimeout: resetTimeout,
    }
}

func (cb *CircuitBreaker) Execute(ctx context.Context, query string, args ...interface{}) (pgx.Rows, error) {
    cb.mutex.RLock()
    state := cb.state
    cb.mutex.RUnlock()
    
    if state == "open" {
        if time.Now().After(cb.timeout) {
            // Try to reset to half-open
            cb.mutex.Lock()
            cb.state = "half-open"
            cb.mutex.Unlock()
        } else {
            return nil, ErrCircuitOpen
        }
    }
    
    // Execute query
    rows, err := cb.pool.Query(ctx, query, args...)
    
    // Handle result
    if err != nil {
        cb.mutex.Lock()
        defer cb.mutex.Unlock()
        
        cb.failures++
        if cb.failures >= cb.threshold {
            cb.state = "open"
            cb.timeout = time.Now().Add(cb.resetTimeout)
            // Log circuit open event
            log.Printf("Circuit breaker opened for database: %v failures", cb.failures)
        }
        return nil, err
    }
    
    // Any success clears the failure count; in the half-open state it also
    // closes the circuit again
    cb.mutex.Lock()
    cb.failures = 0
    if state == "half-open" {
        cb.state = "closed"
        // Log circuit closed event
        log.Println("Circuit breaker reset to closed")
    }
    cb.mutex.Unlock()
    
    return rows, nil
}
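
Wiring the breaker up is a one-liner at startup. In this sketch, ErrCircuitOpen is the sentinel error referenced in Execute above, and serveFromCache is a hypothetical fallback:

var ErrCircuitOpen = errors.New("database circuit breaker is open")

// Trip after 5 consecutive failures; probe again after 30 seconds
cb := NewCircuitBreaker(pool, 5, 30*time.Second)

rows, err := cb.Execute(ctx, "SELECT id, name FROM users WHERE active = $1", true)
if errors.Is(err, ErrCircuitOpen) {
    // Serve cached data or a degraded response instead of hammering the database
    return serveFromCache()
}
if err != nil {
    return err
}
defer rows.Close()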

Graceful Degradation

Handle temporary database unavailability:

func GetProductWithFallback(ctx context.Context, pool *pgxpool.Pool, productID int) (Product, error) {
    var product Product
    
    // Try from database first
    err := pool.QueryRow(ctx, `
        SELECT id, name, price, stock FROM products WHERE id = $1
    `, productID).Scan(&product.ID, &product.Name, &product.Price, &product.Stock)
    
    if err == nil {
        // Success! Save to cache for future fallback
        cacheProduct(product)
        return product, nil
    }
    
    // Check if this is a connection issue
    if isConnectionError(err) {
        // Try to get from cache
        cachedProduct, found := getProductFromCache(productID)
        if found {
            // Return cached data with flag indicating it might be stale
            cachedProduct.FromCache = true
            return cachedProduct, nil
        }
    }
    
    // No cache available or different error
    return product, err
}
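
The isConnectionError helper above is assumed; a minimal sketch treats context deadlines and network-level failures as connection problems (assumes imports: context, errors, net):

func isConnectionError(err error) bool {
    if errors.Is(err, context.DeadlineExceeded) {
        return true
    }
    // net.Error covers dial, read, and write failures
    var netErr net.Error
    return errors.As(err, &netErr)
}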

Conclusion

Building high-concurrency applications with PostgreSQL and Go requires careful attention to:

  1. Connection management: Use properly configured connection pools
  2. Query efficiency: Write optimized queries and use prepared statements
  3. Concurrency patterns: Leverage Go’s goroutines and channels
  4. Database concurrency features: Utilize PostgreSQL’s built-in concurrency support
  5. Resilience patterns: Implement circuit breakers and graceful degradation

By applying these techniques, you can create applications that deliver exceptional performance even under heavy load. The combination of PostgreSQL’s robust data management capabilities and Go’s efficient concurrency model provides a solid foundation for building scalable, high-performance systems.

Remember that performance tuning is an ongoing process – continuously monitor your application, identify bottlenecks, and refine your implementation to ensure optimal performance as your user base grows.

// Final thought: A well-designed high-concurrency system balances between
// maximum throughput and system stability. Sometimes accepting slightly 
// lower throughput (by limiting concurrency) leads to more predictable
// and reliable overall performance.
