From 129cc597933dc96e21449542cf6ad8a6a5b90215 Mon Sep 17 00:00:00 2001 From: Masahiko AMANO Date: Thu, 11 Jun 2026 21:15:40 +0300 Subject: [PATCH] feat(backend): stream folder-import progress as NDJSON The import endpoint did all the work in one request and returned only an aggregate summary, so the UI couldn't show progress or per-file status. Refactor FileService.Import to take an optional progress callback and emit a "start" event (with the total entry count), one "file" event per entry as it finishes (index, filename, status, optional reason), and a final "done" event with the tallies. The handler streams these as newline-delimited JSON and flushes after each, deferring the response headers until the first event so a validation error raised before any file is touched is still returned as a normal JSON error. Co-Authored-By: Claude Opus 4.8 --- backend/internal/handler/file_handler.go | 33 ++++++-- backend/internal/integration/server_test.go | 64 +++++++++++--- backend/internal/service/file_service.go | 92 ++++++++++++++------- 3 files changed, 145 insertions(+), 44 deletions(-) diff --git a/backend/internal/handler/file_handler.go b/backend/internal/handler/file_handler.go index f092e2c..cda0979 100644 --- a/backend/internal/handler/file_handler.go +++ b/backend/internal/handler/file_handler.go @@ -684,13 +684,36 @@ func (h *FileHandler) Import(c *gin.Context) { // Body is optional; ignore bind errors. _ = c.ShouldBindJSON(&body) - result, err := h.fileSvc.Import(c.Request.Context(), body.Path) - if err != nil { - respondError(c, err) - return + // Stream progress as newline-delimited JSON so the client can render a live + // progress bar and per-file status. Headers are deferred until the first + // event, so a validation error (bad path, import disabled) raised before any + // file is touched can still be returned as a normal JSON error response. + flusher, canFlush := c.Writer.(http.Flusher) + started := false + enc := json.NewEncoder(c.Writer) + + emit := func(ev service.ImportEvent) { + if !started { + c.Header("Content-Type", "application/x-ndjson") + c.Header("Cache-Control", "no-cache") + c.Header("X-Accel-Buffering", "no") // don't let a proxy buffer the stream + c.Writer.WriteHeader(http.StatusOK) + started = true + } + _ = enc.Encode(ev) // appends a newline + if canFlush { + flusher.Flush() + } } - respondJSON(c, http.StatusOK, result) + if _, err := h.fileSvc.Import(c.Request.Context(), body.Path, emit); err != nil { + if !started { + respondError(c, err) + return + } + // Headers already sent; surface the failure as a terminal stream event. + emit(service.ImportEvent{Type: "error", Reason: err.Error()}) + } } // --------------------------------------------------------------------------- diff --git a/backend/internal/integration/server_test.go b/backend/internal/integration/server_test.go index ba35c17..77a3812 100644 --- a/backend/internal/integration/server_test.go +++ b/backend/internal/integration/server_test.go @@ -902,6 +902,35 @@ func TestNonOwnerAccessControl(t *testing.T) { assert.Equal(t, http.StatusOK, resp.StatusCode, resp.String()) } +// importEvent mirrors service.ImportEvent for decoding the streamed progress. +type importEvent struct { + Type string `json:"type"` + Total int `json:"total"` + Index int `json:"index"` + Filename string `json:"filename"` + Status string `json:"status"` + Reason string `json:"reason"` + Imported int `json:"imported"` + Skipped int `json:"skipped"` + Errors int `json:"errors"` +} + +// parseImportEvents splits an NDJSON import response into its events. +func parseImportEvents(t *testing.T, resp *testResponse) []importEvent { + t.Helper() + var events []importEvent + for _, line := range bytes.Split(resp.bodyBytes, []byte("\n")) { + line = bytes.TrimSpace(line) + if len(line) == 0 { + continue + } + var ev importEvent + require.NoError(t, json.Unmarshal(line, &ev), "event line: %s", line) + events = append(events, ev) + } + return events +} + // TestImportFromFolder verifies the admin server-side import: supported files // are ingested, subdirectories are skipped, the source is removed from the // import folder afterwards, and a file without EXIF takes the source's mtime as @@ -923,18 +952,31 @@ func TestImportFromFolder(t *testing.T) { resp := h.doJSON("POST", "/files/import", map[string]any{}, adminToken) require.Equal(t, http.StatusOK, resp.StatusCode, resp.String()) - var res struct { - Imported int `json:"imported"` - Skipped int `json:"skipped"` - Errors []struct { - Filename string `json:"filename"` - Reason string `json:"reason"` - } `json:"errors"` + + // The import streams newline-delimited JSON progress events. + events := parseImportEvents(t, resp) + var start, done *importEvent + files := map[string]importEvent{} + for i := range events { + switch events[i].Type { + case "start": + start = &events[i] + case "done": + done = &events[i] + case "file": + files[events[i].Filename] = events[i] + } } - resp.decode(t, &res) - assert.Equal(t, 1, res.Imported, resp.String()) - assert.Equal(t, 1, res.Skipped, resp.String()) // the nested directory - assert.Empty(t, res.Errors, resp.String()) + require.NotNil(t, start, resp.String()) + require.NotNil(t, done, resp.String()) + assert.Equal(t, 2, start.Total, "start total counts every entry") + assert.Equal(t, 1, done.Imported, resp.String()) + assert.Equal(t, 1, done.Skipped, resp.String()) // the nested directory + assert.Equal(t, 0, done.Errors, resp.String()) + + // Per-file events: the JPEG imported, the subdirectory was skipped. + assert.Equal(t, "imported", files["scan.jpg"].Status, resp.String()) + assert.Equal(t, "skipped", files["nested"].Status, resp.String()) // Source file is gone from the import folder after a successful import. _, statErr := os.Stat(srcPath) diff --git a/backend/internal/service/file_service.go b/backend/internal/service/file_service.go index dcf2453..b78d708 100644 --- a/backend/internal/service/file_service.go +++ b/backend/internal/service/file_service.go @@ -70,6 +70,24 @@ type ImportResult struct { Errors []ImportFileError `json:"errors"` } +// ImportEvent is one progress message streamed during an import, letting the UI +// show a live progress bar and a per-file status list. Type is the discriminator: +// +// "start" — total is the number of entries about to be processed. +// "file" — one entry finished: index (1-based), filename, status, optional reason. +// "done" — final tallies (imported/skipped/errors). +type ImportEvent struct { + Type string `json:"type"` + Total int `json:"total,omitempty"` + Index int `json:"index,omitempty"` + Filename string `json:"filename,omitempty"` + Status string `json:"status,omitempty"` // "imported" | "skipped" | "error" + Reason string `json:"reason,omitempty"` + Imported int `json:"imported,omitempty"` + Skipped int `json:"skipped,omitempty"` + Errors int `json:"errors,omitempty"` +} + // FileService handles business logic for file records. type FileService struct { files port.FileRepo @@ -532,7 +550,12 @@ func (s *FileService) BulkDelete(ctx context.Context, fileIDs []uuid.UUID) error // Import scans a server-side directory and uploads all supported files. // If path is empty, the configured default import path is used. -func (s *FileService) Import(ctx context.Context, path string) (*ImportResult, error) { +// +// onProgress, when non-nil, receives a "start" event, one "file" event per +// directory entry as it is processed, and a final "done" event — letting a +// caller stream live progress. It is always called from this goroutine (never +// concurrently). The aggregate result is also returned for non-streaming callers. +func (s *FileService) Import(ctx context.Context, path string, onProgress func(ImportEvent)) (*ImportResult, error) { if s.importPath == "" { return nil, domain.ErrValidation } @@ -553,47 +576,58 @@ func (s *FileService) Import(ctx context.Context, path string) (*ImportResult, e return nil, fmt.Errorf("FileService.Import: read dir %q: %w", dir, err) } - result := &ImportResult{Errors: []ImportFileError{}} + emit := func(ev ImportEvent) { + if onProgress != nil { + onProgress(ev) + } + } + + result := &ImportResult{Errors: []ImportFileError{}} + total := len(entries) + emit(ImportEvent{Type: "start", Total: total}) + + for i, entry := range entries { + name := entry.Name() + file := func(status, reason string) { + emit(ImportEvent{ + Type: "file", Index: i + 1, Total: total, + Filename: name, Status: status, Reason: reason, + }) + } + fail := func(reason string) { + result.Errors = append(result.Errors, ImportFileError{Filename: name, Reason: reason}) + file("error", reason) + } - for _, entry := range entries { if entry.IsDir() { result.Skipped++ + file("skipped", "directory") continue } - fullPath := filepath.Join(dir, entry.Name()) + fullPath := filepath.Join(dir, name) mt, err := mimetype.DetectFile(fullPath) if err != nil { - result.Errors = append(result.Errors, ImportFileError{ - Filename: entry.Name(), - Reason: fmt.Sprintf("MIME detection failed: %s", err), - }) + fail(fmt.Sprintf("MIME detection failed: %s", err)) continue } mimeStr := mt.String() // Strip parameters (e.g. "text/plain; charset=utf-8" → "text/plain"). - if idx := len(mimeStr); idx > 0 { - for i, c := range mimeStr { - if c == ';' { - mimeStr = mimeStr[:i] - break - } - } + if j := strings.IndexByte(mimeStr, ';'); j >= 0 { + mimeStr = mimeStr[:j] } if _, err := s.mimes.GetByName(ctx, mimeStr); err != nil { result.Skipped++ + file("skipped", "unsupported type: "+mimeStr) continue } f, err := os.Open(fullPath) if err != nil { - result.Errors = append(result.Errors, ImportFileError{ - Filename: entry.Name(), - Reason: fmt.Sprintf("open failed: %s", err), - }) + fail(fmt.Sprintf("open failed: %s", err)) continue } @@ -606,7 +640,6 @@ func (s *FileService) Import(ctx context.Context, path string) (*ImportResult, e mtime = &t } - name := entry.Name() _, uploadErr := s.Upload(ctx, UploadParams{ Reader: f, MIMEType: mimeStr, @@ -616,10 +649,7 @@ func (s *FileService) Import(ctx context.Context, path string) (*ImportResult, e f.Close() if uploadErr != nil { - result.Errors = append(result.Errors, ImportFileError{ - Filename: entry.Name(), - Reason: uploadErr.Error(), - }) + fail(uploadErr.Error()) continue } result.Imported++ @@ -628,13 +658,19 @@ func (s *FileService) Import(ctx context.Context, path string) (*ImportResult, e // doesn't duplicate. The file is already safely copied into storage; a // removal failure is reported but doesn't undo the import. if rmErr := os.Remove(fullPath); rmErr != nil { - result.Errors = append(result.Errors, ImportFileError{ - Filename: entry.Name(), - Reason: fmt.Sprintf("imported, but failed to remove source: %s", rmErr), - }) + reason := fmt.Sprintf("imported, but failed to remove source: %s", rmErr) + result.Errors = append(result.Errors, ImportFileError{Filename: name, Reason: reason}) + file("imported", reason) // imported, with a warning + continue } + file("imported", "") } + emit(ImportEvent{ + Type: "done", Total: total, + Imported: result.Imported, Skipped: result.Skipped, Errors: len(result.Errors), + }) + return result, nil }