package repo import ( "context" "errors" "watch-party-backend/internal/core/episode" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" ) type pgxPool interface { Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error) Begin(ctx context.Context) (pgx.Tx, error) } type pgxEpisodeRepo struct { pool pgxPool } func NewEpisodeRepo(pool *pgxpool.Pool) episode.Repository { return &pgxEpisodeRepo{pool: pool} } func (r *pgxEpisodeRepo) ListAll(ctx context.Context) ([]episode.Episode, error) { const q = ` SELECT id, ep_num, ep_title, season_name, to_char(start_time, 'HH24:MI:SS') AS start_time, to_char(playback_length, 'HH24:MI:SS') AS playback_length, date_created FROM current ORDER BY id DESC; ` rows, err := r.pool.Query(ctx, q) if err != nil { return nil, err } defer rows.Close() var out []episode.Episode for rows.Next() { var e episode.Episode if err := rows.Scan(&e.Id, &e.EpNum, &e.EpTitle, &e.SeasonName, &e.StartTime, &e.PlaybackLength, &e.DateCreated); err != nil { return nil, err } out = append(out, e) } return out, rows.Err() } func (r *pgxEpisodeRepo) GetCurrent(ctx context.Context) (episode.Episode, error) { const q = ` SELECT id, ep_num, ep_title, season_name, to_char(start_time, 'HH24:MI:SS') AS start_time, to_char(playback_length, 'HH24:MI:SS') AS playback_length, date_created FROM current WHERE current_ep = true ORDER BY id DESC LIMIT 1; ` var e episode.Episode err := r.pool.QueryRow(ctx, q).Scan( &e.Id, &e.EpNum, &e.EpTitle, &e.SeasonName, &e.StartTime, &e.PlaybackLength, &e.DateCreated, ) if err != nil { if errors.Is(err, pgx.ErrNoRows) { return episode.Episode{}, episode.ErrNotFound } return episode.Episode{}, err } return e, nil } func (r *pgxEpisodeRepo) SetCurrent(ctx context.Context, id int64, startHHMMSS string) error { tx, err := r.pool.Begin(ctx) if err != nil { return err } defer func() { _ = tx.Rollback(ctx) }() // 1) clear any current episode flag if _, err := tx.Exec(ctx, `UPDATE current SET current_ep = false WHERE current_ep = true`); err != nil { return err } // 2) set start_time and current_ep for the id ct, err := tx.Exec(ctx, `UPDATE current SET start_time = $1::time, current_ep = true WHERE id = $2`, startHHMMSS, id) if err != nil { return err } if ct.RowsAffected() == 0 { return episode.ErrNotFound } return tx.Commit(ctx) } func (r *pgxEpisodeRepo) MoveToArchive(ctx context.Context, ids []int64) (episode.MoveResult, error) { res := episode.MoveResult{} if len(ids) == 0 { return res, nil } tx, err := r.pool.Begin(ctx) if err != nil { return res, err } defer func() { _ = tx.Rollback(ctx) }() // 1) Insert into archive from current (skip duplicates by id) insRows, err := tx.Query(ctx, ` INSERT INTO current_archive ( id, ep_num, ep_title, season_name, start_time, playback_length, current_ep, date_created ) SELECT id, ep_num, ep_title, season_name, start_time, playback_length, current_ep, date_created FROM current WHERE id = ANY($1::bigint[]) ON CONFLICT (id) DO NOTHING RETURNING id `, ids) if err != nil { return res, err } for insRows.Next() { var id int64 if err := insRows.Scan(&id); err != nil { return res, err } res.MovedIDs = append(res.MovedIDs, id) } if err := insRows.Err(); err != nil { return res, err } // 2) Delete from current only those actually inserted to archive if len(res.MovedIDs) > 0 { delRows, err := tx.Query(ctx, ` DELETE FROM current WHERE id = ANY($1::bigint[]) RETURNING id `, res.MovedIDs) if err != nil { return res, err } for delRows.Next() { var id int64 if err := delRows.Scan(&id); err != nil { return res, err } res.DeletedIDs = append(res.DeletedIDs, id) } if err := delRows.Err(); err != nil { return res, err } } // 3) Compute skipped = requested - moved sel := make(map[int64]struct{}, len(res.MovedIDs)) for _, id := range res.MovedIDs { sel[id] = struct{}{} } for _, id := range ids { if _, ok := sel[id]; !ok { res.SkippedIDs = append(res.SkippedIDs, id) } } if err := tx.Commit(ctx); err != nil { return res, err } return res, nil } func (r *pgxEpisodeRepo) Delete(ctx context.Context, id int64) error { cmdTag, err := r.pool.Exec(ctx, `DELETE FROM current WHERE id = $1`, id) if err != nil { return err } if cmdTag.RowsAffected() == 0 { return episode.ErrNotFound } return nil }