watch-party/backend/internal/repo/episode_repo.go

249 lines
6.3 KiB
Go

package repo
import (
"context"
"errors"
"log"
"time"
"watch-party-backend/internal/core/episode"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
)
type pgxPool interface {
Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error)
Begin(ctx context.Context) (pgx.Tx, error)
}
type pgxEpisodeRepo struct {
pool pgxPool
}
func NewEpisodeRepo(pool *pgxpool.Pool) episode.Repository {
return &pgxEpisodeRepo{pool: pool}
}
func (r *pgxEpisodeRepo) ListAll(ctx context.Context) ([]episode.Episode, error) {
const q = `
SELECT
id,
ep_num,
ep_title,
season_name,
to_char(start_time, 'HH24:MI:SS') AS start_time,
to_char(playback_length, 'HH24:MI:SS') AS playback_length,
current_ep,
date_created
FROM current
ORDER BY id DESC;
`
rows, err := r.pool.Query(ctx, q)
if err != nil {
return nil, err
}
defer rows.Close()
var out []episode.Episode
for rows.Next() {
var e episode.Episode
if err := rows.Scan(&e.Id, &e.EpNum, &e.EpTitle, &e.SeasonName, &e.StartTime, &e.PlaybackLength, &e.CurrentEp, &e.DateCreated); err != nil {
return nil, err
}
out = append(out, e)
}
return out, rows.Err()
}
func (r *pgxEpisodeRepo) GetCurrent(ctx context.Context) (episode.Episode, error) {
const q = `
SELECT
id,
ep_num,
ep_title,
season_name,
to_char(start_time, 'HH24:MI:SS') AS start_time,
to_char(playback_length, 'HH24:MI:SS') AS playback_length,
current_ep,
date_created
FROM current
WHERE current_ep = true
ORDER BY id DESC
LIMIT 1;
`
var e episode.Episode
err := r.pool.QueryRow(ctx, q).Scan(
&e.Id, &e.EpNum, &e.EpTitle, &e.SeasonName, &e.StartTime, &e.PlaybackLength, &e.CurrentEp, &e.DateCreated,
)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return episode.Episode{}, episode.ErrNotFound
}
return episode.Episode{}, err
}
return e, nil
}
func (r *pgxEpisodeRepo) Create(ctx context.Context, in episode.NewShowInput) (episode.Episode, error) {
const q = `
INSERT INTO current (ep_num, ep_title, season_name, start_time, playback_length)
VALUES ($1, $2, $3, $4::time, $5::interval)
RETURNING
id,
ep_num,
ep_title,
season_name,
to_char(start_time, 'HH24:MI:SS') AS start_time,
to_char(playback_length, 'HH24:MI:SS') AS playback_length,
current_ep,
date_created;
`
var e episode.Episode
err := r.pool.QueryRow(ctx, q, in.EpNum, in.EpTitle, in.SeasonName, in.StartTime, in.PlaybackLength).Scan(
&e.Id, &e.EpNum, &e.EpTitle, &e.SeasonName, &e.StartTime, &e.PlaybackLength, &e.CurrentEp, &e.DateCreated,
)
if err != nil {
return episode.Episode{}, err
}
return e, nil
}
func (r *pgxEpisodeRepo) SetCurrent(ctx context.Context, id int64, startHHMMSS string) error {
tx, err := r.pool.Begin(ctx)
if err != nil {
return err
}
defer func() { _ = tx.Rollback(ctx) }()
// 1) clear any current episode flag
if _, err := tx.Exec(ctx, `UPDATE current SET current_ep = false WHERE current_ep = true`); err != nil {
return err
}
// 2) set start_time and current_ep for the id
ct, err := tx.Exec(ctx, `UPDATE current SET start_time = $1::time, current_ep = true WHERE id = $2`, startHHMMSS, id)
if err != nil {
return err
}
if ct.RowsAffected() == 0 {
return episode.ErrNotFound
}
return tx.Commit(ctx)
}
func (r *pgxEpisodeRepo) MoveToArchive(ctx context.Context, ids []int64) (episode.MoveResult, error) {
res := episode.MoveResult{}
if len(ids) == 0 {
return res, nil
}
requested := make(map[int64]struct{}, len(ids))
for _, id := range ids {
requested[id] = struct{}{}
}
tx, err := r.pool.Begin(ctx)
if err != nil {
return res, err
}
defer func() { _ = tx.Rollback(ctx) }()
// Grab the rows we need to archive.
rows, err := tx.Query(ctx, `
SELECT id, ep_num, ep_title, season_name, start_time, playback_length, current_ep, date_created
FROM current
WHERE id = ANY($1::bigint[])
`, ids)
if err != nil {
return res, err
}
defer rows.Close()
for rows.Next() {
var (
id int64
epNum int
epTitle string
seasonName string
startTime string
playback string
currentEp bool
dateCreated time.Time
archivedID int64
inserted bool
)
if err := rows.Scan(&id, &epNum, &epTitle, &seasonName, &startTime, &playback, &currentEp, &dateCreated); err != nil {
return res, err
}
// First try to insert with the same id; on conflict, insert with a new id.
err = tx.QueryRow(ctx, `
INSERT INTO current_archive (
id, ep_num, ep_title, season_name, start_time, playback_length, current_ep, date_created
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (id) DO NOTHING
RETURNING id
`, id, epNum, epTitle, seasonName, startTime, playback, currentEp, dateCreated).Scan(&archivedID)
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
log.Printf("archive insert failed for id=%d: %v", id, err)
return res, err
}
if err == nil {
inserted = true
}
if !inserted {
if err := tx.QueryRow(ctx, `
INSERT INTO current_archive (
id, ep_num, ep_title, season_name, start_time, playback_length, current_ep, date_created
)
VALUES (nextval(pg_get_serial_sequence('current','id')), $1, $2, $3, $4, $5, $6, $7)
RETURNING id
`, epNum, epTitle, seasonName, startTime, playback, currentEp, dateCreated).Scan(&archivedID); err != nil {
log.Printf("archive fallback insert failed for original id=%d: %v", id, err)
return res, err
}
}
// Delete from current using the original id.
if _, err := tx.Exec(ctx, `DELETE FROM current WHERE id = $1`, id); err != nil {
log.Printf("archive delete failed for id=%d: %v", id, err)
return res, err
}
res.MovedIDs = append(res.MovedIDs, id)
res.DeletedIDs = append(res.DeletedIDs, id)
delete(requested, id)
}
if err := rows.Err(); err != nil {
return res, err
}
// Any ids not found are treated as skipped.
for id := range requested {
res.SkippedIDs = append(res.SkippedIDs, id)
}
if err := tx.Commit(ctx); err != nil {
return res, err
}
return res, nil
}
func (r *pgxEpisodeRepo) Delete(ctx context.Context, id int64) error {
cmdTag, err := r.pool.Exec(ctx, `DELETE FROM current WHERE id = $1`, id)
if err != nil {
return err
}
if cmdTag.RowsAffected() == 0 {
return episode.ErrNotFound
}
return nil
}