asobi-downloader/asobi-downloader.py
2026-01-31 00:40:07 +09:00

435 lines
14 KiB
Python

# Interactive Asobistage downloader (m3u8 based)
# v3.1.0
from __future__ import annotations
import logging
import re
import shutil
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from time import sleep
from urllib.parse import urljoin
from urllib.request import Request, urlopen
import subprocess
import shlex
from urllib.parse import urlsplit, urlunsplit
# Fallback User-Agent applied whenever neither the pasted curl command nor the
# manual header prompt supplies one (mimics a desktop Firefox browser).
DEFAULT_UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:134.0) "
    "Gecko/20100101 Firefox/134.0"
)
# ------------------------------ prompts ------------------------------------- #
def prompt_required(prompt: str) -> str:
    """Keep re-prompting until the user enters a non-empty value."""
    while not (answer := input(prompt).strip()):
        print("Please provide a value.")
    return answer
def prompt_with_default(prompt: str, default: str) -> str:
    """Ask once, showing *default* in brackets; empty input selects the default."""
    answer = input(f"{prompt} [{default}]: ").strip()
    if answer:
        return answer
    return default
def prompt_headers(existing: dict[str, str]) -> dict[str, str]:
    """Interactively collect 'Name: value' header lines and merge them over *existing*.

    User-entered headers win on conflict; a User-Agent is guaranteed in the result.
    """
    print("\nAdd/override HTTP headers (blank line to finish).")
    print("Example: Cookie: session=abc123; Referer: https://example.com\n")
    overrides: dict[str, str] = {}
    while True:
        entry = input("> ").strip()
        if not entry:
            break
        name, sep, value = entry.partition(":")
        if not sep:
            print("Header must look like 'Name: value'.")
            continue
        overrides[name.strip()] = value.strip()
    merged = {**existing, **overrides}
    # Don't let requests go out with urllib's default agent string.
    merged.setdefault("User-Agent", DEFAULT_UA)
    return merged
def prompt_curl() -> str:
    """Read a possibly multi-line curl command; a blank line (after any input) ends it."""
    print("Paste the full curl command (multi-line OK). End with an empty line:")
    collected: list[str] = []
    while True:
        entry = input()
        # Only stop on a blank line once at least one line was captured.
        if entry.strip() == "" and collected:
            break
        collected.append(entry)
    return "\n".join(collected)
# ------------------------------ curl parsing -------------------------------- #
def parse_curl_command(curl_text: str) -> tuple[str, dict[str, str]]:
    """Extract the request URL and headers from a pasted curl command.

    Fixes/behavior:
      * Backslash-continued lines are joined before tokenizing.
      * Tokens consumed as option values (-H/--header, -b/--cookie) are
        skipped, so a header value can never be mistaken for the URL.
      * The User-Agent presence check is case-insensitive, so a pasted
        'user-agent:' header is not duplicated by DEFAULT_UA.
      * curl's -b/--cookie flag (emitted by browser "Copy as cURL") is
        mapped onto a Cookie header.

    Raises:
        ValueError: if no http(s) URL is present in the command.
    """
    one_line = re.sub(r"\\\s*\n", " ", curl_text.strip())
    try:
        tokens = shlex.split(one_line)
    except ValueError:
        # Unbalanced quotes etc. — fall back to a naive whitespace split.
        tokens = one_line.split()
    url = ""
    headers: dict[str, str] = {}
    skip_next = False
    for idx, tok in enumerate(tokens):
        if skip_next:
            skip_next = False
            continue
        if tok in ("-H", "--header"):
            if idx + 1 < len(tokens):
                header_line = tokens[idx + 1]
                if ":" in header_line:
                    name, value = header_line.split(":", 1)
                    headers[name.strip()] = value.strip()
                skip_next = True
            continue
        if tok in ("-b", "--cookie"):
            if idx + 1 < len(tokens):
                headers["Cookie"] = tokens[idx + 1]
                skip_next = True
            continue
        if tok.startswith("http://") or tok.startswith("https://"):
            # Naive-split fallback may leave shell quotes attached; strip them.
            url = tok.strip("'\"")
    if not url:
        raise ValueError("Could not find URL in curl command.")
    if not any(k.lower() == "user-agent" for k in headers):
        headers["User-Agent"] = DEFAULT_UA
    return url, headers
def derive_playlist_url(segment_url: str) -> str:
    """Guess the .m3u8 playlist URL that a media-segment URL belongs to.

    Tries to drop a numeric segment suffix like "_00007.ts" first; failing
    that, swaps a bare media extension (.ts/.m4s/.mp4) for ".m3u8". Query
    string and fragment are preserved.
    """
    pieces = urlsplit(segment_url)
    rewritten = pieces.path
    for pattern in (r"_[0-9]+\.(ts|m4s|mp4)$", r"\.(ts|m4s|mp4)$"):
        candidate = re.sub(pattern, ".m3u8", pieces.path)
        if candidate != pieces.path:
            rewritten = candidate
            break
    return urlunsplit(
        (pieces.scheme, pieces.netloc, rewritten, pieces.query, pieces.fragment)
    )
# ------------------------------ networking ---------------------------------- #
def fetch_text(url: str, headers: dict[str, str], timeout: int = 15) -> str:
    """Fetch *url* and return the body decoded as UTF-8 (bad bytes replaced)."""
    request = Request(url, headers=headers)
    with urlopen(request, timeout=timeout) as response:
        body = response.read()
    return body.decode("utf-8", errors="replace")
def fetch_text_with_retries(
    url: str,
    headers: dict[str, str],
    retries: int,
    delay_sec: float = 1.5,
    timeout: int = 15,
) -> str:
    """Fetch a text resource, retrying up to *retries* times with a fixed delay.

    Raises the last error on exhaustion (or RuntimeError if no attempt ran).
    """
    failure: Exception | None = None
    attempt = 0
    while attempt < retries:
        attempt += 1
        try:
            return fetch_text(url, headers=headers, timeout=timeout)
        except Exception as exc:  # noqa: BLE001
            failure = exc
            logging.warning("Attempt %s failed for playlist/key fetch: %s", attempt, exc)
            if attempt < retries:
                sleep(delay_sec)
    raise failure or RuntimeError("Failed to fetch text resource")
def download_binary(
    url: str,
    dest: Path,
    headers: dict[str, str],
    retries: int,
    timeout: int = 20,
) -> bool:
    """Stream *url* into *dest*; return True on success, False after all retries.

    A zero-byte result counts as a failure. Back-off between attempts grows
    linearly (1.5s, 3.0s, ...).
    """
    dest.parent.mkdir(parents=True, exist_ok=True)
    attempt = 0
    while attempt < retries:
        attempt += 1
        try:
            request = Request(url, headers=headers)
            with urlopen(request, timeout=timeout) as response:
                with open(dest, "wb") as sink:
                    shutil.copyfileobj(response, sink)
            if dest.stat().st_size == 0:
                raise ValueError("empty file")
            return True
        except Exception as exc:  # noqa: BLE001
            logging.warning("Attempt %s failed for %s: %s", attempt, url, exc)
            if attempt < retries:
                sleep(1.5 * attempt)
    return False
# ------------------------------ playlist logic ------------------------------ #
def build_local_playlist(
    playlist_text: str, playlist_url: str
) -> tuple[list[str], list[tuple[str, str]], list[tuple[str, str]]]:
    """
    Rewrite a remote media playlist so every URI points at a local file.

    Improvement over the original: #EXT-X-MAP init segments (used by fMP4
    streams, which derive_playlist_url explicitly supports via .m4s/.mp4)
    are now downloaded and rewritten too, so the local playlist is fully
    offline. They are appended to `jobs` like ordinary segments.

    Returns:
        local_lines: playlist lines for local file
        jobs: list of (remote_url, local_relpath) segment downloads
        key_jobs: list of (remote_key_url, local_key_filename)
    """
    local_lines: list[str] = []
    jobs: list[tuple[str, str]] = []
    key_jobs: list[tuple[str, str]] = []
    key_map: dict[str, str] = {}
    map_count = 0
    seg_idx = 0
    for raw in playlist_text.splitlines():
        line = raw.strip()
        if not line:
            continue
        if line.startswith("#EXT-X-KEY"):
            match = re.search(r'URI="([^"]+)"', line)
            if match:
                key_url = urljoin(playlist_url, match.group(1))
                # Deduplicate: the same key URL may appear on many lines.
                if key_url not in key_map:
                    local_key_name = f"key_{len(key_map) + 1}.key"
                    key_map[key_url] = local_key_name
                    key_jobs.append((key_url, local_key_name))
                line = re.sub(r'URI="([^"]+)"', f'URI="{key_map[key_url]}"', line)
            local_lines.append(line)
            continue
        if line.startswith("#EXT-X-MAP"):
            # fMP4 init segment: fetch it and point the playlist at the copy.
            match = re.search(r'URI="([^"]+)"', line)
            if match:
                map_count += 1
                map_url = urljoin(playlist_url, match.group(1))
                local_map_name = f"segments/init_{map_count:02d}.mp4"
                jobs.append((map_url, local_map_name))
                line = re.sub(r'URI="([^"]+)"', f'URI="{local_map_name}"', line)
            local_lines.append(line)
            continue
        if line.startswith("#"):
            # Any other tag passes through untouched.
            local_lines.append(line)
            continue
        # Media segment line: number sequentially so ffmpeg reads them in order.
        seg_idx += 1
        remote = urljoin(playlist_url, line)
        local_name = f"segments/{seg_idx:05d}.ts"
        jobs.append((remote, local_name))
        local_lines.append(local_name)
    return local_lines, jobs, key_jobs
# ------------------------------ ffmpeg -------------------------------------- #
def run_ffmpeg(local_playlist: Path, output_path: Path) -> int:
    """Mux/decrypt the local playlist into *output_path* via ffmpeg stream copy.

    Returns ffmpeg's exit code, or 1 when ffmpeg is not installed. Any
    pre-existing output file is removed first so ffmpeg never prompts.
    """
    if output_path.exists():
        output_path.unlink()
    command = [
        "ffmpeg",
        "-loglevel", "warning",
        "-stats",
        # Local segments use .ts/.key names; lift ffmpeg's HLS extension check.
        "-allowed_extensions", "ALL",
        "-i", str(local_playlist),
        "-c", "copy",
        str(output_path),
    ]
    try:
        completed = subprocess.run(command, check=False)
    except FileNotFoundError:
        print("ffmpeg not found on PATH. Please install ffmpeg.")
        return 1
    return completed.returncode
def run_ffmpeg_with_retries(
    local_playlist: Path, output_path: Path, retries: int, delay_sec: float
) -> int:
    """Run run_ffmpeg up to *retries* times; return 0 on success, else last code."""
    rc = 1
    for attempt in range(1, retries + 1):
        rc = run_ffmpeg(local_playlist, output_path)
        # Success requires both a zero exit code and an actual output file.
        if rc == 0 and output_path.exists():
            return 0
        logging.error("ffmpeg attempt %s failed with code %s", attempt, rc)
        if attempt < retries:
            print(f"ffmpeg failed (code {rc}). Retrying in {delay_sec}s...")
            sleep(delay_sec)
    return rc
def validate_and_retry_segments(
    workdir: Path,
    jobs: list[tuple[str, str]],
    headers: dict[str, str],
    retries: int,
) -> tuple[bool, list[str]]:
    """Ensure every segment exists and is non-empty; re-download any that aren't.

    Returns (all_ok, list_of_still_bad_relative_paths).
    """
    def _is_bad(rel: str) -> bool:
        # Missing or zero-byte files both count as corrupt.
        target = workdir / rel
        return not target.exists() or target.stat().st_size == 0

    broken = [(remote, rel) for remote, rel in jobs if _is_bad(rel)]
    if not broken:
        return True, []
    logging.warning(
        "Found %s missing/corrupt segments; retrying download", len(broken)
    )
    for remote, rel in broken:
        download_binary(remote, workdir / rel, headers=headers, retries=retries)
    # Second pass: anything still bad is unrecoverable.
    unrecoverable = [rel for _, rel in broken if _is_bad(rel)]
    return not unrecoverable, unrecoverable
# ------------------------------ main flow ----------------------------------- #
def main() -> int:
    """Interactive entry point.

    Flow: parse a pasted curl command -> derive/confirm the playlist URL ->
    collect options -> fetch + localize the playlist -> download keys and
    segments concurrently -> validate -> mux with ffmpeg. Returns a process
    exit code (0 success, 1 on any failure).
    """
    print("================================================================")
    print("\33[36mAsobistage Downloader v3.1.0 (interactive)\33[0m\n")
    curl_text = prompt_curl()
    try:
        segment_url, headers = parse_curl_command(curl_text)
    except Exception as exc:  # noqa: BLE001
        print(f"[ERROR] {exc}")
        return 1
    # The pasted curl usually targets a single segment; derive the playlist
    # URL from it and let the user correct the guess.
    derived_playlist = derive_playlist_url(segment_url)
    m3u8_url = prompt_with_default("Playlist URL derived from curl", derived_playlist)
    out_name = prompt_with_default(
        "Output filename (without path)", "output.mkv"
    )
    # NOTE(review): non-numeric answers here raise ValueError uncaught — the
    # prompts assume well-formed input.
    threads = int(prompt_with_default("Concurrent downloads", "16"))
    retries = int(prompt_with_default("Retries per file", "3"))
    meta_retries = int(prompt_with_default("Playlist/key retries", "3"))
    ffmpeg_retries = int(prompt_with_default("FFmpeg retries on failure", "2"))
    ffmpeg_retry_delay = float(
        prompt_with_default("Delay between ffmpeg retries (seconds)", "2")
    )
    keep_segments = (
        prompt_with_default("Keep .ts segments after muxing? (y/N)", "N")
        .lower()
        .startswith("y")
    )
    default_workdir = Path("downloads") / Path(out_name).stem
    workdir = Path(prompt_with_default("Working directory", str(default_workdir))).expanduser()
    workdir.mkdir(parents=True, exist_ok=True)
    # All logging for this run goes to DL.log inside the working directory.
    log_file = workdir / "DL.log"
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[logging.FileHandler(log_file, mode="w")],
    )
    logging.info("Downloader started")
    print("\nParsed headers:")
    for k, v in headers.items():
        print(f" {k}: {v}")
    add_more = prompt_with_default("Add/override headers? (y/N)", "N").lower().startswith("y")
    if add_more:
        headers = prompt_headers(headers)
    logging.info("Headers set: %s", {k: v for k, v in headers.items()})
    print("\nFetching playlist...")
    try:
        playlist_text = fetch_text_with_retries(
            m3u8_url, headers=headers, retries=meta_retries
        )
    except Exception as exc:  # noqa: BLE001
        print(f"[ERROR] Failed to fetch playlist: {exc}")
        logging.exception("Failed to fetch playlist")
        return 1
    local_lines, jobs, key_jobs = build_local_playlist(playlist_text, m3u8_url)
    if not jobs:
        print("[ERROR] No media segments found in playlist.")
        logging.error("No segments in playlist.")
        return 1
    # Write the rewritten playlist that references only local files.
    local_playlist = workdir / "playlist.m3u8"
    local_playlist.write_text("\n".join(local_lines) + "\n", encoding="utf-8")
    # Download all keys (sequential — usually one or two small files).
    if key_jobs:
        print(f"Downloading {len(key_jobs)} key file(s)...", end="")
        for key_url, local_name in key_jobs:
            key_path = workdir / local_name
            key_ok = download_binary(
                key_url, key_path, headers=headers, retries=meta_retries
            )
            # A missing/empty key makes every segment undecryptable: abort early.
            if not key_ok or key_path.stat().st_size == 0:
                print(" failed")
                logging.error("Failed to fetch decryption key from %s", key_url)
                return 1
        print(" done.")
        logging.info("Decryption keys downloaded.")
    else:
        logging.info("No AES keys declared in playlist.")
    total = len(jobs)
    print(f"Downloading {total} segments with {threads} threads...")
    successes = 0
    failures: list[str] = []
    def _task(remote: str, local_rel: str) -> tuple[str, bool]:
        # Worker: download one segment; returns (relative path, success flag).
        dest = workdir / local_rel
        ok = download_binary(remote, dest, headers=headers, retries=retries)
        return local_rel, ok
    with ThreadPoolExecutor(max_workers=threads) as pool:
        future_map = {pool.submit(_task, u, p): p for u, p in jobs}
        for idx, future in enumerate(as_completed(future_map), start=1):
            local_rel, ok = future.result()
            if ok:
                successes += 1
            else:
                failures.append(local_rel)
            # "\r" keeps the progress counter on a single console line.
            print(
                f"\rProgress: {successes}/{total} ok, {len(failures)} failed",
                end="",
            )
    print()
    if failures:
        print(f"[ERROR] {len(failures)} segments failed: first few {failures[:5]}")
        logging.error("Failed segments: %s", failures)
        return 1
    # Validate all segments before ffmpeg (catches zero-byte files that the
    # per-download check may have missed).
    ok, still_bad = validate_and_retry_segments(
        workdir, jobs, headers=headers, retries=retries
    )
    if not ok:
        print(
            f"[ERROR] Still missing/corrupt segments after retry: {still_bad[:5]}"
        )
        logging.error("Unrecoverable segments: %s", still_bad)
        return 1
    output_path = workdir / out_name
    print("\nRunning ffmpeg mux/decrypt...")
    ffmpeg_rc = run_ffmpeg_with_retries(
        local_playlist, output_path, retries=ffmpeg_retries, delay_sec=ffmpeg_retry_delay
    )
    if ffmpeg_rc != 0 or not output_path.exists():
        print("[ERROR] ffmpeg failed. See DL.log for details.")
        logging.error("ffmpeg exited with code %s", ffmpeg_rc)
        return 1
    print(f"\nSuccess! Output at: {output_path}")
    logging.info("Output file created at %s", output_path)
    if not keep_segments:
        # Best-effort cleanup: ignore_errors so a locked file can't fail the run.
        shutil.rmtree(workdir / "segments", ignore_errors=True)
        logging.info("Temporary segments removed.")
    print("================================================================")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status.
    sys.exit(main())