// Package db provides database connectivity and migrations. package db import ( "context" "database/sql" "embed" "fmt" "log/slog" "sort" "strings" "time" _ "github.com/lib/pq" ) //go:embed migrations/*.sql var migrationsFS embed.FS // Config holds database connection configuration. type Config struct { Host string Port int User string Password string Database string SSLMode string } // DefaultConfig returns config from environment defaults. func DefaultConfig() Config { return Config{ Host: "postgres.databases.svc", Port: 5432, User: "appuser", Password: "", Database: "rdev", SSLMode: "disable", } } // DSN returns the connection string. func (c Config) DSN() string { return fmt.Sprintf( "host=%s port=%d user=%s password=%s dbname=%s sslmode=%s", c.Host, c.Port, c.User, c.Password, c.Database, c.SSLMode, ) } // DB wraps the sql.DB with additional functionality. type DB struct { *sql.DB logger *slog.Logger } // New creates a new database connection and runs migrations. func New(cfg Config, logger *slog.Logger) (*DB, error) { db, err := sql.Open("postgres", cfg.DSN()) if err != nil { return nil, fmt.Errorf("open db: %w", err) } // Configure connection pool // MaxOpenConns: limit concurrent connections to avoid overloading database db.SetMaxOpenConns(25) // MaxIdleConns: maintain some connections for reuse db.SetMaxIdleConns(10) // ConnMaxLifetime: recycle connections to pick up config changes db.SetConnMaxLifetime(5 * time.Minute) // ConnMaxIdleTime: close idle connections to free resources db.SetConnMaxIdleTime(1 * time.Minute) // Verify connection ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := db.PingContext(ctx); err != nil { return nil, fmt.Errorf("ping db: %w", err) } wrapped := &DB{DB: db, logger: logger} // Run migrations if err := wrapped.migrate(); err != nil { return nil, fmt.Errorf("migrate: %w", err) } return wrapped, nil } // migrate runs all pending migrations. func (db *DB) migrate() error { // Create migrations table if not exists _, err := db.Exec(` CREATE TABLE IF NOT EXISTS schema_migrations ( version VARCHAR(255) PRIMARY KEY, applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ) `) if err != nil { return fmt.Errorf("create migrations table: %w", err) } // Get applied migrations applied := make(map[string]bool) rows, err := db.Query("SELECT version FROM schema_migrations") if err != nil { return fmt.Errorf("query migrations: %w", err) } defer func() { _ = rows.Close() }() for rows.Next() { var version string if err := rows.Scan(&version); err != nil { return fmt.Errorf("scan version: %w", err) } applied[version] = true } // Read migration files entries, err := migrationsFS.ReadDir("migrations") if err != nil { return fmt.Errorf("read migrations dir: %w", err) } // Sort by filename (version order) var files []string for _, e := range entries { if !e.IsDir() && strings.HasSuffix(e.Name(), ".sql") { files = append(files, e.Name()) } } sort.Strings(files) // Apply pending migrations for _, file := range files { version := strings.TrimSuffix(file, ".sql") if applied[version] { continue } content, err := migrationsFS.ReadFile("migrations/" + file) if err != nil { return fmt.Errorf("read migration %s: %w", file, err) } db.logger.Info("applying migration", "version", version) tx, err := db.Begin() if err != nil { return fmt.Errorf("begin tx: %w", err) } if _, err := tx.Exec(string(content)); err != nil { _ = tx.Rollback() return fmt.Errorf("exec migration %s: %w", version, err) } if _, err := tx.Exec("INSERT INTO schema_migrations (version) VALUES ($1)", version); err != nil { _ = tx.Rollback() return fmt.Errorf("record migration %s: %w", version, err) } if err := tx.Commit(); err != nil { return fmt.Errorf("commit migration %s: %w", version, err) } db.logger.Info("applied migration", "version", version) } return nil } // Close closes the database connection. func (db *DB) Close() error { return db.DB.Close() }