package repo import ( "context" "errors" "log" "time" "watch-party-backend/internal/core/episode" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" ) type pgxPool interface { Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error) Begin(ctx context.Context) (pgx.Tx, error) } type pgxEpisodeRepo struct { pool pgxPool } func NewEpisodeRepo(pool *pgxpool.Pool) episode.Repository { return &pgxEpisodeRepo{pool: pool} } func (r *pgxEpisodeRepo) ListAll(ctx context.Context) ([]episode.Episode, error) { const q = ` SELECT id, ep_num, ep_title, season_name, to_char(start_time, 'HH24:MI:SS') AS start_time, to_char(playback_length, 'HH24:MI:SS') AS playback_length, current_ep, date_created FROM current ORDER BY id DESC; ` rows, err := r.pool.Query(ctx, q) if err != nil { return nil, err } defer rows.Close() var out []episode.Episode for rows.Next() { var e episode.Episode if err := rows.Scan(&e.Id, &e.EpNum, &e.EpTitle, &e.SeasonName, &e.StartTime, &e.PlaybackLength, &e.CurrentEp, &e.DateCreated); err != nil { return nil, err } out = append(out, e) } return out, rows.Err() } func (r *pgxEpisodeRepo) GetCurrent(ctx context.Context) (episode.Episode, error) { const q = ` SELECT id, ep_num, ep_title, season_name, to_char(start_time, 'HH24:MI:SS') AS start_time, to_char(playback_length, 'HH24:MI:SS') AS playback_length, current_ep, date_created FROM current WHERE current_ep = true ORDER BY id DESC LIMIT 1; ` var e episode.Episode err := r.pool.QueryRow(ctx, q).Scan( &e.Id, &e.EpNum, &e.EpTitle, &e.SeasonName, &e.StartTime, &e.PlaybackLength, &e.CurrentEp, &e.DateCreated, ) if err != nil { if errors.Is(err, pgx.ErrNoRows) { return episode.Episode{}, episode.ErrNotFound } return episode.Episode{}, err } return e, nil } func (r *pgxEpisodeRepo) Create(ctx context.Context, in episode.NewShowInput) (episode.Episode, error) { const q = ` INSERT INTO current (ep_num, ep_title, season_name, start_time, playback_length) VALUES ($1, $2, $3, $4::time, $5::interval) RETURNING id, ep_num, ep_title, season_name, to_char(start_time, 'HH24:MI:SS') AS start_time, to_char(playback_length, 'HH24:MI:SS') AS playback_length, current_ep, date_created; ` var e episode.Episode err := r.pool.QueryRow(ctx, q, in.EpNum, in.EpTitle, in.SeasonName, in.StartTime, in.PlaybackLength).Scan( &e.Id, &e.EpNum, &e.EpTitle, &e.SeasonName, &e.StartTime, &e.PlaybackLength, &e.CurrentEp, &e.DateCreated, ) if err != nil { return episode.Episode{}, err } return e, nil } func (r *pgxEpisodeRepo) ListArchive(ctx context.Context) ([]episode.ArchiveEpisode, error) { const q = ` SELECT id, ep_num, ep_title, season_name, to_char(start_time, 'HH24:MI:SS') AS start_time, to_char(playback_length, 'HH24:MI:SS') AS playback_length, current_ep, date_created, date_archived FROM current_archive ORDER BY date_archived DESC, id DESC; ` rows, err := r.pool.Query(ctx, q) if err != nil { return nil, err } defer rows.Close() var out []episode.ArchiveEpisode for rows.Next() { var e episode.ArchiveEpisode if err := rows.Scan(&e.Id, &e.EpNum, &e.EpTitle, &e.SeasonName, &e.StartTime, &e.PlaybackLength, &e.CurrentEp, &e.DateCreated, &e.DateArchived); err != nil { return nil, err } out = append(out, e) } return out, rows.Err() } func (r *pgxEpisodeRepo) SetCurrent(ctx context.Context, id int64, startHHMMSS string) error { tx, err := r.pool.Begin(ctx) if err != nil { return err } defer func() { _ = tx.Rollback(ctx) }() // 1) clear any current episode flag if _, err := tx.Exec(ctx, `UPDATE current SET current_ep = false WHERE current_ep = true`); err != nil { return err } // 2) set start_time and current_ep for the id ct, err := tx.Exec(ctx, `UPDATE current SET start_time = $1::time, current_ep = true WHERE id = $2`, startHHMMSS, id) if err != nil { return err } if ct.RowsAffected() == 0 { return episode.ErrNotFound } return tx.Commit(ctx) } func (r *pgxEpisodeRepo) MoveToArchive(ctx context.Context, ids []int64) (episode.MoveResult, error) { res := episode.MoveResult{} if len(ids) == 0 { return res, nil } requested := make(map[int64]struct{}, len(ids)) for _, id := range ids { requested[id] = struct{}{} } tx, err := r.pool.Begin(ctx) if err != nil { return res, err } defer func() { _ = tx.Rollback(ctx) }() // Grab the rows we need to archive. rows, err := tx.Query(ctx, ` SELECT id, ep_num, ep_title, season_name, to_char(start_time, 'HH24:MI:SS') AS start_time, to_char(playback_length, 'HH24:MI:SS') AS playback_length, current_ep, date_created FROM current WHERE id = ANY($1::bigint[]) `, ids) if err != nil { return res, err } defer rows.Close() type rowData struct { id int64 epNum int epTitle string seasonName string startTime string playback string currentEp bool dateCreated time.Time } var items []rowData for rows.Next() { var item rowData if err := rows.Scan(&item.id, &item.epNum, &item.epTitle, &item.seasonName, &item.startTime, &item.playback, &item.currentEp, &item.dateCreated); err != nil { return res, err } items = append(items, item) } if err := rows.Err(); err != nil { return res, err } for _, item := range items { var ( archivedID int64 inserted bool ) // First try to insert with the same id; on conflict, insert with a new id. err = tx.QueryRow(ctx, ` INSERT INTO current_archive ( id, ep_num, ep_title, season_name, start_time, playback_length, current_ep, date_created ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (id) DO NOTHING RETURNING id `, item.id, item.epNum, item.epTitle, item.seasonName, item.startTime, item.playback, item.currentEp, item.dateCreated).Scan(&archivedID) if err != nil && !errors.Is(err, pgx.ErrNoRows) { log.Printf("archive insert failed for id=%d: %v", item.id, err) return res, err } if err == nil { inserted = true } if !inserted { var newID int64 if err := tx.QueryRow(ctx, `SELECT COALESCE(MAX(id), 0) + 1 FROM current_archive`).Scan(&newID); err != nil { log.Printf("archive compute new id failed for original id=%d: %v", item.id, err) return res, err } if err := tx.QueryRow(ctx, ` INSERT INTO current_archive ( id, ep_num, ep_title, season_name, start_time, playback_length, current_ep, date_created ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id `, newID, item.epNum, item.epTitle, item.seasonName, item.startTime, item.playback, item.currentEp, item.dateCreated).Scan(&archivedID); err != nil { log.Printf("archive fallback insert failed for original id=%d using new id=%d: %v", item.id, newID, err) return res, err } } // Delete from current using the original id. if _, err := tx.Exec(ctx, `DELETE FROM current WHERE id = $1`, item.id); err != nil { log.Printf("archive delete failed for id=%d: %v", item.id, err) return res, err } res.MovedIDs = append(res.MovedIDs, item.id) res.DeletedIDs = append(res.DeletedIDs, item.id) delete(requested, item.id) } // Any ids not found are treated as skipped. for id := range requested { res.SkippedIDs = append(res.SkippedIDs, id) } if err := tx.Commit(ctx); err != nil { return res, err } return res, nil } func (r *pgxEpisodeRepo) Delete(ctx context.Context, id int64) error { cmdTag, err := r.pool.Exec(ctx, `DELETE FROM current_archive WHERE id = $1`, id) if err != nil { return err } if cmdTag.RowsAffected() == 0 { return episode.ErrNotFound } return nil }