feat(repo): enhance MoveToArchive functionality to handle duplicates and improve error handling
This commit is contained in:
parent
c4ac2ed128
commit
a0db346ab7
@ -3,6 +3,7 @@ package repo
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"time"
|
||||||
|
|
||||||
"watch-party-backend/internal/core/episode"
|
"watch-party-backend/internal/core/episode"
|
||||||
|
|
||||||
@ -140,68 +141,89 @@ func (r *pgxEpisodeRepo) MoveToArchive(ctx context.Context, ids []int64) (episod
|
|||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
requested := make(map[int64]struct{}, len(ids))
|
||||||
|
for _, id := range ids {
|
||||||
|
requested[id] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
tx, err := r.pool.Begin(ctx)
|
tx, err := r.pool.Begin(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
defer func() { _ = tx.Rollback(ctx) }()
|
defer func() { _ = tx.Rollback(ctx) }()
|
||||||
|
|
||||||
// 1) Insert into archive from current (skip duplicates by id)
|
// Grab the rows we need to archive.
|
||||||
insRows, err := tx.Query(ctx, `
|
rows, err := tx.Query(ctx, `
|
||||||
INSERT INTO current_archive (
|
|
||||||
id, ep_num, ep_title, season_name, start_time, playback_length, current_ep, date_created
|
|
||||||
)
|
|
||||||
SELECT id, ep_num, ep_title, season_name, start_time, playback_length, current_ep, date_created
|
SELECT id, ep_num, ep_title, season_name, start_time, playback_length, current_ep, date_created
|
||||||
FROM current
|
FROM current
|
||||||
WHERE id = ANY($1::bigint[])
|
WHERE id = ANY($1::bigint[])
|
||||||
ON CONFLICT (id) DO NOTHING
|
|
||||||
RETURNING id
|
|
||||||
`, ids)
|
`, ids)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
for insRows.Next() {
|
defer rows.Close()
|
||||||
var id int64
|
|
||||||
if err := insRows.Scan(&id); err != nil {
|
for rows.Next() {
|
||||||
|
var (
|
||||||
|
id int64
|
||||||
|
epNum int
|
||||||
|
epTitle string
|
||||||
|
seasonName string
|
||||||
|
startTime string
|
||||||
|
playback string
|
||||||
|
currentEp bool
|
||||||
|
dateCreated time.Time
|
||||||
|
archivedID int64
|
||||||
|
inserted bool
|
||||||
|
)
|
||||||
|
if err := rows.Scan(&id, &epNum, &epTitle, &seasonName, &startTime, &playback, ¤tEp, &dateCreated); err != nil {
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// First try to insert with the same id; on conflict, insert with a new id.
|
||||||
|
err = tx.QueryRow(ctx, `
|
||||||
|
INSERT INTO current_archive (
|
||||||
|
id, ep_num, ep_title, season_name, start_time, playback_length, current_ep, date_created
|
||||||
|
)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||||
|
ON CONFLICT (id) DO NOTHING
|
||||||
|
RETURNING id
|
||||||
|
`, id, epNum, epTitle, seasonName, startTime, playback, currentEp, dateCreated).Scan(&archivedID)
|
||||||
|
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
inserted = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if !inserted {
|
||||||
|
if err := tx.QueryRow(ctx, `
|
||||||
|
INSERT INTO current_archive (
|
||||||
|
id, ep_num, ep_title, season_name, start_time, playback_length, current_ep, date_created
|
||||||
|
)
|
||||||
|
VALUES (nextval(pg_get_serial_sequence('current','id')), $1, $2, $3, $4, $5, $6, $7)
|
||||||
|
RETURNING id
|
||||||
|
`, epNum, epTitle, seasonName, startTime, playback, currentEp, dateCreated).Scan(&archivedID); err != nil {
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete from current using the original id.
|
||||||
|
if _, err := tx.Exec(ctx, `DELETE FROM current WHERE id = $1`, id); err != nil {
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
res.MovedIDs = append(res.MovedIDs, id)
|
res.MovedIDs = append(res.MovedIDs, id)
|
||||||
|
res.DeletedIDs = append(res.DeletedIDs, id)
|
||||||
|
delete(requested, id)
|
||||||
}
|
}
|
||||||
if err := insRows.Err(); err != nil {
|
if err := rows.Err(); err != nil {
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2) Delete from current only those actually inserted to archive
|
// Any ids not found are treated as skipped.
|
||||||
if len(res.MovedIDs) > 0 {
|
for id := range requested {
|
||||||
delRows, err := tx.Query(ctx, `
|
res.SkippedIDs = append(res.SkippedIDs, id)
|
||||||
DELETE FROM current
|
|
||||||
WHERE id = ANY($1::bigint[])
|
|
||||||
RETURNING id
|
|
||||||
`, res.MovedIDs)
|
|
||||||
if err != nil {
|
|
||||||
return res, err
|
|
||||||
}
|
|
||||||
for delRows.Next() {
|
|
||||||
var id int64
|
|
||||||
if err := delRows.Scan(&id); err != nil {
|
|
||||||
return res, err
|
|
||||||
}
|
|
||||||
res.DeletedIDs = append(res.DeletedIDs, id)
|
|
||||||
}
|
|
||||||
if err := delRows.Err(); err != nil {
|
|
||||||
return res, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3) Compute skipped = requested - moved
|
|
||||||
sel := make(map[int64]struct{}, len(res.MovedIDs))
|
|
||||||
for _, id := range res.MovedIDs {
|
|
||||||
sel[id] = struct{}{}
|
|
||||||
}
|
|
||||||
for _, id := range ids {
|
|
||||||
if _, ok := sel[id]; !ok {
|
|
||||||
res.SkippedIDs = append(res.SkippedIDs, id)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := tx.Commit(ctx); err != nil {
|
if err := tx.Commit(ctx); err != nil {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user