feat(backend): stream folder-import progress as NDJSON
The import endpoint did all the work in one request and returned only an aggregate summary, so the UI couldn't show progress or per-file status. Refactor FileService.Import to take an optional progress callback and emit a "start" event (with the total entry count), one "file" event per entry as it finishes (index, filename, status, optional reason), and a final "done" event with the tallies. The handler streams these as newline-delimited JSON and flushes after each, deferring the response headers until the first event so a validation error raised before any file is touched is still returned as a normal JSON error. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -684,13 +684,36 @@ func (h *FileHandler) Import(c *gin.Context) {
|
|||||||
// Body is optional; ignore bind errors.
|
// Body is optional; ignore bind errors.
|
||||||
_ = c.ShouldBindJSON(&body)
|
_ = c.ShouldBindJSON(&body)
|
||||||
|
|
||||||
result, err := h.fileSvc.Import(c.Request.Context(), body.Path)
|
// Stream progress as newline-delimited JSON so the client can render a live
|
||||||
if err != nil {
|
// progress bar and per-file status. Headers are deferred until the first
|
||||||
respondError(c, err)
|
// event, so a validation error (bad path, import disabled) raised before any
|
||||||
return
|
// file is touched can still be returned as a normal JSON error response.
|
||||||
|
flusher, canFlush := c.Writer.(http.Flusher)
|
||||||
|
started := false
|
||||||
|
enc := json.NewEncoder(c.Writer)
|
||||||
|
|
||||||
|
emit := func(ev service.ImportEvent) {
|
||||||
|
if !started {
|
||||||
|
c.Header("Content-Type", "application/x-ndjson")
|
||||||
|
c.Header("Cache-Control", "no-cache")
|
||||||
|
c.Header("X-Accel-Buffering", "no") // don't let a proxy buffer the stream
|
||||||
|
c.Writer.WriteHeader(http.StatusOK)
|
||||||
|
started = true
|
||||||
|
}
|
||||||
|
_ = enc.Encode(ev) // appends a newline
|
||||||
|
if canFlush {
|
||||||
|
flusher.Flush()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
respondJSON(c, http.StatusOK, result)
|
if _, err := h.fileSvc.Import(c.Request.Context(), body.Path, emit); err != nil {
|
||||||
|
if !started {
|
||||||
|
respondError(c, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Headers already sent; surface the failure as a terminal stream event.
|
||||||
|
emit(service.ImportEvent{Type: "error", Reason: err.Error()})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -902,6 +902,35 @@ func TestNonOwnerAccessControl(t *testing.T) {
|
|||||||
assert.Equal(t, http.StatusOK, resp.StatusCode, resp.String())
|
assert.Equal(t, http.StatusOK, resp.StatusCode, resp.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// importEvent mirrors service.ImportEvent for decoding the streamed progress.
|
||||||
|
type importEvent struct {
|
||||||
|
Type string `json:"type"`
|
||||||
|
Total int `json:"total"`
|
||||||
|
Index int `json:"index"`
|
||||||
|
Filename string `json:"filename"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
Reason string `json:"reason"`
|
||||||
|
Imported int `json:"imported"`
|
||||||
|
Skipped int `json:"skipped"`
|
||||||
|
Errors int `json:"errors"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseImportEvents splits an NDJSON import response into its events.
|
||||||
|
func parseImportEvents(t *testing.T, resp *testResponse) []importEvent {
|
||||||
|
t.Helper()
|
||||||
|
var events []importEvent
|
||||||
|
for _, line := range bytes.Split(resp.bodyBytes, []byte("\n")) {
|
||||||
|
line = bytes.TrimSpace(line)
|
||||||
|
if len(line) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var ev importEvent
|
||||||
|
require.NoError(t, json.Unmarshal(line, &ev), "event line: %s", line)
|
||||||
|
events = append(events, ev)
|
||||||
|
}
|
||||||
|
return events
|
||||||
|
}
|
||||||
|
|
||||||
// TestImportFromFolder verifies the admin server-side import: supported files
|
// TestImportFromFolder verifies the admin server-side import: supported files
|
||||||
// are ingested, subdirectories are skipped, the source is removed from the
|
// are ingested, subdirectories are skipped, the source is removed from the
|
||||||
// import folder afterwards, and a file without EXIF takes the source's mtime as
|
// import folder afterwards, and a file without EXIF takes the source's mtime as
|
||||||
@@ -923,18 +952,31 @@ func TestImportFromFolder(t *testing.T) {
|
|||||||
|
|
||||||
resp := h.doJSON("POST", "/files/import", map[string]any{}, adminToken)
|
resp := h.doJSON("POST", "/files/import", map[string]any{}, adminToken)
|
||||||
require.Equal(t, http.StatusOK, resp.StatusCode, resp.String())
|
require.Equal(t, http.StatusOK, resp.StatusCode, resp.String())
|
||||||
var res struct {
|
|
||||||
Imported int `json:"imported"`
|
// The import streams newline-delimited JSON progress events.
|
||||||
Skipped int `json:"skipped"`
|
events := parseImportEvents(t, resp)
|
||||||
Errors []struct {
|
var start, done *importEvent
|
||||||
Filename string `json:"filename"`
|
files := map[string]importEvent{}
|
||||||
Reason string `json:"reason"`
|
for i := range events {
|
||||||
} `json:"errors"`
|
switch events[i].Type {
|
||||||
|
case "start":
|
||||||
|
start = &events[i]
|
||||||
|
case "done":
|
||||||
|
done = &events[i]
|
||||||
|
case "file":
|
||||||
|
files[events[i].Filename] = events[i]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
resp.decode(t, &res)
|
require.NotNil(t, start, resp.String())
|
||||||
assert.Equal(t, 1, res.Imported, resp.String())
|
require.NotNil(t, done, resp.String())
|
||||||
assert.Equal(t, 1, res.Skipped, resp.String()) // the nested directory
|
assert.Equal(t, 2, start.Total, "start total counts every entry")
|
||||||
assert.Empty(t, res.Errors, resp.String())
|
assert.Equal(t, 1, done.Imported, resp.String())
|
||||||
|
assert.Equal(t, 1, done.Skipped, resp.String()) // the nested directory
|
||||||
|
assert.Equal(t, 0, done.Errors, resp.String())
|
||||||
|
|
||||||
|
// Per-file events: the JPEG imported, the subdirectory was skipped.
|
||||||
|
assert.Equal(t, "imported", files["scan.jpg"].Status, resp.String())
|
||||||
|
assert.Equal(t, "skipped", files["nested"].Status, resp.String())
|
||||||
|
|
||||||
// Source file is gone from the import folder after a successful import.
|
// Source file is gone from the import folder after a successful import.
|
||||||
_, statErr := os.Stat(srcPath)
|
_, statErr := os.Stat(srcPath)
|
||||||
|
|||||||
@@ -70,6 +70,24 @@ type ImportResult struct {
|
|||||||
Errors []ImportFileError `json:"errors"`
|
Errors []ImportFileError `json:"errors"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ImportEvent is one progress message streamed during an import, letting the UI
|
||||||
|
// show a live progress bar and a per-file status list. Type is the discriminator:
|
||||||
|
//
|
||||||
|
// "start" — total is the number of entries about to be processed.
|
||||||
|
// "file" — one entry finished: index (1-based), filename, status, optional reason.
|
||||||
|
// "done" — final tallies (imported/skipped/errors).
|
||||||
|
type ImportEvent struct {
|
||||||
|
Type string `json:"type"`
|
||||||
|
Total int `json:"total,omitempty"`
|
||||||
|
Index int `json:"index,omitempty"`
|
||||||
|
Filename string `json:"filename,omitempty"`
|
||||||
|
Status string `json:"status,omitempty"` // "imported" | "skipped" | "error"
|
||||||
|
Reason string `json:"reason,omitempty"`
|
||||||
|
Imported int `json:"imported,omitempty"`
|
||||||
|
Skipped int `json:"skipped,omitempty"`
|
||||||
|
Errors int `json:"errors,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
// FileService handles business logic for file records.
|
// FileService handles business logic for file records.
|
||||||
type FileService struct {
|
type FileService struct {
|
||||||
files port.FileRepo
|
files port.FileRepo
|
||||||
@@ -532,7 +550,12 @@ func (s *FileService) BulkDelete(ctx context.Context, fileIDs []uuid.UUID) error
|
|||||||
|
|
||||||
// Import scans a server-side directory and uploads all supported files.
|
// Import scans a server-side directory and uploads all supported files.
|
||||||
// If path is empty, the configured default import path is used.
|
// If path is empty, the configured default import path is used.
|
||||||
func (s *FileService) Import(ctx context.Context, path string) (*ImportResult, error) {
|
//
|
||||||
|
// onProgress, when non-nil, receives a "start" event, one "file" event per
|
||||||
|
// directory entry as it is processed, and a final "done" event — letting a
|
||||||
|
// caller stream live progress. It is always called from this goroutine (never
|
||||||
|
// concurrently). The aggregate result is also returned for non-streaming callers.
|
||||||
|
func (s *FileService) Import(ctx context.Context, path string, onProgress func(ImportEvent)) (*ImportResult, error) {
|
||||||
if s.importPath == "" {
|
if s.importPath == "" {
|
||||||
return nil, domain.ErrValidation
|
return nil, domain.ErrValidation
|
||||||
}
|
}
|
||||||
@@ -553,47 +576,58 @@ func (s *FileService) Import(ctx context.Context, path string) (*ImportResult, e
|
|||||||
return nil, fmt.Errorf("FileService.Import: read dir %q: %w", dir, err)
|
return nil, fmt.Errorf("FileService.Import: read dir %q: %w", dir, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
result := &ImportResult{Errors: []ImportFileError{}}
|
emit := func(ev ImportEvent) {
|
||||||
|
if onProgress != nil {
|
||||||
|
onProgress(ev)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result := &ImportResult{Errors: []ImportFileError{}}
|
||||||
|
total := len(entries)
|
||||||
|
emit(ImportEvent{Type: "start", Total: total})
|
||||||
|
|
||||||
|
for i, entry := range entries {
|
||||||
|
name := entry.Name()
|
||||||
|
file := func(status, reason string) {
|
||||||
|
emit(ImportEvent{
|
||||||
|
Type: "file", Index: i + 1, Total: total,
|
||||||
|
Filename: name, Status: status, Reason: reason,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
fail := func(reason string) {
|
||||||
|
result.Errors = append(result.Errors, ImportFileError{Filename: name, Reason: reason})
|
||||||
|
file("error", reason)
|
||||||
|
}
|
||||||
|
|
||||||
for _, entry := range entries {
|
|
||||||
if entry.IsDir() {
|
if entry.IsDir() {
|
||||||
result.Skipped++
|
result.Skipped++
|
||||||
|
file("skipped", "directory")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
fullPath := filepath.Join(dir, entry.Name())
|
fullPath := filepath.Join(dir, name)
|
||||||
|
|
||||||
mt, err := mimetype.DetectFile(fullPath)
|
mt, err := mimetype.DetectFile(fullPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
result.Errors = append(result.Errors, ImportFileError{
|
fail(fmt.Sprintf("MIME detection failed: %s", err))
|
||||||
Filename: entry.Name(),
|
|
||||||
Reason: fmt.Sprintf("MIME detection failed: %s", err),
|
|
||||||
})
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
mimeStr := mt.String()
|
mimeStr := mt.String()
|
||||||
// Strip parameters (e.g. "text/plain; charset=utf-8" → "text/plain").
|
// Strip parameters (e.g. "text/plain; charset=utf-8" → "text/plain").
|
||||||
if idx := len(mimeStr); idx > 0 {
|
if j := strings.IndexByte(mimeStr, ';'); j >= 0 {
|
||||||
for i, c := range mimeStr {
|
mimeStr = mimeStr[:j]
|
||||||
if c == ';' {
|
|
||||||
mimeStr = mimeStr[:i]
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := s.mimes.GetByName(ctx, mimeStr); err != nil {
|
if _, err := s.mimes.GetByName(ctx, mimeStr); err != nil {
|
||||||
result.Skipped++
|
result.Skipped++
|
||||||
|
file("skipped", "unsupported type: "+mimeStr)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
f, err := os.Open(fullPath)
|
f, err := os.Open(fullPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
result.Errors = append(result.Errors, ImportFileError{
|
fail(fmt.Sprintf("open failed: %s", err))
|
||||||
Filename: entry.Name(),
|
|
||||||
Reason: fmt.Sprintf("open failed: %s", err),
|
|
||||||
})
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -606,7 +640,6 @@ func (s *FileService) Import(ctx context.Context, path string) (*ImportResult, e
|
|||||||
mtime = &t
|
mtime = &t
|
||||||
}
|
}
|
||||||
|
|
||||||
name := entry.Name()
|
|
||||||
_, uploadErr := s.Upload(ctx, UploadParams{
|
_, uploadErr := s.Upload(ctx, UploadParams{
|
||||||
Reader: f,
|
Reader: f,
|
||||||
MIMEType: mimeStr,
|
MIMEType: mimeStr,
|
||||||
@@ -616,10 +649,7 @@ func (s *FileService) Import(ctx context.Context, path string) (*ImportResult, e
|
|||||||
f.Close()
|
f.Close()
|
||||||
|
|
||||||
if uploadErr != nil {
|
if uploadErr != nil {
|
||||||
result.Errors = append(result.Errors, ImportFileError{
|
fail(uploadErr.Error())
|
||||||
Filename: entry.Name(),
|
|
||||||
Reason: uploadErr.Error(),
|
|
||||||
})
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
result.Imported++
|
result.Imported++
|
||||||
@@ -628,13 +658,19 @@ func (s *FileService) Import(ctx context.Context, path string) (*ImportResult, e
|
|||||||
// doesn't duplicate. The file is already safely copied into storage; a
|
// doesn't duplicate. The file is already safely copied into storage; a
|
||||||
// removal failure is reported but doesn't undo the import.
|
// removal failure is reported but doesn't undo the import.
|
||||||
if rmErr := os.Remove(fullPath); rmErr != nil {
|
if rmErr := os.Remove(fullPath); rmErr != nil {
|
||||||
result.Errors = append(result.Errors, ImportFileError{
|
reason := fmt.Sprintf("imported, but failed to remove source: %s", rmErr)
|
||||||
Filename: entry.Name(),
|
result.Errors = append(result.Errors, ImportFileError{Filename: name, Reason: reason})
|
||||||
Reason: fmt.Sprintf("imported, but failed to remove source: %s", rmErr),
|
file("imported", reason) // imported, with a warning
|
||||||
})
|
continue
|
||||||
}
|
}
|
||||||
|
file("imported", "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
emit(ImportEvent{
|
||||||
|
Type: "done", Total: total,
|
||||||
|
Imported: result.Imported, Skipped: result.Skipped, Errors: len(result.Errors),
|
||||||
|
})
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user