302 lines
7.7 KiB
Go
302 lines
7.7 KiB
Go
package repo
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"log"
|
|
"time"
|
|
|
|
"watch-party-backend/internal/core/episode"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgconn"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
type pgxPool interface {
|
|
Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
|
|
QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
|
|
Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error)
|
|
Begin(ctx context.Context) (pgx.Tx, error)
|
|
}
|
|
|
|
type pgxEpisodeRepo struct {
|
|
pool pgxPool
|
|
}
|
|
|
|
func NewEpisodeRepo(pool *pgxpool.Pool) episode.Repository {
|
|
return &pgxEpisodeRepo{pool: pool}
|
|
}
|
|
|
|
func (r *pgxEpisodeRepo) ListAll(ctx context.Context) ([]episode.Episode, error) {
|
|
const q = `
|
|
SELECT
|
|
id,
|
|
ep_num,
|
|
ep_title,
|
|
season_name,
|
|
to_char(start_time, 'HH24:MI:SS') AS start_time,
|
|
to_char(playback_length, 'HH24:MI:SS') AS playback_length,
|
|
current_ep,
|
|
date_created
|
|
FROM current
|
|
ORDER BY id DESC;
|
|
`
|
|
rows, err := r.pool.Query(ctx, q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var out []episode.Episode
|
|
for rows.Next() {
|
|
var e episode.Episode
|
|
if err := rows.Scan(&e.Id, &e.EpNum, &e.EpTitle, &e.SeasonName, &e.StartTime, &e.PlaybackLength, &e.CurrentEp, &e.DateCreated); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, e)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
func (r *pgxEpisodeRepo) GetCurrent(ctx context.Context) (episode.Episode, error) {
|
|
const q = `
|
|
SELECT
|
|
id,
|
|
ep_num,
|
|
ep_title,
|
|
season_name,
|
|
to_char(start_time, 'HH24:MI:SS') AS start_time,
|
|
to_char(playback_length, 'HH24:MI:SS') AS playback_length,
|
|
current_ep,
|
|
date_created
|
|
FROM current
|
|
WHERE current_ep = true
|
|
ORDER BY id DESC
|
|
LIMIT 1;
|
|
`
|
|
var e episode.Episode
|
|
err := r.pool.QueryRow(ctx, q).Scan(
|
|
&e.Id, &e.EpNum, &e.EpTitle, &e.SeasonName, &e.StartTime, &e.PlaybackLength, &e.CurrentEp, &e.DateCreated,
|
|
)
|
|
if err != nil {
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return episode.Episode{}, episode.ErrNotFound
|
|
}
|
|
return episode.Episode{}, err
|
|
}
|
|
return e, nil
|
|
}
|
|
|
|
func (r *pgxEpisodeRepo) Create(ctx context.Context, in episode.NewShowInput) (episode.Episode, error) {
|
|
const q = `
|
|
INSERT INTO current (ep_num, ep_title, season_name, start_time, playback_length)
|
|
VALUES ($1, $2, $3, $4::time, $5::interval)
|
|
RETURNING
|
|
id,
|
|
ep_num,
|
|
ep_title,
|
|
season_name,
|
|
to_char(start_time, 'HH24:MI:SS') AS start_time,
|
|
to_char(playback_length, 'HH24:MI:SS') AS playback_length,
|
|
current_ep,
|
|
date_created;
|
|
`
|
|
var e episode.Episode
|
|
err := r.pool.QueryRow(ctx, q, in.EpNum, in.EpTitle, in.SeasonName, in.StartTime, in.PlaybackLength).Scan(
|
|
&e.Id, &e.EpNum, &e.EpTitle, &e.SeasonName, &e.StartTime, &e.PlaybackLength, &e.CurrentEp, &e.DateCreated,
|
|
)
|
|
if err != nil {
|
|
return episode.Episode{}, err
|
|
}
|
|
return e, nil
|
|
}
|
|
|
|
func (r *pgxEpisodeRepo) ListArchive(ctx context.Context) ([]episode.ArchiveEpisode, error) {
|
|
const q = `
|
|
SELECT
|
|
id,
|
|
ep_num,
|
|
ep_title,
|
|
season_name,
|
|
to_char(start_time, 'HH24:MI:SS') AS start_time,
|
|
to_char(playback_length, 'HH24:MI:SS') AS playback_length,
|
|
current_ep,
|
|
date_created,
|
|
date_archived
|
|
FROM current_archive
|
|
ORDER BY date_archived DESC, id DESC;
|
|
`
|
|
rows, err := r.pool.Query(ctx, q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var out []episode.ArchiveEpisode
|
|
for rows.Next() {
|
|
var e episode.ArchiveEpisode
|
|
if err := rows.Scan(&e.Id, &e.EpNum, &e.EpTitle, &e.SeasonName, &e.StartTime, &e.PlaybackLength, &e.CurrentEp, &e.DateCreated, &e.DateArchived); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, e)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
func (r *pgxEpisodeRepo) SetCurrent(ctx context.Context, id int64, startHHMMSS string) error {
|
|
tx, err := r.pool.Begin(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() { _ = tx.Rollback(ctx) }()
|
|
|
|
// 1) clear any current episode flag
|
|
if _, err := tx.Exec(ctx, `UPDATE current SET current_ep = false WHERE current_ep = true`); err != nil {
|
|
return err
|
|
}
|
|
|
|
// 2) set start_time and current_ep for the id
|
|
ct, err := tx.Exec(ctx, `UPDATE current SET start_time = $1::time, current_ep = true WHERE id = $2`, startHHMMSS, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if ct.RowsAffected() == 0 {
|
|
return episode.ErrNotFound
|
|
}
|
|
|
|
return tx.Commit(ctx)
|
|
}
|
|
|
|
func (r *pgxEpisodeRepo) MoveToArchive(ctx context.Context, ids []int64) (episode.MoveResult, error) {
|
|
res := episode.MoveResult{}
|
|
if len(ids) == 0 {
|
|
return res, nil
|
|
}
|
|
|
|
requested := make(map[int64]struct{}, len(ids))
|
|
for _, id := range ids {
|
|
requested[id] = struct{}{}
|
|
}
|
|
|
|
tx, err := r.pool.Begin(ctx)
|
|
if err != nil {
|
|
return res, err
|
|
}
|
|
defer func() { _ = tx.Rollback(ctx) }()
|
|
|
|
// Grab the rows we need to archive.
|
|
rows, err := tx.Query(ctx, `
|
|
SELECT
|
|
id,
|
|
ep_num,
|
|
ep_title,
|
|
season_name,
|
|
to_char(start_time, 'HH24:MI:SS') AS start_time,
|
|
to_char(playback_length, 'HH24:MI:SS') AS playback_length,
|
|
current_ep,
|
|
date_created
|
|
FROM current
|
|
WHERE id = ANY($1::bigint[])
|
|
`, ids)
|
|
if err != nil {
|
|
return res, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
type rowData struct {
|
|
id int64
|
|
epNum int
|
|
epTitle string
|
|
seasonName string
|
|
startTime string
|
|
playback string
|
|
currentEp bool
|
|
dateCreated time.Time
|
|
}
|
|
var items []rowData
|
|
for rows.Next() {
|
|
var item rowData
|
|
if err := rows.Scan(&item.id, &item.epNum, &item.epTitle, &item.seasonName, &item.startTime, &item.playback, &item.currentEp, &item.dateCreated); err != nil {
|
|
return res, err
|
|
}
|
|
items = append(items, item)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return res, err
|
|
}
|
|
|
|
for _, item := range items {
|
|
var (
|
|
archivedID int64
|
|
inserted bool
|
|
)
|
|
|
|
// First try to insert with the same id; on conflict, insert with a new id.
|
|
err = tx.QueryRow(ctx, `
|
|
INSERT INTO current_archive (
|
|
id, ep_num, ep_title, season_name, start_time, playback_length, current_ep, date_created
|
|
)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
|
ON CONFLICT (id) DO NOTHING
|
|
RETURNING id
|
|
`, item.id, item.epNum, item.epTitle, item.seasonName, item.startTime, item.playback, item.currentEp, item.dateCreated).Scan(&archivedID)
|
|
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
|
log.Printf("archive insert failed for id=%d: %v", item.id, err)
|
|
return res, err
|
|
}
|
|
if err == nil {
|
|
inserted = true
|
|
}
|
|
|
|
if !inserted {
|
|
var newID int64
|
|
if err := tx.QueryRow(ctx, `SELECT COALESCE(MAX(id), 0) + 1 FROM current_archive`).Scan(&newID); err != nil {
|
|
log.Printf("archive compute new id failed for original id=%d: %v", item.id, err)
|
|
return res, err
|
|
}
|
|
if err := tx.QueryRow(ctx, `
|
|
INSERT INTO current_archive (
|
|
id, ep_num, ep_title, season_name, start_time, playback_length, current_ep, date_created
|
|
)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
|
RETURNING id
|
|
`, newID, item.epNum, item.epTitle, item.seasonName, item.startTime, item.playback, item.currentEp, item.dateCreated).Scan(&archivedID); err != nil {
|
|
log.Printf("archive fallback insert failed for original id=%d using new id=%d: %v", item.id, newID, err)
|
|
return res, err
|
|
}
|
|
}
|
|
|
|
// Delete from current using the original id.
|
|
if _, err := tx.Exec(ctx, `DELETE FROM current WHERE id = $1`, item.id); err != nil {
|
|
log.Printf("archive delete failed for id=%d: %v", item.id, err)
|
|
return res, err
|
|
}
|
|
|
|
res.MovedIDs = append(res.MovedIDs, item.id)
|
|
res.DeletedIDs = append(res.DeletedIDs, item.id)
|
|
delete(requested, item.id)
|
|
}
|
|
|
|
// Any ids not found are treated as skipped.
|
|
for id := range requested {
|
|
res.SkippedIDs = append(res.SkippedIDs, id)
|
|
}
|
|
|
|
if err := tx.Commit(ctx); err != nil {
|
|
return res, err
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
func (r *pgxEpisodeRepo) Delete(ctx context.Context, id int64) error {
|
|
cmdTag, err := r.pool.Exec(ctx, `DELETE FROM current WHERE id = $1`, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if cmdTag.RowsAffected() == 0 {
|
|
return episode.ErrNotFound
|
|
}
|
|
return nil
|
|
}
|