feat(backend): duplicate pairs, dismissals, and merge resolution

Adds the duplicate-detection backend on top of perceptual hashing:

- Two tables (edited into the original migrations): data.duplicate_pairs holds
  precomputed near-duplicate candidates (rebuilt wholesale by the rescan), and
  data.duplicate_dismissals is a global "not a duplicate" overlay that survives
  rescans. New audit actions file_merge / duplicate_dismiss.
- DuplicateService:
  - Rescan builds every pair within DUPLICATE_HASH_THRESHOLD via a BK-tree over
    the perceptual hashes and replaces the pairs table. This is the only thing
    that populates pairs, so GET never compares all-vs-all (scales to 110k+).
  - Clusters reads the precomputed pairs (ACL-filtered, non-trashed, non-
    dismissed), groups them into connected components via union-find, and
    paginates whole clusters.
  - Resolve merges a pair field-by-field: each scalar from keep or discard,
    metadata keep/discard/shallow-merge, tags/pools keep or union; then trashes
    the discarded file. Enforces edit ACL on both.
  - Dismiss records a canonical pair (view ACL on both).
- Endpoints under /files: GET /files/duplicates, POST /files/duplicates/dismiss,
  POST /files/duplicates/resolve (registered before /:id to avoid collision).
  Plain delete reuses /files/bulk/delete.
- Repo support: ListMissingPHash, ListAllPHashes, CopyPoolMemberships, plus the
  DuplicatePairRepo (ReplaceAll via COPY, ListVisible) and DismissalRepo.

Unit tests cover the BK-tree pairing, union-find clustering, metadata merge and
field validation; an integration test covers rescan -> list -> merge -> dismiss
(including that a dismissal survives a re-rescan).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-16 12:42:37 +03:00
parent 88849cc16b
commit 9216a8687f
15 changed files with 1214 additions and 4 deletions
@@ -0,0 +1,145 @@
package postgres
import (
"bytes"
"context"
"fmt"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"tanabata/backend/internal/domain"
"tanabata/backend/internal/port"
)
// ---------------------------------------------------------------------------
// DuplicatePairRepo
// ---------------------------------------------------------------------------
// DuplicatePairRepo implements port.DuplicatePairRepo using PostgreSQL.
type DuplicatePairRepo struct {
pool *pgxpool.Pool
}
// NewDuplicatePairRepo creates a DuplicatePairRepo backed by pool.
func NewDuplicatePairRepo(pool *pgxpool.Pool) *DuplicatePairRepo {
return &DuplicatePairRepo{pool: pool}
}
var _ port.DuplicatePairRepo = (*DuplicatePairRepo)(nil)
// ReplaceAll atomically replaces the entire pairs table with the given set.
// The rescan recomputes pairs from scratch, so a full DELETE + COPY is both
// correct and the simplest way to drop pairs that no longer qualify.
func (r *DuplicatePairRepo) ReplaceAll(ctx context.Context, pairs []domain.DuplicatePair) error {
tx, err := r.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("DuplicatePairRepo.ReplaceAll begin: %w", err)
}
defer tx.Rollback(ctx) //nolint:errcheck // no-op after a successful commit
if _, err := tx.Exec(ctx, `DELETE FROM data.duplicate_pairs`); err != nil {
return fmt.Errorf("DuplicatePairRepo.ReplaceAll delete: %w", err)
}
if len(pairs) > 0 {
rows := make([][]any, len(pairs))
for i, p := range pairs {
rows[i] = []any{p.FileA, p.FileB, int16(p.Distance)}
}
if _, err := tx.CopyFrom(ctx,
pgx.Identifier{"data", "duplicate_pairs"},
[]string{"file_a", "file_b", "distance"},
pgx.CopyFromRows(rows),
); err != nil {
return fmt.Errorf("DuplicatePairRepo.ReplaceAll copy: %w", err)
}
}
if err := tx.Commit(ctx); err != nil {
return fmt.Errorf("DuplicatePairRepo.ReplaceAll commit: %w", err)
}
return nil
}
type pairRow struct {
FileA uuid.UUID `db:"file_a"`
FileB uuid.UUID `db:"file_b"`
Distance int16 `db:"distance"`
}
// ListVisible returns every stored pair where both files are live (not trashed),
// the pair is not dismissed, and — for non-admins — both files are visible to the
// viewer under the private-by-default model. This is the input to clustering.
func (r *DuplicatePairRepo) ListVisible(ctx context.Context, viewerID int16, isAdmin bool) ([]domain.DuplicatePair, error) {
args := make([]any, 0, 4)
n := 1
aclWhere := ""
if !isAdmin {
var ca, cb string
ca, n, args = aclVisibilityCond("fa", objTypeFile, viewerID, n, args)
cb, n, args = aclVisibilityCond("fb", objTypeFile, viewerID, n, args)
aclWhere = "AND " + ca + " AND " + cb
}
sqlStr := fmt.Sprintf(`
SELECT p.file_a, p.file_b, p.distance
FROM data.duplicate_pairs p
JOIN data.files fa ON fa.id = p.file_a AND fa.is_deleted = false
JOIN data.files fb ON fb.id = p.file_b AND fb.is_deleted = false
WHERE NOT EXISTS (
SELECT 1 FROM data.duplicate_dismissals d
WHERE d.file_a = p.file_a AND d.file_b = p.file_b
)
%s
ORDER BY p.file_a, p.file_b`, aclWhere)
rows, err := r.pool.Query(ctx, sqlStr, args...)
if err != nil {
return nil, fmt.Errorf("DuplicatePairRepo.ListVisible: %w", err)
}
collected, err := pgx.CollectRows(rows, pgx.RowToStructByName[pairRow])
if err != nil {
return nil, fmt.Errorf("DuplicatePairRepo.ListVisible scan: %w", err)
}
out := make([]domain.DuplicatePair, len(collected))
for i, row := range collected {
out[i] = domain.DuplicatePair{FileA: row.FileA, FileB: row.FileB, Distance: int(row.Distance)}
}
return out, nil
}
// ---------------------------------------------------------------------------
// DismissalRepo
// ---------------------------------------------------------------------------
// DismissalRepo implements port.DismissalRepo using PostgreSQL.
type DismissalRepo struct {
pool *pgxpool.Pool
}
// NewDismissalRepo creates a DismissalRepo backed by pool.
func NewDismissalRepo(pool *pgxpool.Pool) *DismissalRepo {
return &DismissalRepo{pool: pool}
}
var _ port.DismissalRepo = (*DismissalRepo)(nil)
// Add records a pair as "not a duplicate". The two ids are stored in canonical
// (file_a < file_b) order to match the table's CHECK and avoid (a,b)/(b,a)
// duplicates; a repeated dismissal is a no-op.
func (r *DismissalRepo) Add(ctx context.Context, a, b uuid.UUID, userID int16) error {
if bytes.Compare(a[:], b[:]) > 0 {
a, b = b, a
}
const sqlStr = `
INSERT INTO data.duplicate_dismissals (file_a, file_b, dismissed_by)
VALUES ($1, $2, $3)
ON CONFLICT (file_a, file_b) DO NOTHING`
q := connOrTx(ctx, r.pool)
if _, err := q.Exec(ctx, sqlStr, a, b, userID); err != nil {
return fmt.Errorf("DismissalRepo.Add: %w", err)
}
return nil
}
+83
View File
@@ -446,6 +446,89 @@ func (r *FileRepo) SetPHash(ctx context.Context, id uuid.UUID, phash *int64) err
return nil
}
// ---------------------------------------------------------------------------
// Perceptual-hash / duplicate support
// ---------------------------------------------------------------------------
// ListMissingPHash returns live image/video files that have no perceptual hash
// yet — the work list for the dedup backfill. Tags are not loaded (the backfill
// only needs the id and MIME type to choose image vs video hashing).
func (r *FileRepo) ListMissingPHash(ctx context.Context) ([]domain.File, error) {
const sqlStr = `
SELECT f.id, f.original_name,
mt.name AS mime_type, mt.extension AS mime_extension,
f.content_datetime, f.notes, f.metadata, f.exif, f.phash,
f.creator_id, u.name AS creator_name,
f.is_public, f.is_deleted, f.needs_review
FROM data.files f
JOIN core.mime_types mt ON mt.id = f.mime_id
JOIN core.users u ON u.id = f.creator_id
WHERE f.phash IS NULL AND f.is_deleted = false
AND (mt.name LIKE 'image/%' OR mt.name LIKE 'video/%')
ORDER BY f.id`
q := connOrTx(ctx, r.pool)
rows, err := q.Query(ctx, sqlStr)
if err != nil {
return nil, fmt.Errorf("FileRepo.ListMissingPHash: %w", err)
}
collected, err := pgx.CollectRows(rows, pgx.RowToStructByName[fileRow])
if err != nil {
return nil, fmt.Errorf("FileRepo.ListMissingPHash scan: %w", err)
}
files := make([]domain.File, len(collected))
for i, row := range collected {
files[i] = toFile(row)
}
return files, nil
}
// phashRow is the minimal projection used to build duplicate clusters.
type phashRow struct {
ID uuid.UUID `db:"id"`
PHash int64 `db:"phash"`
}
// ListAllPHashes returns the id and perceptual hash of every live, hashed file.
// It is the global input to the dedup rescan, so it deliberately ignores ACL —
// the rescan builds the shared pairs table; visibility is enforced on read.
func (r *FileRepo) ListAllPHashes(ctx context.Context) ([]domain.PHashEntry, error) {
const sqlStr = `SELECT id, phash FROM data.files WHERE is_deleted = false AND phash IS NOT NULL`
q := connOrTx(ctx, r.pool)
rows, err := q.Query(ctx, sqlStr)
if err != nil {
return nil, fmt.Errorf("FileRepo.ListAllPHashes: %w", err)
}
collected, err := pgx.CollectRows(rows, pgx.RowToStructByName[phashRow])
if err != nil {
return nil, fmt.Errorf("FileRepo.ListAllPHashes scan: %w", err)
}
out := make([]domain.PHashEntry, len(collected))
for i, row := range collected {
out[i] = domain.PHashEntry{ID: row.ID, PHash: row.PHash}
}
return out, nil
}
// CopyPoolMemberships adds targetID to every pool sourceID belongs to (copying
// the source's position), skipping pools the target is already in. Used by the
// duplicate merge to preserve the discarded file's pool memberships on the
// survivor. The merge is authorised at the file level, so pool ACL is not
// re-checked here.
func (r *FileRepo) CopyPoolMemberships(ctx context.Context, targetID, sourceID uuid.UUID) error {
const sqlStr = `
INSERT INTO data.file_pool (file_id, pool_id, position)
SELECT $1, fp.pool_id, fp.position
FROM data.file_pool fp
WHERE fp.file_id = $2
ON CONFLICT (file_id, pool_id) DO NOTHING`
q := connOrTx(ctx, r.pool)
if _, err := q.Exec(ctx, sqlStr, targetID, sourceID); err != nil {
return fmt.Errorf("FileRepo.CopyPoolMemberships: %w", err)
}
return nil
}
// ---------------------------------------------------------------------------
// SoftDelete / Restore / DeletePermanent
// ---------------------------------------------------------------------------