feat(backend): implement db helpers and postgres pool/transactor
db/db.go: TxFromContext/ContextWithTx for transaction propagation, Querier interface (QueryRow/Query/Exec), ScanRow generic helper, ClampLimit/ClampOffset pagination guards. db/postgres/postgres.go: NewPool with ping validation, Transactor backed by pgxpool (BeginTx → fn → commit/rollback), connOrTx helper that returns the active transaction from context or falls back to pool. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
83fda85bea
commit
2c83073903
70
backend/internal/db/db.go
Normal file
70
backend/internal/db/db.go
Normal file
@ -0,0 +1,70 @@
|
|||||||
|
// Package db provides shared helpers used by all database adapters.
|
||||||
|
package db
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5"
|
||||||
|
"github.com/jackc/pgx/v5/pgconn"
|
||||||
|
)
|
||||||
|
|
||||||
|
// txKey is the context key used to store an active transaction.
|
||||||
|
type txKey struct{}
|
||||||
|
|
||||||
|
// TxFromContext returns the pgx.Tx stored in ctx by the Transactor, along
|
||||||
|
// with a boolean indicating whether a transaction is active.
|
||||||
|
func TxFromContext(ctx context.Context) (pgx.Tx, bool) {
|
||||||
|
tx, ok := ctx.Value(txKey{}).(pgx.Tx)
|
||||||
|
return tx, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
// ContextWithTx returns a copy of ctx that carries tx.
|
||||||
|
// Called by the Transactor before invoking the user function.
|
||||||
|
func ContextWithTx(ctx context.Context, tx pgx.Tx) context.Context {
|
||||||
|
return context.WithValue(ctx, txKey{}, tx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Querier is the common query interface satisfied by both *pgxpool.Pool and
|
||||||
|
// pgx.Tx, allowing repo helpers to work with either.
|
||||||
|
type Querier interface {
|
||||||
|
QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
|
||||||
|
Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
|
||||||
|
Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ScanRow executes a single-row query against q and scans the result using
|
||||||
|
// scan. It wraps pgx.ErrNoRows so callers can detect missing rows without
|
||||||
|
// importing pgx directly.
|
||||||
|
func ScanRow[T any](ctx context.Context, q Querier, sql string, args []any, scan func(pgx.Row) (T, error)) (T, error) {
|
||||||
|
row := q.QueryRow(ctx, sql, args...)
|
||||||
|
val, err := scan(row)
|
||||||
|
if err != nil {
|
||||||
|
var zero T
|
||||||
|
if err == pgx.ErrNoRows {
|
||||||
|
return zero, fmt.Errorf("%w", pgx.ErrNoRows)
|
||||||
|
}
|
||||||
|
return zero, fmt.Errorf("ScanRow: %w", err)
|
||||||
|
}
|
||||||
|
return val, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClampLimit enforces the [1, max] range on limit, returning def when limit
|
||||||
|
// is zero or negative.
|
||||||
|
func ClampLimit(limit, def, max int) int {
|
||||||
|
if limit <= 0 {
|
||||||
|
return def
|
||||||
|
}
|
||||||
|
if limit > max {
|
||||||
|
return max
|
||||||
|
}
|
||||||
|
return limit
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClampOffset returns 0 for negative offsets.
|
||||||
|
func ClampOffset(offset int) int {
|
||||||
|
if offset < 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return offset
|
||||||
|
}
|
||||||
67
backend/internal/db/postgres/postgres.go
Normal file
67
backend/internal/db/postgres/postgres.go
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
// Package postgres provides the PostgreSQL implementations of the port interfaces.
|
||||||
|
package postgres
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5"
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
|
||||||
|
"tanabata/backend/internal/db"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewPool creates and validates a *pgxpool.Pool from the given connection URL.
|
||||||
|
// The pool is ready to use; the caller is responsible for closing it.
|
||||||
|
func NewPool(ctx context.Context, url string) (*pgxpool.Pool, error) {
|
||||||
|
pool, err := pgxpool.New(ctx, url)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("pgxpool.New: %w", err)
|
||||||
|
}
|
||||||
|
if err := pool.Ping(ctx); err != nil {
|
||||||
|
pool.Close()
|
||||||
|
return nil, fmt.Errorf("postgres ping: %w", err)
|
||||||
|
}
|
||||||
|
return pool, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Transactor implements port.Transactor using a pgxpool.Pool.
|
||||||
|
type Transactor struct {
|
||||||
|
pool *pgxpool.Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewTransactor creates a Transactor backed by pool.
|
||||||
|
func NewTransactor(pool *pgxpool.Pool) *Transactor {
|
||||||
|
return &Transactor{pool: pool}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithTx begins a transaction, stores it in ctx, and calls fn. If fn returns
|
||||||
|
// an error the transaction is rolled back; otherwise it is committed.
|
||||||
|
func (t *Transactor) WithTx(ctx context.Context, fn func(ctx context.Context) error) error {
|
||||||
|
tx, err := t.pool.BeginTx(ctx, pgx.TxOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("begin tx: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
txCtx := db.ContextWithTx(ctx, tx)
|
||||||
|
|
||||||
|
if err := fn(txCtx); err != nil {
|
||||||
|
_ = tx.Rollback(ctx)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tx.Commit(ctx); err != nil {
|
||||||
|
return fmt.Errorf("commit tx: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// connOrTx returns the pgx.Tx stored in ctx by WithTx, or the pool itself when
|
||||||
|
// no transaction is active. The returned value satisfies db.Querier and can be
|
||||||
|
// used directly for queries and commands.
|
||||||
|
func connOrTx(ctx context.Context, pool *pgxpool.Pool) db.Querier {
|
||||||
|
if tx, ok := db.TxFromContext(ctx); ok {
|
||||||
|
return tx
|
||||||
|
}
|
||||||
|
return pool
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user